# 코드 작성

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, col, count, sum
from functools import reduce # <-- 여러 DataFrame을 합칠 때 사용
import datetime
import pandas as pd

In [2]:
# SparkSession 생성
spark = SparkSession.builder \
        .appName("TLC 2024") \
        .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/04 05:01:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
tlc_path = "TLC/2024/"

## RDD로 작업하기

In [4]:
# 월별 DataFrame을 저장할 리스트
rdd_list = []

In [5]:
# 1~11월 데이터를 불러오기
for month in range(1, 12):
    month_str = f"{month:02d}"
    file_path = f"{tlc_path}yellow_tripdata_2024-{month_str}.parquet"
    # file_path = f"{tlc_path}yellow_tripdata_2024-*.parquet" # <-- Spark는 와일드카드(*)를 지원함
    
    # 파일을 읽어 데이터프레임 생성
    df_month = spark.read.parquet(file_path)
    # 데이터프레임을 RDD로 변환
    rdd = df_month.rdd
    # print(type(rdd))
    rdd_list.append(rdd)
    
    print(f"Loaded data for month {month_str} from {file_path}")

Loaded data for month 01 from TLC/2024/yellow_tripdata_2024-01.parquet
Loaded data for month 02 from TLC/2024/yellow_tripdata_2024-02.parquet
Loaded data for month 03 from TLC/2024/yellow_tripdata_2024-03.parquet
Loaded data for month 04 from TLC/2024/yellow_tripdata_2024-04.parquet
Loaded data for month 05 from TLC/2024/yellow_tripdata_2024-05.parquet
Loaded data for month 06 from TLC/2024/yellow_tripdata_2024-06.parquet
Loaded data for month 07 from TLC/2024/yellow_tripdata_2024-07.parquet
Loaded data for month 08 from TLC/2024/yellow_tripdata_2024-08.parquet
Loaded data for month 09 from TLC/2024/yellow_tripdata_2024-09.parquet
Loaded data for month 10 from TLC/2024/yellow_tripdata_2024-10.parquet
Loaded data for month 11 from TLC/2024/yellow_tripdata_2024-11.parquet


In [6]:
type(rdd_list[0])

pyspark.rdd.RDD

In [7]:
# 모든 달의 RDD를 하나로 합치기 (스키마가 동일해야만 union 사용 가능)
if rdd_list:
    combined_rdd = reduce(lambda rdd1, rdd2: rdd1.union(rdd2), rdd_list)
    print("All monthly data combined Successfully")
else:
    print("Something's wrong with combining data")

All monthly data combined Successfully


In [8]:
# combined_rdd.printSchema()
combined_rdd.take(5)

