In [1]:
import os
from pathlib import Path

import pandas as pd
import matplotlib.pyplot as plt
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, hour, avg, count, unix_timestamp

# Spark 세션 생성

In [4]:
# Connect to the Spark cluster. The master URL can be overridden with the
# SPARK_MASTER_URL environment variable (e.g. "local[*]" to run without the
# cluster); it defaults to the original cluster address.
# NOTE(review): the JAVA_GATEWAY_EXITED error recorded for this cell means the
# driver-side JVM never started. This is commonly a missing Java install or a
# wrong JAVA_HOME in the kernel's environment; check that before re-running.
SPARK_MASTER_URL = os.environ.get("SPARK_MASTER_URL", "spark://spark-master:7077")

spark = (
    SparkSession.builder
    .appName("NYC Taxi Analysis")
    .master(SPARK_MASTER_URL)
    # Bind the driver on all interfaces (presumably so executors in other
    # containers can reach it; confirm against the deployment).
    .config("spark.driver.bindAddress", "0.0.0.0")
    .getOrCreate()
)

PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.

# Parquet 파일 로드

In [3]:
# Trip records to analyse; the file name suggests yellow-taxi trips for 2024-01.
file_path = "data/yellow_tripdata_2024-01.parquet"
df = spark.read.format("parquet").load(file_path)

NameError: name 'spark' is not defined

# 데이터프레임 스키마 확인

In [None]:
# Print the column names and Spark types read from the Parquet file as a tree.
df.printSchema()

# 결측값(null)이 있는 행 제거

In [6]:
# Drop only rows that are missing a field this analysis actually reads.
# A bare df.dropna() removes a row when ANY column is null, so trips that
# merely have nulls in columns this notebook never uses would silently
# disappear from the averages and the hourly counts.
# NOTE(review): compare row counts before/after to see how many trips the old
# all-column dropna() was discarding.
df = df.dropna(subset=["tpep_pickup_datetime", "tpep_dropoff_datetime", "trip_distance"])

NameError: name 'df' is not defined

# 전처리: 타임스탬프 컬럼 추가 및 비정상 운행 제거 (거리 ≤ 0 또는 하차 시각 ≤ 승차 시각)

In [None]:
# Add explicit timestamp copies of the pick-up/drop-off columns, then keep only
# trips with a positive distance whose drop-off comes after the pick-up.
# The validity filter compares the original tpep_* columns.
df = (
    df
    .withColumn("pickup_datetime", to_timestamp(col("tpep_pickup_datetime")))
    .withColumn("dropoff_datetime", to_timestamp(col("tpep_dropoff_datetime")))
    .where(
        (col("trip_distance") > 0)
        & (col("tpep_dropoff_datetime") > col("tpep_pickup_datetime"))
    )
)

# 운행 시간 (분 단위)

In [None]:
# Trip duration in minutes: unix_timestamp yields epoch seconds, so the
# drop-off minus pick-up difference is divided by 60.
pickup_seconds = unix_timestamp(col("tpep_pickup_datetime"))
dropoff_seconds = unix_timestamp(col("tpep_dropoff_datetime"))
df = df.withColumn("trip_duration", (dropoff_seconds - pickup_seconds) / 60)

# 평균 운행 시간(분)과 평균 운행 거리

In [7]:
# Compute both means in a single aggregation so Spark runs one job and scans
# the data once instead of twice. A global agg always yields exactly one Row;
# Row is a tuple, so it unpacks in the order the aggregates are listed.
# Each mean is None if no rows remain.
avg_trip_duration, avg_trip_distance = df.agg(
    avg("trip_duration"),
    avg("trip_distance"),
).first()

NameError: name 'df' is not defined

In [None]:
# Display the mean trip duration in minutes (trip_duration is seconds / 60); None if no rows remained.
avg_trip_duration

In [None]:
# Display the mean trip_distance over the filtered trips; None if no rows remained.
avg_trip_distance

# 승차 시각에서 시(hour, 0–23) 추출

In [None]:
# Hour of day (0-23) of each trip's pick-up, from the pickup_datetime column.
# NOTE(review): for a zone-aware TIMESTAMP column, hour() is evaluated in the
# Spark session time zone; confirm spark.sql.session.timeZone matches the data.
pickup_hour = hour("pickup_datetime")
df = df.withColumn("hour", pickup_hour)

# 시간대별 운행 건수 계산

In [None]:
# One row per pick-up hour with its number of trips, sorted by hour ascending.
hourly_trips = (
    df.groupBy("hour")
    .count()
    .withColumnRenamed("count", "trip_count")
    .orderBy("hour")
)

# 시각화

In [None]:
# Collect the small hourly aggregate to the driver as a pandas frame for plotting.
hourly_trips_pd = hourly_trips.toPandas()

# Bar chart of trip counts per pick-up hour, one tick per hour 0-23.
fig, ax = plt.subplots(figsize=(12, 6))
ax.bar(hourly_trips_pd["hour"], hourly_trips_pd["trip_count"])
ax.set_xlabel("Hour of the Day")
ax.set_ylabel("Number of Trips")
ax.set_title("Number of Trips per Hour of the Day")
ax.set_xticks(range(0, 24))
ax.grid(True)
plt.show()

# 시간대별 운행 건수를 CSV로 저장

In [None]:
# Persist the hourly counts as CSV without the pandas index.
# DataFrame.to_csv does not create missing parent directories and raises if
# "results/" is absent, so create it first (no-op if it already exists).
output_path = "results/hourly_trips.csv"
Path(output_path).parent.mkdir(parents=True, exist_ok=True)
hourly_trips_pd.to_csv(output_path, index=False)