In [15]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .appName("NewYorkTaxiData") \
        .config("spark.driver.memory", "4g") \
        .config("spark.executor.memory", "4g") \
        .getOrCreate()

In [16]:
green_file_names = [
    "green_tripdata_2023-01.parquet",
    "green_tripdata_2023-02.parquet",
    "green_tripdata_2023-03.parquet",
    "green_tripdata_2023-04.parquet",
    "green_tripdata_2023-05.parquet",
    "green_tripdata_2023-06.parquet"
]

df_green = spark.read.parquet(green_file_names[0])
for file_name in green_file_names[1:]:
    df_green = df_green.union(spark.read.parquet(file_name))
    
yellow_file_names = [
    "yellow_tripdata_2023-01.parquet",
    "yellow_tripdata_2023-02.parquet",
    "yellow_tripdata_2023-03.parquet",
    "yellow_tripdata_2023-04.parquet",
    "yellow_tripdata_2023-05.parquet",
    "yellow_tripdata_2023-06.parquet"
]

df_yellow = spark.read.parquet(yellow_file_names[0])
for file_name in yellow_file_names[1:]:
    df_yellow = df_yellow.union(spark.read.parquet(file_name))

    
df_lookUp = spark.read.option("header", "true").csv("taxi_zone_lookup.csv")


df_green = df_green.withColumnRenamed("lpep_pickup_datetime", "tpep_pickup_datetime")
df_green = df_green.withColumnRenamed("lpep_dropoff_datetime", "tpep_dropoff_datetime")
selected_columns = ["tpep_pickup_datetime","tpep_dropoff_datetime","passenger_count","trip_distance","PULocationID","DOLocationID","total_amount"]
df_yellow = df_yellow.select(selected_columns)
df_green = df_green.select(selected_columns)
df = df_yellow.union(df_green)
df.show(10)
    



+--------------------+---------------------+---------------+-------------+------------+------------+------------+
|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|PULocationID|DOLocationID|total_amount|
+--------------------+---------------------+---------------+-------------+------------+------------+------------+
| 2023-01-01 00:32:10|  2023-01-01 00:40:36|            1.0|         0.97|         161|         141|        14.3|
| 2023-01-01 00:55:08|  2023-01-01 01:01:27|            1.0|          1.1|          43|         237|        16.9|
| 2023-01-01 00:25:04|  2023-01-01 00:37:49|            1.0|         2.51|          48|         238|        34.9|
| 2023-01-01 00:03:48|  2023-01-01 00:13:25|            0.0|          1.9|         138|           7|       20.85|
| 2023-01-01 00:10:29|  2023-01-01 00:21:19|            1.0|         1.43|         107|          79|       19.68|
| 2023-01-01 00:50:34|  2023-01-01 01:02:52|            1.0|         1.84|         161| 

In [17]:
df_lookUp.show(10)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 10 rows



In [18]:
count = df.count()
print("Number of rows in the DataFrame:", count)


Number of rows in the DataFrame: 19898800


In [19]:
df = df.join(df_lookUp.select("LocationID", "Borough").withColumnRenamed("LocationID", "PULocationID"), on="PULocationID", how="left")
df = df.withColumnRenamed("Borough", "PUBorough")
df = df.join(df_lookUp.select("LocationID", "Zone").withColumnRenamed("LocationID", "PULocationID"), on="PULocationID", how="left")
df = df.withColumnRenamed("Zone", "PUZone")
df = df.join(df_lookUp.select("LocationID", "service_zone").withColumnRenamed("LocationID", "PULocationID"), on="PULocationID", how="left")
df = df.withColumnRenamed("service_zone", "PUservice_zone")
df = df.join(df_lookUp.select("LocationID", "Borough").withColumnRenamed("LocationID", "DOLocationID"), on="DOLocationID", how="left")
df = df.withColumnRenamed("Borough", "DOBorough")
df = df.join(df_lookUp.select("LocationID", "Zone").withColumnRenamed("LocationID", "DOLocationID"), on="DOLocationID", how="left")
df = df.withColumnRenamed("Zone", "DOZone")
df = df.join(df_lookUp.select("LocationID", "service_zone").withColumnRenamed("LocationID", "DOLocationID"), on="DOLocationID", how="left")
df = df.withColumnRenamed("service_zone", "DOservice_zone")

df.show(10)

+------------+------------+--------------------+---------------------+---------------+-------------+------------+---------+--------------------+--------------+---------+--------------------+--------------+
|DOLocationID|PULocationID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|total_amount|PUBorough|              PUZone|PUservice_zone|DOBorough|              DOZone|DOservice_zone|
+------------+------------+--------------------+---------------------+---------------+-------------+------------+---------+--------------------+--------------+---------+--------------------+--------------+
|         141|         161| 2023-01-01 00:32:10|  2023-01-01 00:40:36|            1.0|         0.97|        14.3|Manhattan|      Midtown Center|   Yellow Zone|Manhattan|     Lenox Hill West|   Yellow Zone|
|         237|          43| 2023-01-01 00:55:08|  2023-01-01 01:01:27|            1.0|          1.1|        16.9|Manhattan|        Central Park|   Yellow Zone|Manhattan|Upper E

In [20]:
from pyspark.sql.functions import col
df = df.filter((col("passenger_count") != 0) & (col("passenger_count") < 5))
df = df.filter((col("total_amount") > 0))
df = df.withColumn('passenger_count', col('passenger_count').cast('string'))

In [21]:
count = df.count()
print("Number of rows in the DataFrame:", count)


Number of rows in the DataFrame: 18395784


In [22]:
import matplotlib.pyplot as plt
import seaborn as sns

pandas_df = df.select("passenger_count").toPandas()
plt.bar(pandas_df['passenger_count'].value_counts().index, pandas_df['passenger_count'].value_counts())
plt.xlabel('passenger_count')
plt.ylabel('Count')
plt.title('Bar Chart of passenger_count')
plt.show()

Py4JJavaError: An error occurred while calling o487.collectToPython.
: java.lang.OutOfMemoryError: Java heap space


In [None]:
data = pd.read_parquet("green_tripdata_2023-01.parquet")

In [14]:
spark.stop()