In [None]:
import findspark
import os

# Point the kernel at the Homebrew-installed Spark and OpenJDK before findspark
# runs; findspark.init() is called without arguments, so it picks up SPARK_HOME
# from the environment set here.
os.environ.update({
    'SPARK_HOME': '/opt/homebrew/Cellar/apache-spark/4.0.0/libexec',
    'JAVA_HOME': '/opt/homebrew/Cellar/openjdk@17/17.0.16/',
})

findspark.init()

In [None]:
from pyspark.sql import SparkSession

In [None]:
# Local-mode session. With master local[*] the driver and the executor share a
# single JVM, so the usable heap is governed by spark.driver.memory, and
# parallelism comes from local[*]. The spark.executor.* settings therefore do not
# size anything here; they are kept only in case the master is changed later.
# NOTE: spark.driver.memory only takes effect when this call launches the JVM.
# If a session already exists in the kernel, getOrCreate() returns it unchanged.
spark = (
    SparkSession.builder
    .appName("m2spark")
    .master("local[*]")
    .config('spark.driver.host', '127.0.0.1')
    .config('spark.driver.memory', '16g')
    .config('spark.executor.memory', '16g')
    .config('spark.executor.cores', '8')
    .getOrCreate()
)

In [None]:
# Show the session object's notebook representation as a quick check that it started.
spark

# 1. 데이터 로딩 및 샘플링

In [None]:
from pyspark.sql.functions import col, unix_timestamp, expr, rand, mean, round

# Prefix of the monthly FHVHV parquet files; "<month>.parquet" is appended per file.
FILE_NAME_HEADER = "shared/data/fhv_tripdata_2024_1278/fhvhv_tripdata_2024-"
# Months to load: Jan/Feb and Jul/Aug. These match the data directory name (_1278)
# and the Winter/Summer labels assigned later. Each month must appear exactly
# once. A duplicate reads the same file twice, and with the fixed sampling seed
# it would very likely duplicate that month's sampled rows.
months = ['01', '02', '07', '08']
# Fraction of each month's rows kept by the sampler.
fraction = 0.1
# Union of all monthly samples; populated by the loading cell.
sampled_df_all = None

In [None]:
# Read each month's parquet file, keep a reproducible `fraction` sample of it,
# and stack the samples into a single DataFrame.
# The result is rebuilt from scratch on every run, so re-executing this cell does
# not union the same months onto a previous result again. The comprehension
# variable is local, so it never shadows the `month` function imported later.
monthly_samples = [
    spark.read.parquet(f"{FILE_NAME_HEADER}{month_suffix}.parquet")
    .sample(False, fraction, seed=42)
    for month_suffix in months
]
if not monthly_samples:
    raise ValueError("`months` is empty: nothing to load")

sampled_df_all = monthly_samples[0]
for monthly_sample in monthly_samples[1:]:
    # unionByName aligns columns by name. A file whose columns come in a different
    # order therefore cannot silently shift values into the wrong columns.
    sampled_df_all = sampled_df_all.unionByName(monthly_sample)

In [None]:
# sampled_df_all.show()

In [None]:
# 2. Drop columns the analysis does not use
columns_to_drop = [
    'dispatching_base_num', 'originating_base_num', 'shared_request_flag',
    'shared_match_flag', 'access_a_ride_flag', 'wav_request_flag', 'wav_match_flag'
]

# 3. Time-condition filtering.
# Each *_datetime column is converted to epoch seconds under a temporary *_ts
# name, so the ordering checks below are plain numeric comparisons.
epoch_sources = [
    ("pickup_ts", "pickup_datetime"),
    ("dropoff_ts", "dropoff_datetime"),
    ("scene_ts", "on_scene_datetime"),
    ("request_ts", "request_datetime"),
]

df_clean = sampled_df_all.drop(*columns_to_drop)
for ts_name, source_name in epoch_sources:
    df_clean = df_clean.withColumn(ts_name, unix_timestamp(source_name))

# NOTE(review): a NULL on_scene_datetime makes scene_ts NULL, and the
# scene_ts > request_ts comparison then drops that row. Confirm this is intended.
is_valid_trip = (
    (col("dropoff_ts") > col("pickup_ts"))
    & (col("scene_ts") > col("request_ts"))
    & (col("base_passenger_fare") > 0)
    & (col("driver_pay") >= 0)
)

df_clean = (
    df_clean
    .filter(is_valid_trip)
    # Seconds between the on-scene time and the pickup time.
    .withColumn("scene_time", col("pickup_ts") - col("scene_ts"))
    # The temporary epoch columns are no longer needed.
    .drop(*[name for name, _ in epoch_sources])
    # Convert to minutes and round (trip_time is presumably in seconds; confirm).
    .withColumn("scene_time", round(col("scene_time") / 60))
    .withColumn("trip_time", round(col("trip_time") / 60))
)


In [None]:
# df_clean.show(5)

In [None]:
# Extract calendar parts from the request timestamp
from pyspark.sql.functions import year, month, dayofmonth, hour
df_with_date = df_clean \
    .withColumn("year", year("request_datetime")) \
    .withColumn("month", month("request_datetime")) \
    .withColumn("day", dayofmonth("request_datetime")) \
    .withColumn("hour", hour("request_datetime"))

