In [1]:
#export
from pyspark.sql.functions import *
from pyspark.sql import *

Calculation started (calculation_id=decb17b4-8073-5433-aa1b-262bbfb0b28b) in (session=1acb17b4-7243-8e54-447a-958e0328119a). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.


In [9]:
input_path  = "s3://cse6242-nyc-trip/yellow_tripdata_202*.parquet"

spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

test_df = spark.read.parquet(input_path)

df = df.withColumn("PULocationID", col("PULocationID").cast("int")) \
                 .withColumn("DOLocationID", col("DOLocationID").cast("int"))

columns = df.columns
print(columns)

Calculation started (calculation_id=40cb16b8-ac4f-2fb4-0ece-5d7e281df11d) in (session=1ecb16b1-e1a9-baa7-f760-e157ed47fab0). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.
['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge', 'airport_fee']



In [7]:
from pyspark.sql.functions import col, year, month, dayofweek, count, sum, avg

# Set Spark configurations.
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
spark.conf.set("spark.sql.parquet.mergeSchema", "false")

# Read the Parquet files from S3.
input_path = "s3://cse6242-nyc-trip/yellow_tripdata_2024-*.parquet"
df = spark.read.parquet(input_path)

# Ensure consistent types by casting location IDs.
df = df.withColumn("PULocationID", col("PULocationID").cast("int")) \
       .withColumn("DOLocationID", col("DOLocationID").cast("int"))

# Optionally cast fare and tip amounts if needed.
# df = df.withColumn("fare_amount", col("fare_amount").cast("double")) \
#        .withColumn("tip_amount", col("tip_amount").cast("double"))

# 1. Aggregate trips by year and pickup location.
pickup_year_df = df.withColumn("year", year(col("tpep_pickup_datetime"))) \
    .groupBy("year", "PULocationID") \
    .agg( count("*").alias("trip_count"),
          sum("fare_amount").alias("total_fare"),
          sum("tip_amount").alias("total_tip"),
          avg("fare_amount").alias("avg_fare"),
          avg("tip_amount").alias("avg_tip") )

# 2. Aggregate trips by year and dropoff location.
dropoff_year_df = df.withColumn("year", year(col("tpep_dropoff_datetime"))) \
    .groupBy("year", "DOLocationID") \
    .agg( count("*").alias("trip_count"),
          sum("fare_amount").alias("total_fare"),
          sum("tip_amount").alias("total_tip"),
          avg("fare_amount").alias("avg_fare"),
          avg("tip_amount").alias("avg_tip") )

# 3. Aggregate trips by year, month, and pickup location.
pickup_year_month_df = df.withColumn("year", year(col("tpep_pickup_datetime"))) \
    .withColumn("month", month(col("tpep_pickup_datetime"))) \
    .groupBy("year", "month", "PULocationID") \
    .agg( count("*").alias("trip_count"),
          sum("fare_amount").alias("total_fare"),
          sum("tip_amount").alias("total_tip"),
          avg("fare_amount").alias("avg_fare"),
          avg("tip_amount").alias("avg_tip") )

# 4. Aggregate trips by year, month, and dropoff location.
dropoff_year_month_df = df.withColumn("year", year(col("tpep_dropoff_datetime"))) \
    .withColumn("month", month(col("tpep_dropoff_datetime"))) \
    .groupBy("year", "month", "DOLocationID") \
    .agg( count("*").alias("trip_count"),
          sum("fare_amount").alias("total_fare"),
          sum("tip_amount").alias("total_tip"),
          avg("fare_amount").alias("avg_fare"),
          avg("tip_amount").alias("avg_tip") )

