In [1]:
import os
os.environ["SPARK_HOME"] = "/home/hadoop/spark"

import findspark
findspark.init()

import pyspark
sc = pyspark.SparkContext(appName = "tomato")

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

24/06/06 19:38:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/06 19:38:10 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
import os

# 새로운 SparkSession 생성
spark = SparkSession.builder \
    .appName("Tomato Data Analysis") \
    .getOrCreate()

# 경로 설정
data_path="tomato/"
# CSV 파일 리스트
csv_files = [
    "TOMATO_FLOWER_CLUSTER_HEIGHT_ENV_20231123.csv",
    "TOMATO_FLOWER_NUM_ENV_20231123.csv",
    "TOMATO_FLOWER_PER_TRUSS_ENV_20231123.csv",
    "TOMATO_FRUIT_LEN_ENV_20231123.csv",
    "TOMATO_FRUIT_PER_TRUSS_ENV_20231123.csv",
    "TOMATO_FRUIT_SETTING_ENV_20231123.csv",
    "TOMATO_FRUIT_WEIGHT_ENV_20231123.csv",
    "TOMATO_FRUIT_WIDTH_ENV_20231123.csv",
    "TOMATO_GROWTH_LENGTH_ENV_20231123.csv",
    "TOMATO_HARVEST_ENV_20231123.csv",
    "TOMATO_HARVEST_ENV_20231204.csv",
    "TOMATO_HARVEST_PER_TRUSS_ENV_20231123.csv",
    "TOMATO_LAST_FLOWERING_BUD_ENV_20231123.csv",
    "TOMATO_LEAF_LEN_ENV_20231123.csv",
    "TOMATO_LEAF_NUM_ENV_20231123.csv",
    "TOMATO_LEAF_WIDTH_ENV_20231123.csv",
    "TOMATO_SOIL_SURFACE_LEN_ENV_20231123.csv",
    "TOMATO_STEM_THICKNESS_ENV_20231123.csv"
]

# 모든 파일에서 열 이름 수집
all_columns = set()
for file in csv_files:
    df = spark.read.csv(data_path+file, header=True, inferSchema=True)
    all_columns.update(df.columns)

all_columns = list(all_columns)
all_columns.remove("MSRM_DT")  # MSRM_DT는 중복으로 처리하지 않음
all_columns = ["MSRM_DT"] + all_columns

# 각 파일을 읽어와서 데이터 채우기
dataframes = []
for file in csv_files:
    df = spark.read.csv(os.path.join(data_path, file), header=True, inferSchema=True)
    df = df.drop("ZONE_NM")  # ZONE_NM 열 제거
    for col in all_columns:
        if col not in df.columns:
            df = df.withColumn(col, F.lit(None).cast(T.StringType()))
    dataframes.append(df.select(all_columns))

# 모든 데이터프레임 병합
merged_df = dataframes[0]
for df in dataframes[1:]:
    merged_df = merged_df.union(df)

# 중복된 열에 대한 평균 계산
agg_exprs = []
for col in merged_df.columns:
    if col != "MSRM_DT":
        agg_exprs.append(F.mean(col).alias(col))

final_df = merged_df.groupBy("MSRM_DT").agg(*agg_exprs)
final_df_sorted = final_df.orderBy("MSRM_DT")

# 통합된 데이터 저장
output_path = "/home/hadoop/bigdata_project/test_merge_tomato_data.csv"
final_df_sorted.coalesce(1).write.csv(output_path, header=True, mode='overwrite')

24/06/06 17:02:12 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
                                                                                

In [2]:
import pandas as pd

# 파일 경로 설정
file_path = "bigdata_project/test_merge_tomato_data.csv/part-00000-6fe08533-7d31-4459-a9a1-2a81375fa6ad-c000.csv"

# pandas를 사용하여 CSV 파일 불러오기
df = pd.read_csv(file_path)

# 상위 5개 행 출력
print(df.head())


                         MSRM_DT  ABSLT_HMDT  INNER_TPRT_1  FRST_CLUSTER  \
0  2022-10-07T10:15:00.000+09:00       13.80         23.55           NaN   
1  2022-10-07T10:20:00.000+09:00       13.95         23.50           NaN   
2  2022-10-07T10:25:00.000+09:00       13.90         23.35           NaN   
3  2022-10-07T10:30:00.000+09:00       14.15         23.00           NaN   
4  2022-10-07T10:35:00.000+09:00       13.60         24.40           NaN   

   CLR_OPRT_YN_2  SPRYN_DEVICE  FRST_TREE_CNT  TRWVLV_OPDR_RATE_2  \
0            0.0           1.0            NaN               100.0   
1            0.0           0.0            NaN               100.0   
2            0.0           0.0            NaN               100.0   
3            0.0           0.0            NaN               100.0   
4            0.0           1.0            NaN               100.0   

   SKLT_OPMD_2_LEFT  SKLT_OPMD_1_LEFT  ...  LEAF_LNGTH  RTTN_PUMP_OPRT_YN_2  \
0               0.0               1.0  ...       