# Season label. Only the sampled months (Jan/Feb, Jul/Aug) are mapped; any other
# month yields NULL because the CASE has no ELSE branch.
df_with_date = df_with_date.withColumn(
    "season",
    expr("""
        CASE
            WHEN month = 1 OR month = 2 THEN 'Winter'
            WHEN month = 7 OR month = 8 THEN 'Summer'
        END
    """)
)

# Map the HVFHS license number to the company, using the NYC TLC HVFHV data
# dictionary: HV0002 Juno, HV0003 Uber, HV0004 Via, HV0005 Lyft.
# Lyft is HV0005; mapping HV0004 to Lyft would leave every Lyft trip NULL.
# Unknown codes yield NULL.
df_with_date = df_with_date.withColumn(
    "service_type",
    expr("""
        CASE
            WHEN hvfhs_license_num='HV0002' THEN 'Juno'
            WHEN hvfhs_license_num='HV0003' THEN 'Uber'
            WHEN hvfhs_license_num='HV0004' THEN 'Via'
            WHEN hvfhs_license_num='HV0005' THEN 'Lyft'
        END
    """)
)

In [None]:
# df_with_date.show(5)

# 4. 지역 정보 로드 및 조인 (CSV -> Spark DataFrame)

In [None]:
# Taxi zone lookup table. It is read with a header row and no schema inference,
# so its columns (including LocationID) are strings.
taxi_zone = spark.read.csv(
    "/Users/admin/softeer_de_wiki/mission/W4/m2/shared/data/taxi_zone_lookup.csv",
    header=True,
)

from pyspark.sql.functions import col


def _with_borough(trips, zones, trip_key, zone_alias, borough_name):
    """Left-join `zones` onto `trips` and append the matching Borough.

    `zones` is aliased as `zone_alias` so the same lookup table can be joined
    more than once without ambiguous column references. The result keeps every
    column of `trips` plus the zone's Borough, renamed to `borough_name`. Trips
    with no matching LocationID get a NULL borough.
    """
    lookup = zones.alias(zone_alias)
    matched = trips.join(
        lookup,
        trips[trip_key] == col(f"{zone_alias}.LocationID"),
        how="left",
    )
    return matched.select(trips["*"], col(f"{zone_alias}.Borough").alias(borough_name))


# Attach the pickup borough first, then the drop-off borough.
df_with_pu = _with_borough(df_with_date, taxi_zone, "PULocationID", "pu_zones", "PULocation")
df_with_do = _with_borough(df_with_pu, taxi_zone, "DOLocationID", "do_zones", "DOLocation")

# Remove the raw zone ID columns now that boroughs are attached.
df_zone = df_with_do.drop("PULocationID", "DOLocationID", "LocationID")

In [None]:
# df_zone.show(5)

# 5. 분석 또는 저장용 처리

In [None]:
# df_zone.cache()  # 후속 분석을 위해 캐싱

# 6. 날씨 데이터 처리

In [None]:
# 날씨 데이터 로드
# Load every CSV in the 2024 weather directory as one DataFrame. A header row is
# used for column names; no schema inference, so all columns are strings.
weather_path_header = "/Users/admin/softeer_de_wiki/mission/W4/m2/shared/data/2024_weather/"
weather_glob = weather_path_header + "*.csv"
weather_df = spark.read.csv(weather_glob, header=True)

In [None]:
from pyspark.sql.functions import col, regexp_replace, sum as spark_sum, round, expr
from pyspark.sql.functions import year, month, dayofmonth

# Build a cleaned weather frame from the raw `weather_df` without reassigning it.
# Re-running this cell therefore does not convert temperatures a second time,
# and does not look for precipitation1..3 columns that were already dropped.
precipitation_parts = ['precipitation1', 'precipitation2', 'precipitation3']

weather_clean = weather_df
for col_name in precipitation_parts:
    # 'T' is replaced with '0' before the numeric cast
    # (presumably a "trace amount" marker; confirm against the data source).
    weather_clean = weather_clean.withColumn(
        col_name, regexp_replace(col(col_name), 'T', '0').cast("float")
    )

weather_clean = (
    weather_clean
    # NOTE(review): if any part is NULL (blank or non-numeric text), the sum is NULL.
    .withColumn(
        "precipitation",
        col("precipitation1") + col("precipitation2") + col("precipitation3"),
    )
    # Convert Fahrenheit to Celsius, rounded to one decimal. The CSV columns are
    # strings, so they are cast explicitly instead of relying on implicit coercion.
    # The results go to max_C / min_C, the column names the preview cell selects.
    .withColumn("max_C", round((col("max").cast("double") - 32) * 5 / 9, 1))
    .withColumn("min_C", round((col("min").cast("double") - 32) * 5 / 9, 1))
    # Cast the join keys to int, the type produced by year()/month()/dayofmonth()
    # on the trip side.
    .withColumn("year", col("year").cast("int"))
    .withColumn("month", col("month").cast("int"))
    .withColumn("day", col("day").cast("int"))
    .drop("max", "min", *precipitation_parts)
)

# NOTE(review): this assumes the weather files have year/month/day columns with one
# row per date. Several rows for the same date would duplicate trips in this join.
df_final = df_zone.join(
    weather_clean,
    on=["year", "month", "day"],
    how="left"
)

In [None]:
# df_zone.show(5)

In [None]:
# Columns of interest in the joined trip + weather frame. The cell shows the
# resulting DataFrame expression; by default nothing is computed until an action
# such as .show() is called.
preview_columns = ["year", "month", "hour", "season", "precipitation", "max_C", "min_C"]
df_final.select(*preview_columns)