# 5. Aggregate trips by year, month, day-of-week, and pickup location.
pickup_year_month_day_df = df.withColumn("year", year(col("tpep_pickup_datetime"))) \
    .withColumn("month", month(col("tpep_pickup_datetime"))) \
    .withColumn("dayOfWeek", dayofweek(col("tpep_pickup_datetime"))) \
    .groupBy("year", "month", "dayOfWeek", "PULocationID") \
    .agg( count("*").alias("trip_count"),
          sum("fare_amount").alias("total_fare"),
          sum("tip_amount").alias("total_tip"),
          avg("fare_amount").alias("avg_fare"),
          avg("tip_amount").alias("avg_tip") )

# 6. Aggregate trips by year, month, day-of-week, and dropoff location.
dropoff_year_month_day_df = df.withColumn("year", year(col("tpep_dropoff_datetime"))) \
    .withColumn("month", month(col("tpep_dropoff_datetime"))) \
    .withColumn("dayOfWeek", dayofweek(col("tpep_dropoff_datetime"))) \
    .groupBy("year", "month", "dayOfWeek", "DOLocationID") \
    .agg( count("*").alias("trip_count"),
          sum("fare_amount").alias("total_fare"),
          sum("tip_amount").alias("total_tip"),
          avg("fare_amount").alias("avg_fare"),
          avg("tip_amount").alias("avg_tip") )

# Define the output S3 paths (update these paths as required)
pickup_year_output_path            = "s3://cse6242-nyc-trip/pickup_year_2024.csv"
dropoff_year_output_path           = "s3://cse6242-nyc-trip/dropoff_year_2024.csv"
pickup_year_month_output_path      = "s3://cse6242-nyc-trip/pickup_year_month_2024.csv"
dropoff_year_month_output_path     = "s3://cse6242-nyc-trip/dropoff_year_month_2024.csv"
pickup_year_month_day_output_path  = "s3://cse6242-nyc-trip/pickup_year_month_day_2024.csv"
dropoff_year_month_day_output_path = "s3://cse6242-nyc-trip/dropoff_year_month_day_2024.csv"

# Write the results as CSV files with headers. (No overwrite mode is specified.)
pickup_year_df.write.mode("overwrite").option("header", "true").csv(pickup_year_output_path)
dropoff_year_df.write.mode("overwrite").option("header", "true").csv(dropoff_year_output_path)
pickup_year_month_df.write.mode("overwrite").option("header", "true").csv(pickup_year_month_output_path)
dropoff_year_month_df.write.mode("overwrite").option("header", "true").csv(dropoff_year_month_output_path)
pickup_year_month_day_df.write.mode("overwrite").option("header", "true").csv(pickup_year_month_day_output_path)
dropoff_year_month_day_df.write.mode("overwrite").option("header", "true").csv(dropoff_year_month_day_output_path)


Calculation started (calculation_id=aecb17c3-10f3-7541-3ef3-999f821554f5) in (session=1acb17b4-7243-8e54-447a-958e0328119a). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.


In [8]:
from pyspark.sql.functions import col, year, month, dayofweek, count, sum, avg

# Set Spark configurations.
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")
spark.conf.set("spark.sql.parquet.mergeSchema", "false")

# Read the Parquet files from S3.
input_path = "s3://cse6242-nyc-trip/yellow_tripdata_2023-*.parquet"
df = spark.read.parquet(input_path)

# Ensure consistent types by casting location IDs.
df = df.withColumn("PULocationID", col("PULocationID").cast("int")) \
       .withColumn("DOLocationID", col("DOLocationID").cast("int"))

# Optionally cast fare and tip amounts if needed.
# df = df.withColumn("fare_amount", col("fare_amount").cast("double")) \
#        .withColumn("tip_amount", col("tip_amount").cast("double"))

# 1. Aggregate trips by year and pickup location.
pickup_year_df = df.withColumn("year", year(col("tpep_pickup_datetime"))) \
    .groupBy("year", "PULocationID") \
    .agg( count("*").alias("trip_count"),
          sum("fare_amount").alias("total_fare"),
          sum("tip_amount").alias("total_tip"),
          avg("fare_amount").alias("avg_fare"),
          avg("tip_amount").alias("avg_tip") )