[Row(VendorID=2, tpep_pickup_datetime=datetime.datetime(2024, 1, 1, 0, 57, 55), tpep_dropoff_datetime=datetime.datetime(2024, 1, 1, 1, 17, 43), passenger_count=1, trip_distance=1.72, RatecodeID=1, store_and_fwd_flag='N', PULocationID=186, DOLocationID=79, payment_type=2, fare_amount=17.7, extra=1.0, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, improvement_surcharge=1.0, total_amount=22.7, congestion_surcharge=2.5, Airport_fee=0.0),
 Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2024, 1, 1, 0, 3), tpep_dropoff_datetime=datetime.datetime(2024, 1, 1, 0, 9, 36), passenger_count=1, trip_distance=1.8, RatecodeID=1, store_and_fwd_flag='N', PULocationID=140, DOLocationID=236, payment_type=1, fare_amount=10.0, extra=3.5, mta_tax=0.5, tip_amount=3.75, tolls_amount=0.0, improvement_surcharge=1.0, total_amount=18.75, congestion_surcharge=2.5, Airport_fee=0.0),
 Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2024, 1, 1, 0, 17, 6), tpep_dropoff_datetime=datetime.datetime(2024, 1, 

## 요구사항:
**변환 논리**:
* 요금이 0이거나 음수인 여행을 제외하기 위해 필터링을 구현합니다.
* 데이터를 매핑하여 관련 열을 추출하고 이를 적절한 데이터 유형으로 변환합니다.
* 데이터를 줄여 총 수익과 총 여행 횟수를 계산합니다.
* 날짜별로 그룹화하여 일일 지표를 계산합니다.

**집계 논리**:
* 총 여행 횟수를 계산하여 표시합니다.
* 여행에서 발생한 총 수익을 계산하여 표시합니다.
* 평균 여행 거리를 계산하고 표시합니다.
* 하루당 여행 횟수를 계산하여 표시합니다.
* 하루별 총 수익을 계산하고 표시합니다.

**데이터 변환**:
* 데이터에 최소 5가지 변환을 수행합니다(예: filtering, mapping, reducing, joining, aggregating).

**데이터 집계**:
* 다음 측정 항목을 계산합니다.
* 총 여행 횟수.
* 총 수익(요금 금액의 합계)
* 평균 여행 거리.
* 하루 여행 횟수.
* 하루 총 수익.
---

2024년이 아닌 데이터를 배제하는 함수

In [9]:
def is_valid_2024(row):
    try:
        dt_val = row['tpep_pickup_datetime']
        if isinstance(dt_val, str):
            dt = datetime.strptime(dt_val, "%Y-%m-%d %H:%M:%S")
        else:
            dt = dt_val
        return dt.year==2024
    except Exception as e:
        return False

**요금이 0 이하인 trip을 제외하는 필터링:**

In [10]:
# 요금 0 필터링
filtered_rdd = combined_rdd.filter(lambda row: row['fare_amount'] is not None and row['fare_amount'] > 0)
filtered_rdd = filtered_rdd.filter(is_valid_2024)
# filtered_rdd.cache()

In [11]:
# (요금, 여행 거리, 운행 횟수)
fare_dist_total = filtered_rdd.map(lambda row: (row['fare_amount'], row['trip_distance'], 1)) \
            .reduce(lambda a, b: (a[0] + b[0], a[1] + b[1], a[2] + b[2]))
            
total_fare, total_dist, total_trips = fare_dist_total
avg_dist = total_dist / total_trips if total_trips else 0

print("총 여행 횟수: ", total_trips)
print("총 수익: ", total_fare)
print("평균 여행 거리", avg_dist)
            
# 각 행은 (fare_amount, dist, 1) 형태의 튜플로 변환됩니다.
# 나중에 reduce나 reduceByKey 같은 연산을 통해 **총 수익(sum of fare_amount)**과 
# **총 여행 거리**, **총 트립 수(count)**를 쉽게 계산하기 위함입니다.



총 여행 횟수:  36833515
총 수익:  733440925.7900434
평균 여행 거리 4.917688045248834


                                                                                

In [23]:
import json
total_data = {
    "Total trips" : total_trips,
    "Total fare"  : total_fare,
    "Avg trip distance" : avg_dist
}
with open('2024_total_trip_data.json', 'w') as f:
    json.dump(total_data, f, indent=4)

In [12]:
daily_rdd = filtered_rdd.map(lambda row: (
    row['tpep_pickup_datetime'].strftime("%Y-%m-%d"), # --> key (datetime)
    (row['fare_amount'], 1)) # --> value (fare, count)
)

daily_metrics_rdd = daily_rdd.reduceByKey(lambda a, b: (
                    a[0] + b[0], # fare amount 합계
                    a[1] + b[1], # trip count 합계
)).sortByKey()


                                                                                

In [14]:
daily_records = {
    "Date":[],
    "Trips":[],
    "Daily Fare":[]
}
for date, (total_fare, total_count) in daily_metrics_rdd.collect():
    # print(f"Date: {date} -> Trips: {total_count}, Total Fare: {round(total_fare, 2)}")
    daily_records['Date'].append(date)
    daily_records['Trips'].append(total_count)
    daily_records['Daily Fare'].append(total_fare)

                                                                                

|집계 요소| 값|
|---|---|
|총 여행 횟수|   36833515|
|총 수익|        733440925.79|
|평균 여행 거리| 4.918|


In [15]:
# 날짜별 기록을 parquet으로 저장
pydf = pd.DataFrame(daily_records)
spark_df = spark.createDataFrame(pydf)
spark_df.printSchema()
spark_df.show(truncate=False)

root
 |-- Date: string (nullable = true)
 |-- Trips: long (nullable = true)
 |-- Daily Fare: double (nullable = true)

+----------+------+------------------+
|Date      |Trips |Daily Fare        |
+----------+------+------------------+
|2024-01-01|79684 |1792070.620000007 |
|2024-01-02|74398 |1605948.7100000128|
|2024-01-03|81266 |1644556.7400000033|
|2024-01-04|101573|1922241.8000000191|
|2024-01-05|101821|1864732.1499999792|
|2024-01-06|95799 |1705149.3000000194|
|2024-01-07|66551 |1336990.6299999945|
|2024-01-08|78959 |1517613.6500000139|
|2024-01-09|92795 |1606340.0000000135|
|2024-01-10|93942 |1722422.719999995 |
|2024-01-11|103766|1972963.2500000126|
|2024-01-12|102397|1945405.7800000096|
|2024-01-13|103394|1835294.6900000146|
|2024-01-14|93129 |1712697.2699999884|
|2024-01-15|76075 |1506477.6700000195|
|2024-01-16|91978 |1804007.8599999945|
|2024-01-17|109048|2032234.6000000231|
|2024-01-18|109059|1998752.9600000135|
|2024-01-19|94707 |1673888.970000008 |
|2024-01-20|107330|1822

In [21]:
spark_df.coalesce(1).write.mode("overwrite").csv("./output/2024_daily_data")

In [24]:
spark.stop()