In [62]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, to_date
from pyspark import StorageLevel

spark = (
    SparkSession.builder
    .appName("W5M2-NYC-Taxi-Weather-RDD")
    .getOrCreate()
)

In [63]:
# trip 데이터 로드 및 전처리
trip_df = (
    spark.read.parquet("/data/yellow_tripdata_2024-01.parquet")
    .withColumn("pickup_date", to_date("tpep_pickup_datetime"))        # 날짜 컬럼 생성
    .filter((col("fare_amount") > 0) & (col("trip_distance") > 0))     # 이상치 제거
)

# DF -> RDD
trip_rdd = (
    trip_df
    .select("pickup_date", "fare_amount")   
    .rdd
    .map(lambda r: (r["pickup_date"], (1, r["fare_amount"])))          
    .persist(StorageLevel.MEMORY_AND_DISK)
)
_ = trip_rdd.count()        # Action을 통해 캐시를 메모리에 확정적으로 올림

# 한 번의 reduceByKey 로 일별 운행 수 및 수익 동시 집계
agg_rdd = trip_rdd.reduceByKey(
    lambda a, b: (a[0] + b[0], a[1] + b[1])
)

                                                                                

In [64]:
# 날씨 데이터 로드 및 RDD 변환
weather_rdd = (
    spark.read.option("header", True)
        .csv("/data/NYC_weather.csv", inferSchema=True)
        .withColumn("weather_date", to_date("datetime"))
        .select("weather_date", "temp")
        .rdd
        .map(lambda r: (r["weather_date"], r["temp"]))
)
# 날짜 기준 Join
joined = agg_rdd.join(weather_rdd)

In [65]:
# 최종 Row 변환 및 저장
result_df = (
    joined
    .map(lambda x: Row(
        date=str(x[0]),
        total_trips=x[1][0][0],
        total_revenue=x[1][0][1],
        avg_temperature=x[1][1]
    ))
    .toDF()
)

result_df.write.mode("overwrite")\
        .csv("/output/rdd_weather_summary", header=True)

                                                                                

In [66]:
summary_df = spark.createDataFrame(
    [
        Row(metric="total_trips", value=float(result_df.agg({"total_trips": "sum"}).first()[0])),
        Row(metric="total_revenue", value=float(result_df.agg({"total_revenue": "sum"}).first()[0])),
        Row(metric="avg_distance", value=float(
            trip_df.select("trip_distance").agg({"trip_distance": "avg"}).first()[0]
        )),
    ]
)
summary_df.write.mode("overwrite")\
        .csv("/output/rdd_summary", header=True)

In [61]:
spark.stop()