# 2. Aggregate trips by year and dropoff location.
dropoff_year_df = df.withColumn("year", year(col("tpep_dropoff_datetime"))) \
    .groupBy("year", "DOLocationID") \
    .agg( count("*").alias("trip_count"),
          sum("fare_amount").alias("total_fare"),
          sum("tip_amount").alias("total_tip"),
          avg("fare_amount").alias("avg_fare"),
          avg("tip_amount").alias("avg_tip") )

# 3. Aggregate trips by year, month, and pickup location.
pickup_year_month_df = df.withColumn("year", year(col("tpep_pickup_datetime"))) \
    .withColumn("month", month(col("tpep_pickup_datetime"))) \
    .groupBy("year", "month", "PULocationID") \
    .agg( count("*").alias("trip_count"),
          sum("fare_amount").alias("total_fare"),
          sum("tip_amount").alias("total_tip"),
          avg("fare_amount").alias("avg_fare"),
          avg("tip_amount").alias("avg_tip") )

# 4. Aggregate trips by year, month, and dropoff location.
dropoff_year_month_df = df.withColumn("year", year(col("tpep_dropoff_datetime"))) \
    .withColumn("month", month(col("tpep_dropoff_datetime"))) \
    .groupBy("year", "month", "DOLocationID") \
    .agg( count("*").alias("trip_count"),
          sum("fare_amount").alias("total_fare"),
          sum("tip_amount").alias("total_tip"),
          avg("fare_amount").alias("avg_fare"),
          avg("tip_amount").alias("avg_tip") )

# 5. Aggregate trips by year, month, day-of-week, and pickup location.
pickup_year_month_day_df = df.withColumn("year", year(col("tpep_pickup_datetime"))) \
    .withColumn("month", month(col("tpep_pickup_datetime"))) \
    .withColumn("dayOfWeek", dayofweek(col("tpep_pickup_datetime"))) \
    .groupBy("year", "month", "dayOfWeek", "PULocationID") \
    .agg( count("*").alias("trip_count"),
          sum("fare_amount").alias("total_fare"),
          sum("tip_amount").alias("total_tip"),
          avg("fare_amount").alias("avg_fare"),
          avg("tip_amount").alias("avg_tip") )

# 6. Aggregate trips by year, month, day-of-week, and dropoff location.
dropoff_year_month_day_df = df.withColumn("year", year(col("tpep_dropoff_datetime"))) \
    .withColumn("month", month(col("tpep_dropoff_datetime"))) \
    .withColumn("dayOfWeek", dayofweek(col("tpep_dropoff_datetime"))) \
    .groupBy("year", "month", "dayOfWeek", "DOLocationID") \
    .agg( count("*").alias("trip_count"),
          sum("fare_amount").alias("total_fare"),
          sum("tip_amount").alias("total_tip"),
          avg("fare_amount").alias("avg_fare"),
          avg("tip_amount").alias("avg_tip") )

# Define the output S3 paths (update these paths as required)
pickup_year_output_path            = "s3://cse6242-nyc-trip/pickup_year_2023.csv"
dropoff_year_output_path           = "s3://cse6242-nyc-trip/dropoff_year_2023.csv"
pickup_year_month_output_path      = "s3://cse6242-nyc-trip/pickup_year_month_2023.csv"
dropoff_year_month_output_path     = "s3://cse6242-nyc-trip/dropoff_year_month_2023.csv"
pickup_year_month_day_output_path  = "s3://cse6242-nyc-trip/pickup_year_month_day_2023.csv"
dropoff_year_month_day_output_path = "s3://cse6242-nyc-trip/dropoff_year_month_day_2023.csv"

