In [14]:
from prettytable import PrettyTable

In [1]:
from pyspark.sql import SparkSession

In [9]:
from pyspark.sql.types import IntegerType, StringType

In [2]:
spark = SparkSession.builder.appName("nyc_taxi_data_processing").getOrCreate()

In [3]:
df = spark.read.parquet("yellow_tripdata_2024-01.parquet")

In [4]:
df.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'Airport_fee']

In [5]:
df.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2024-01-01 00:57:55|  2024-01-01 01:17:43|              1|         1.72|         1|                 N|         186|          79|           2|       17.7|  1.0|    0.5|       0.

In [6]:
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [7]:
#  Handling null values
df = df.na.drop(how = "all")

In [8]:
df.show(5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2024-01-01 00:57:55|  2024-01-01 01:17:43|              1|         1.72|         1|                 N|         186|          79|           2|       17.7|  1.0|    0.5|       0.

In [11]:
# To iterate through multiple columns in a DataFrame and convert columns of StringType to IntegerType
"""for column in df.columns:
    if isinstance(df.schema[column].DataType, StringType):

        df = df.withColumn(column, col(column).cast(IntegerType()))
df.show()"""

'for column in df.columns:\n    if isinstance(df.schema[column].DataType, StringType):\n\n        df = df.withColumn(column, col(column).cast(IntegerType()))\ndf.show()'

In [12]:
df = df.dropDuplicates()

In [13]:
df.show(5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2024-01-01 00:41:06|  2024-01-01 00:53:42|              1|          1.5|         1|                 N|         164|          79|           1|       12.8|  3.5|    0.5|      4.4

In [15]:
table = PrettyTable()

In [16]:
table.field_names = df.columns

In [17]:
for row in df.take(10):
    table.add_row(row)

In [18]:
table

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
1,2024-01-01 00:41:06,2024-01-01 00:53:42,1,1.5,1,N,164,79,1,12.8,3.5,0.5,4.45,0.0,1.0,22.25,2.5,0.0
2,2024-01-01 00:53:36,2024-01-01 01:19:34,1,5.68,1,N,239,7,1,28.9,1.0,0.5,8.47,0.0,1.0,42.37,2.5,0.0
2,2024-01-01 00:56:19,2024-01-01 00:59:25,1,1.25,1,N,236,74,1,7.2,1.0,0.5,2.44,0.0,1.0,14.64,2.5,0.0
2,2024-01-01 00:09:26,2024-01-01 00:19:31,1,2.79,1,N,142,166,1,14.2,1.0,0.5,3.84,0.0,1.0,23.04,2.5,0.0
2,2024-01-01 00:43:30,2024-01-01 01:18:53,2,6.1,1,N,90,40,1,36.6,1.0,0.5,12.13,6.94,1.0,60.67,2.5,0.0
1,2024-01-01 00:56:41,2024-01-01 01:33:30,4,8.6,1,N,90,228,1,43.6,3.5,0.5,11.1,6.94,1.0,66.64,2.5,0.0
2,2024-01-01 00:51:20,2024-01-01 00:56:11,1,0.92,1,N,238,151,1,7.2,1.0,0.5,2.0,0.0,1.0,11.7,0.0,0.0
2,2024-01-01 00:11:04,2024-01-01 00:19:24,2,1.59,1,N,237,263,2,10.7,1.0,0.5,0.0,0.0,1.0,15.7,2.5,0.0
1,2024-01-01 00:46:04,2024-01-01 01:14:06,2,3.2,1,N,68,264,1,25.4,3.5,0.5,6.05,0.0,1.0,36.45,2.5,0.0
2,2024-01-01 00:28:35,2024-01-01 00:34:49,3,1.43,1,N,141,263,1,8.6,1.0,0.5,2.72,0.0,1.0,16.32,2.5,0.0


In [19]:
df.count()

2964624

In [20]:
df.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'Airport_fee']