# Write the results as CSV files with headers. (No overwrite mode is specified.)
pickup_year_df.write.mode("overwrite").option("header", "true").csv(pickup_year_output_path)
dropoff_year_df.write.mode("overwrite").option("header", "true").csv(dropoff_year_output_path)
pickup_year_month_df.write.mode("overwrite").option("header", "true").csv(pickup_year_month_output_path)
dropoff_year_month_df.write.mode("overwrite").option("header", "true").csv(dropoff_year_month_output_path)
pickup_year_month_day_df.write.mode("overwrite").option("header", "true").csv(pickup_year_month_day_output_path)
dropoff_year_month_day_df.write.mode("overwrite").option("header", "true").csv(dropoff_year_month_day_output_path)


Calculation started (calculation_id=becb17c3-7709-248f-f4c5-c3c843e2ade1) in (session=1acb17b4-7243-8e54-447a-958e0328119a). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.


In [5]:
filtered_df = df.select("PULocationID", "DOLocationID", "fare_amount" )

filtered_df.show()

Calculation started (calculation_id=88cb1653-08d1-a50e-52c2-8ae533cacdbb) in (session=f2cb1650-a0d1-8460-fab5-ce0c232c7978). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.
+------------+------------+-----------+
|PULocationID|DOLocationID|fare_amount|
+------------+------------+-----------+
|         161|         141|        9.3|
|          43|         237|        7.9|
|          48|         238|       14.9|
|         138|           7|       12.1|
|         107|          79|       11.4|
|         161|         137|       12.8|
|         239|         143|       12.1|
|         142|         200|       45.7|
|         164|         236|       17.7|
|         141|         107|       14.9|
|         234|          68|       11.4|
|          79|         264|       33.8|
|         164|         143|       26.1|
|         138|          33|       44.3|
|          33|          61|       17.7|
|          79|         186|       10.0|
|          90|          48|       19.8|
|         113|         255|       20.5|
|         237|         239|        8.6|
|         143|         229|       15.6|
+------------+------------+-----------+
only showing top 

In [14]:
count = df.count()
print(count)

Calculation started (calculation_id=8ecb165a-4980-6ee7-8ba4-7253f569b4d3) in (session=f2cb1650-a0d1-8460-fab5-ce0c232c7978). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.
79479946



In [5]:
coordinate_lookup = spark.read.csv("s3://cse6242-nyc-trip/taxi_zone_lookup.csv")
coordinate_lookup.show()

Calculation started (calculation_id=c6caf1cd-d9d7-0fcc-eb4a-ee60f8f5c687) in (session=50caf1c8-00d9-77a0-ed03-8970b33022d7). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.
+----------+-------------+--------------------+------------+
|       _c0|          _c1|                 _c2|         _c3|
+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|

In [8]:
# Calculate tip percentage safely by checking fare_amount != 0 to avoid division by zero.
df_with_tip_pct = df.withColumn(
    "tip_pct", 
    when(col("fare_amount") != 0, col("tip_amount") / col("fare_amount") * 100).otherwise(0)
)

# Group by pickup and dropoff location IDs and compute average tip percentage
avg_tip_df = df_with_tip_pct.groupBy("PULocationID", "DOLocationID") \
    .agg(avg("tip_pct").alias("avg_tip_pct"))

# Sort by average tip percentage in descending order
sorted_tip_df = avg_tip_df.orderBy(desc("avg_tip_pct"))

# Define output path (modify accordingly)
output_path = "s3://cse6242-nyc-trip/nyc_avg_tip_pct.csv"

# Write to CSV (ensure it's a single file)
sorted_tip_df.coalesce(1).write.csv(output_path, header=True, mode="overwrite")

Calculation started (calculation_id=16caf1d4-006d-e853-1f45-60a062ec6ca3) in (session=50caf1c8-00d9-77a0-ed03-8970b33022d7). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.


In [8]:
# Merge to assign neighborhood names to the pickup locations
avg_tip_zones = avg_tip_pd.merge(coordinate_lookup, left_on="PULocationID", right_on="LocationID", how="left")

# Further aggregate by neighborhood (Zone) to get the average tip percentage per neighborhood
avg_tip_by_zone = avg_tip_zones.groupby("Zone", as_index=False)["avg_tip_percentage"].mean()


Calculation started (calculation_id=9acaf1b1-4942-d448-f402-b2b04d7f7972) in (session=44caf1ab-b63d-2a66-7f18-6a0956d04f39). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation 9acaf1b1-4942-d448-f402-b2b04d7f7972 failed


  File "<stdin>", line 1, in <module>
  File "/opt/amazon/python3.9/lib/python3.9/site-packages/pandas/core/frame.py", line 9351, in merge
    return merge(
  File "/opt/amazon/python3.9/lib/python3.9/site-packages/pandas/core/reshape/merge.py", line 107, in merge
    op = _MergeOperation(
  File "/opt/amazon/python3.9/lib/python3.9/site-packages/pandas/core/reshape/merge.py", line 629, in __init__
    _right = _validate_operand(right)
  File "/opt/amazon/python3.9/lib/python3.9/site-packages/pandas/core/reshape/merge.py", line 2285, in _validate_operand
    raise TypeError(
TypeError: Can only merge Series or DataFrame objects, a <class 'pyspark.sql.dataframe.DataFrame'> was passed



In [10]:
# Convert the Spark DataFrame to Pandas.
# Warning: Ensure that the data size is manageable in memory or sample a subset.
pdf = df.toPandas()

# -------------------------------
# Step 2: Prepare Time-related Columns
# -------------------------------
# Ensure the pickup datetime column is in datetime format.
pdf['tpep_pickup_datetime'] = pd.to_datetime(pdf['tpep_pickup_datetime'])

# Create additional time-related columns.
pdf['date'] = pdf['tpep_pickup_datetime'].dt.date
pdf['week'] = pdf['tpep_pickup_datetime'].dt.isocalendar().week
pdf['day_of_week'] = pdf['tpep_pickup_datetime'].dt.weekday  # Monday=0, Sunday=6
pdf['hour'] = pdf['tpep_pickup_datetime'].dt.hour
pdf['month'] = pdf['tpep_pickup_datetime'].dt.month
pdf['day'] = pdf['tpep_pickup_datetime'].dt.day

# -------------------------------
# Step 3: Daily Calendar Heatmap (Week vs. Day)
# -------------------------------
# Aggregate taxi counts by ISO week and day-of-week.
daily_counts = pdf.groupby(['week', 'day_of_week']).size().reset_index(name='count')
daily_pivot = daily_counts.pivot(index='week', columns='day_of_week', values='count').fillna(0)

# Define labels for days of the week.
day_labels = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']

fig_daily = px.imshow(
    daily_pivot,
    labels=dict(x="Day of Week", y="ISO Week", color="Trip Count"),
    x=day_labels,
    title="Daily Taxi Activity (Calendar Heatmap: Week vs Day)"
)
fig_daily.update_xaxes(side="top")
fig_daily.show()

# -------------------------------
# Step 4: Weekly Heatmap (Day vs. Hour)
# -------------------------------
# Aggregate taxi counts by day-of-week and hour.
dow_hour = pdf.groupby(['day_of_week', 'hour']).size().reset_index(name='count')
dow_hour_pivot = dow_hour.pivot(index='day_of_week', columns='hour', values='count').fillna(0)

# Set the row index labels to day names.
dow_hour_pivot.index = day_labels

fig_weekly = px.imshow(
    dow_hour_pivot,
    labels=dict(x="Hour of Day", y="Day of Week", color="Trip Count"),
    title="Weekly Taxi Activity (Day vs Hour Heatmap)"
)
fig_weekly.update_xaxes(side="top")
fig_weekly.show()

# -------------------------------
# Step 5: Monthly Calendar Heatmap (Month vs. Day)
# -------------------------------
# Aggregate taxi counts by month and day-of-month.
monthly_counts = pdf.groupby(['month', 'day']).size().reset_index(name='count')
monthly_pivot = monthly_counts.pivot(index='month', columns='day', values='count').fillna(0)

# Create a list of month abbreviations.
month_names = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 
               'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
# Replace numeric month index with month names.
monthly_pivot.index = [month_names[i-1] for i in monthly_pivot.index]

fig_monthly = px.imshow(
    monthly_pivot,
    labels=dict(x="Day of Month", y="Month", color="Trip Count"),
    title="Monthly Taxi Activity (Month vs Day Heatmap)"
)
fig_monthly.update_xaxes(side="top")
fig_monthly.show()

Calculation started (calculation_id=1ecaf1b5-36ae-6a1e-45cf-4db1df778c0b) in (session=44caf1ab-b63d-2a66-7f18-6a0956d04f39). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation 1ecaf1b5-36ae-6a1e-45cf-4db1df778c0b failed


  File "<stdin>", line 1, in <module>
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/conf.py", line 36, in set
    self._jconf.setAthenaRestricted(key, value)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 117, in deco
    raise converted from None
pyspark.sql.utils.AnalysisException: Cannot modify the value of a Spark config: spark.driver.maxResultSize



In [13]:
# Install necessary libraries if not already installed
# !pip install pandas seaborn matplotlib pyarrow

import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import glob  # To read multiple parquet files

# Set plot style
sns.set_theme(style="whitegrid")

# Step 1: Load all Parquet files from the Dataset folder
parquet_files = glob.glob("s3://cse6242-nyc-trip/*.parquet")  # List all parquet files
df_list = [pd.read_parquet(file) for file in parquet_files]  # Read each parquet file into DataFrame
df = pd.concat(df_list, ignore_index=True)  # Concatenate all DataFrames

# Step 2: Load zone.csv
zone_df = pd.read_csv("s3://cse6242-nyc-trip/taxi_zones_geocoded.csv")


df['trip_duration_min'] = (df['tpep_dropoff_datetime'] - df['tpep_pickup_datetime']).dt.total_seconds() / 60

# Step 3: Merge the data if there is a common column (assuming 'zone_id' is the common key)
df = df.merge(zone_df, left_on="PULocationID", right_on="LocationID", how="left")

heatmap_data = df.pivot_table(index="PULocationID", columns="LocationID", values="trip_duration_min", aggfunc="mean")

# Plot the heatmap
plt.figure(figsize=(12, 6))
sns.heatmap(heatmap_data, cmap="coolwarm", annot=False, linewidths=0.5)

# Step 5: Show the heatmap
plt.title("Heatmap of Average Trip Duration by Pickup Location")
plt.show()

Calculation started (calculation_id=dccaf1bd-9a69-52fb-e8d6-4b4d0be046ac) in (session=44caf1ab-b63d-2a66-7f18-6a0956d04f39). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation dccaf1bd-9a69-52fb-e8d6-4b4d0be046ac failed


  File "<stdin>", line 12, in <module>
  File "/opt/amazon/python3.9/lib/python3.9/site-packages/pandas/util/_decorators.py", line 311, in wrapper
    return func(*args, **kwargs)
  File "/opt/amazon/python3.9/lib/python3.9/site-packages/pandas/core/reshape/concat.py", line 347, in concat
    op = _Concatenator(
  File "/opt/amazon/python3.9/lib/python3.9/site-packages/pandas/core/reshape/concat.py", line 404, in __init__
    raise ValueError("No objects to concatenate")
ValueError: No objects to concatenate



In [3]:
# Extract date, month, and hour for aggregation
df_with_time = df.withColumn("pickup_date", to_date(col("tpep_pickup_datetime"))) \
                 .withColumn("pickup_month", date_format(col("tpep_pickup_datetime"), "yyyy-MM")) \
                 .withColumn("pickup_hour", hour(col("tpep_pickup_datetime")))

Calculation started (calculation_id=b6caf222-244b-938b-f08e-593b9f628991) in (session=02caf221-e38c-2aa8-ab55-11c30d59e494). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.


In [4]:
daily_activity = df_with_time.groupBy("pickup_date").agg(count("*").alias("trip_count"))

Calculation started (calculation_id=b4caf222-5336-defa-469b-823ce63b3fe3) in (session=02caf221-e38c-2aa8-ab55-11c30d59e494). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.


In [5]:
monthly_activity = df_with_time.groupBy("pickup_month").agg(count("*").alias("trip_count"))

Calculation started (calculation_id=cecaf222-6f6b-b2c6-7a22-0635fb0ed3cf) in (session=02caf221-e38c-2aa8-ab55-11c30d59e494). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.


In [6]:
hourly_activity = df_with_time.groupBy("pickup_hour").agg(count("*").alias("trip_count"))


Calculation started (calculation_id=e8caf222-8484-3a26-4c8d-f777bd9facf8) in (session=02caf221-e38c-2aa8-ab55-11c30d59e494). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.


In [10]:
from pyspark.sql.functions import col, to_date, date_format, hour, count
# Define output paths
daily_output_path = "s3://cse6242-nyc-trip/nyc_daily_activity.csv"
monthly_output_path = "s3://cse6242-nyc-trip/nyc_monthly_activity.csv"
hourly_output_path = "s3://cse6242-nyc-trip/nyc_hourly_activity.csv"

# Save to CSV with headers
daily_activity.coalesce(1).write.csv(daily_output_path, header=True, mode="overwrite")
monthly_activity.coalesce(1).write.csv(monthly_output_path, header=True, mode="overwrite")
hourly_activity.coalesce(1).write.csv(hourly_output_path, header=True, mode="overwrite")


Calculation started (calculation_id=cecaf223-2c4e-d8d7-3959-9779674f0922) in (session=02caf221-e38c-2aa8-ab55-11c30d59e494). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation completed.


In [None]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# Load daily data
daily_df = pd.read_csv("nyc_daily_activity.csv", parse_dates=["pickup_date"])
daily_df["day"] = daily_df["pickup_date"].dt.day
daily_df["month"] = daily_df["pickup_date"].dt.month_name()

# Pivot for heatmap (daily)
daily_pivot = daily_df.pivot("month", "day", "trip_count")
plt.figure(figsize=(12, 6))
sns.heatmap(daily_pivot, cmap="YlGnBu", annot=False)
plt.title("Daily Taxi Activity Heatmap")
plt.show()


In [10]:
# Add temporal dimensions with null checks
from itertools import chain
# 2. Temporal analysis with null handling
# Extract date components from the correct pickup datetime column
taxi_df = taxi_df.withColumn("year", year("tpep_pickup_datetime")) \
                 .withColumn("month", month("tpep_pickup_datetime")) \
                 .withColumn("day_of_week", dayofweek("tpep_pickup_datetime"))

# Aggregate the data by year, month, day of week, pickup location, and drop-off location
aggregated_df = taxi_df.groupBy(
    "year",
    "month",
    "day_of_week",
    "PULocationID",
    "DOLocationID"
).agg(count("*").alias("num_trips"))

# Display the aggregated data
aggregated_df.show(truncate=False)

Calculation started (calculation_id=f0cb169a-914f-6c0b-086f-f91db4f417f9) in (session=40cb1698-5db5-4b54-871f-bb5c6f2edb87). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation f0cb169a-914f-6c0b-086f-f91db4f417f9 failed


  File "<stdin>", line 18, in <module>
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 423, in show
    print(self._jdf.showString(n, int_truncate, vertical))
  File "/opt/amazon/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o178.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 105) ([2600:1f18:aa1:3a19:3d2f:233:1dca:44d5] executor 11): org.apache.spark.sql.execution.QueryExecutionException: Encounter 

In [12]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Initialize Spark session
spark = SparkSession.builder.getOrCreate()

# Disable vectorized reader to sidestep aggressive schema conversion issues
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

# Read the dataset (let Spark infer the schema)
taxi_df = spark.read.parquet("s3://cse6242-nyc-trip/yellow_tripdata_202*.parquet")

# Now cast the location ID columns to long if needed (or keep as int if that's acceptable)
taxi_df = taxi_df.withColumn("PULocationID", F.col("PULocationID").cast("long")) \
                 .withColumn("DOLocationID", F.col("DOLocationID").cast("long"))

# Extract date components from the pickup datetime column
taxi_df = taxi_df.withColumn("year", F.year("tpep_pickup_datetime")) \
                 .withColumn("month", F.month("tpep_pickup_datetime")) \
                 .withColumn("day_of_week", F.dayofweek("tpep_pickup_datetime"))

# Aggregate the data
aggregated_df = taxi_df.groupBy(
    "year",
    "month",
    "day_of_week",
    "PULocationID",
    "DOLocationID"
).agg(F.count("*").alias("num_trips"))

aggregated_df.show(truncate=False)


Calculation started (calculation_id=5ecb169b-6aff-4fe9-ab18-7945a8d365e3) in (session=40cb1698-5db5-4b54-871f-bb5c6f2edb87). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation 5ecb169b-6aff-4fe9-ab18-7945a8d365e3 failed


  File "<stdin>", line 5, in <module>
AttributeError: 'function' object has no attribute 'getOrCreate'



In [4]:
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, TimestampType

# Create a SparkContext.
sc = SparkContext.getOrCreate()

# Create an SQLContext using the SparkContext.
sqlContext = SQLContext(sc)

# Define an explicit schema that matches the parquet file schema.
schema = StructType([
    StructField("VendorID", IntegerType(), True),
    StructField("tpep_pickup_datetime", TimestampType(), True),
    StructField("tpep_dropoff_datetime", TimestampType(), True),
    StructField("passenger_count", IntegerType(), True),
    StructField("trip_distance", DoubleType(), True),
    StructField("RatecodeID", IntegerType(), True),
    StructField("store_and_fwd_flag", StringType(), True),
    StructField("PULocationID", IntegerType(), True),  # matches INT32 in file
    StructField("DOLocationID", IntegerType(), True),   # matches INT32 in file
    StructField("payment_type", IntegerType(), True),
    StructField("fare_amount", DoubleType(), True),
    StructField("extra", DoubleType(), True),
    StructField("mta_tax", DoubleType(), True),
    StructField("tip_amount", DoubleType(), True),
    StructField("tolls_amount", DoubleType(), True),
    StructField("improvement_surcharge", DoubleType(), True),
    StructField("total_amount", DoubleType(), True),
    StructField("congestion_surcharge", DoubleType(), True),
    StructField("airport_fee", DoubleType(), True)
])

# Read the dataset with the explicit schema (adjust the S3 path as needed)
taxi_df = sqlContext.read.schema(schema).parquet("s3://cse6242-nyc-trip/yellow_tripdata_2024-10.parquet")

# Extract date components from the pickup datetime column.
taxi_df = taxi_df.withColumn("year", F.year("tpep_pickup_datetime")) \
                 .withColumn("month", F.month("tpep_pickup_datetime")) \
                 .withColumn("day_of_week", F.dayofweek("tpep_pickup_datetime"))

# Aggregate by year, month, day of week, pickup location, and drop-off location.
aggregated_df = taxi_df.groupBy(
    "year",
    "month",
    "day_of_week",
    "PULocationID"
    "DOLocationID"
).agg(F.count("*").alias("num_trips"))

aggregated_df.show(truncate=False)


Calculation started (calculation_id=06cb16b3-91ba-1643-8123-49cd5b4092a7) in (session=1ecb16b1-e1a9-baa7-f760-e157ed47fab0). Checking calculation status...


Progress:   0%|          |elapsed time = 00:00s

Calculation 06cb16b3-91ba-1643-8123-49cd5b4092a7 failed


  File "<stdin>", line 2, in <module>
ModuleNotFoundError: No module named 'pyspark.sql.context'

