In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Bronze layer: raw NYC taxi trip records stored as Parquet.
file_path = '/FileStore/bronze/nyc_taxi_data/'

# Parquet carries its own schema, so CSV/JSON reader options such as `header`
# and `mode` ('FAILFAST') are ignored by this source and are not passed.
# Unreadable Parquet files still fail the read unless
# spark.sql.files.ignoreCorruptFiles is enabled.
df = spark.read.parquet(file_path)

In [0]:
# Show the schema Spark read from the Parquet files (printed below).
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [0]:
# Per-column null tally: when() yields a non-null literal only for null cells,
# and count() ignores nulls, so each aggregate counts the nulls in its column.
null_tally = [
    count(when(col(name).isNull(), name)).alias(name)
    for name in df.columns
]
df_null_counts = df.select(null_tally)
display(df_null_counts)

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
0,0,0,278989,0,278989,278989,0,0,0,0,0,0,0,0,0,0,278989,278989


In [0]:
# Fill nullable monetary columns with default values. Keys must match the
# schema spelling exactly: the column is 'improvement_surcharge' (lower-case),
# and a mismatched key only resolves while spark.sql.caseSensitive is false.
df_silver = df.fillna({
    'tip_amount': 0,
    'extra': 0,
    'fare_amount': 0,
    'total_amount': 0,
    'mta_tax': 0.5,               # default chosen by the author for missing tax — TODO confirm
    'improvement_surcharge': 1,   # default chosen by the author for missing surcharge — TODO confirm
    'Airport_fee': 0,
    'congestion_surcharge': 0,
})

# Drop rows with no passenger_count. In the bronze null counts, passenger_count,
# RatecodeID, store_and_fwd_flag, congestion_surcharge and Airport_fee all have
# the same number of nulls, and none remain after this filter.
df_silver = df_silver.filter(col('passenger_count').isNotNull())
display(df_silver.select([count(when(col(c).isNull(), c)).alias(c) for c in df_silver.columns]))

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [0]:
# Remove rows that are exact duplicates across every column.
df_silver = df_silver.distinct()

### Modifying Columns 

tpep_pickup_datetime and tpep_dropoff_datetime

In [0]:
# Enforce timestamp type on both trip boundary columns. The bronze schema
# already reports them as timestamp, so for the current data this is a
# defensive no-op.
for ts_col in ("tpep_pickup_datetime", "tpep_dropoff_datetime"):
    df_silver = df_silver.withColumn(ts_col, col(ts_col).cast("timestamp"))

In [0]:
# Calendar date of the pickup timestamp (time of day dropped).
df_silver = df_silver.withColumn("date", col("tpep_pickup_datetime").cast("date"))

VendorID

In [0]:
# Row count per raw VendorID code, before the codes are replaced by names.
vendor_counts = df_silver.groupBy('VendorID').count()
vendor_counts.show()


+--------+-------+
|VendorID|  count|
+--------+-------+
|       1| 662161|
|       2|2135752|
+--------+-------+



In [0]:
# Replace numeric vendor codes with vendor names; any code not listed here
# falls through unchanged.
vendor_names = {
    1: "Creative Mobile Technologies",
    2: "VeriFone",
}
vendor_label = None
for code, name in vendor_names.items():
    is_code = col("VendorID") == code
    vendor_label = when(is_code, name) if vendor_label is None else vendor_label.when(is_code, name)
df_silver = df_silver.withColumn("VendorID", vendor_label.otherwise(col("VendorID")))

passenger_count - drop trips with zero passengers

In [0]:
# Row count per passenger_count value; zero-passenger trips are removed next.
passenger_counts = df_silver.groupBy('passenger_count').count()
passenger_counts.show()

+---------------+-------+
|passenger_count|  count|
+---------------+-------+
|              0|  29095|
|              6|  15363|
|              5|  22448|
|              1|2142481|
|              3| 109516|
|              2| 400827|
|              4|  78168|
|              8|     10|
|              7|      3|
|              9|      2|
+---------------+-------+



In [0]:
# Drop trips whose passenger_count is 0 (null counts were filtered out earlier).
df_silver = df_silver.filter("passenger_count <> 0")

trip_distance - drop trips with trip_distance <= 0 and convert miles to kilometers

In [0]:
KM_PER_MILE = 1.60934

# Drop trips without a positive distance, then express the distance in
# kilometres rounded to 2 decimals. `round` is the Spark SQL function brought
# in by the star import of pyspark.sql.functions, not Python's builtin.
df_silver = (
    df_silver
    .filter(col('trip_distance') > 0)
    .withColumn('trip_distance', round(col('trip_distance') * KM_PER_MILE, 2))
)

RatecodeID - map rate codes to names (column renamed to Final_Rate)

In [0]:
# Map numeric rate codes to descriptive names, then rename the column to
# Final_Rate (keeping its position in the schema). The source column is spelled
# 'RatecodeID'; using the exact name keeps this cell correct even when
# spark.sql.caseSensitive is enabled. Any other code (or null) becomes 'NA'.
rate_code = col('RatecodeID')
df_silver = df_silver.withColumn(
    'RatecodeID',
    when(rate_code == 1, 'Standard rate')
    .when(rate_code == 2, 'JFK')
    .when(rate_code == 3, 'Newark')
    .when(rate_code == 4, 'Nassau or Westchester')
    .when(rate_code == 5, 'Negotiated fare')
    .when(rate_code == 6, 'Group ride')
    .otherwise('NA')
)

df_silver = df_silver.withColumnRenamed('RatecodeID', 'Final_Rate')

payment_type - map payment codes to names

In [0]:
# Decode payment_type codes into labels; every code not listed (e.g. 5) and
# null become 'Unknown'.
payment_labels = {
    1: 'Credit card',
    2: 'Cash',
    3: 'No charge',
    4: 'Dispute',
    6: 'Voided trip',
}
payment_expr = None
for code, label in payment_labels.items():
    matches = col('payment_type') == code
    payment_expr = when(matches, label) if payment_expr is None else payment_expr.when(matches, label)
df_silver = df_silver.withColumn('payment_type', payment_expr.otherwise('Unknown'))

fare_amount - keep values between 0 and 1000 USD (inclusive)

In [0]:
# Keep fares within [0, 1000]; between() is inclusive on both ends.
df_silver = df_silver.filter(col('fare_amount').between(0, 1000))

extra - set negative values to 0

In [0]:
# Clamp negative 'extra' charges to 0; other values (including null) pass through.
# NOTE(review): mta_tax and improvement_surcharge flip negative values to
# positive instead of zeroing them — confirm the differing treatment is intended.
df_silver = df_silver.withColumn('extra', expr('CASE WHEN extra < 0 THEN 0 ELSE extra END'))

mta_tax - turn -0.5 values into 0.5

In [0]:
# Turn -0.5 mta_tax entries (presumably reversals — TODO confirm) into 0.5;
# every other value is left unchanged.
is_negative_mta = col('mta_tax') == -0.5
df_silver = df_silver.withColumn(
    'mta_tax',
    when(is_negative_mta, 0.5).otherwise(col('mta_tax')),
)

improvement_surcharge, tip_amount

In [0]:
# improvement_surcharge: turn -1 entries into 1; other values are unchanged.
df_silver = df_silver.withColumn(
    'improvement_surcharge',
    when(col('improvement_surcharge') == -1, 1)
    .otherwise(col('improvement_surcharge'))
)

# tip_amount: treat missing tips as 0 (keeps this cell independent of the
# earlier fillna) and drop rows with a negative tip.
df_silver = df_silver.fillna({'tip_amount': 0})
df_silver = df_silver.filter(col('tip_amount') >= 0)

# Only credit-card trips keep their tip; any non-zero tip on another payment
# type is zeroed. Negative tips cannot reach this point because of the filter
# above, so no separate branch is needed for them.
# NOTE(review): total_amount is not adjusted when a tip is zeroed here — confirm
# downstream consumers do not expect total_amount to equal the sum of components.
df_silver = df_silver.withColumn(
    'tip_amount',
    when((col('payment_type') != 'Credit card') & (col('tip_amount') != 0), 0)
    .otherwise(col('tip_amount'))
)

total_amount - take the absolute value

In [0]:
# Make total_amount non-negative by taking its absolute value.
df_silver = df_silver.withColumn('total_amount', expr('abs(total_amount)'))

In [0]:
# Preview the cleaned DataFrame before writing it to the silver layer.
display(df_silver)

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,Final_Rate,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee,date
VeriFone,2024-07-01T00:38:33.000+0000,2024-07-01T00:46:13.000+0000,1,3.12,Standard rate,N,162,142,Credit card,10.7,1.0,0.5,3.14,0.0,1.0,18.84,2.5,0.0,2024-07-01
VeriFone,2024-07-01T00:02:32.000+0000,2024-07-01T00:21:39.000+0000,1,13.29,Standard rate,N,138,41,Credit card,35.2,6.0,0.5,9.93,6.94,1.0,61.32,0.0,1.75,2024-07-01
VeriFone,2024-07-01T00:31:47.000+0000,2024-07-01T00:37:52.000+0000,1,3.98,Standard rate,N,142,41,Credit card,11.4,1.0,0.5,3.28,0.0,1.0,19.68,2.5,0.0,2024-07-01
VeriFone,2024-07-01T00:40:44.000+0000,2024-07-01T01:10:28.000+0000,1,17.64,Standard rate,N,132,225,Credit card,45.0,1.0,0.5,11.88,0.0,1.0,61.13,0.0,1.75,2024-07-01
VeriFone,2024-07-01T00:22:07.000+0000,2024-07-01T00:29:49.000+0000,1,1.96,Standard rate,N,144,87,Credit card,10.0,1.0,0.5,3.0,0.0,1.0,18.0,2.5,0.0,2024-07-01
VeriFone,2024-07-01T01:43:34.000+0000,2024-07-01T02:31:57.000+0000,1,29.71,JFK,N,132,163,Cash,70.0,0.0,0.5,0.0,6.94,1.0,82.69,2.5,1.75,2024-07-01
VeriFone,2024-07-01T00:16:08.000+0000,2024-07-01T00:38:19.000+0000,2,13.5,Standard rate,N,132,35,Credit card,35.9,1.0,0.5,0.0,0.0,1.0,40.15,0.0,1.75,2024-07-01
VeriFone,2024-07-01T00:51:21.000+0000,2024-07-01T00:54:52.000+0000,1,1.83,Standard rate,N,141,263,Credit card,6.5,1.0,0.5,3.45,0.0,1.0,14.95,2.5,0.0,2024-07-01
VeriFone,2024-07-01T00:56:35.000+0000,2024-07-01T01:03:37.000+0000,1,1.93,Standard rate,N,141,262,Credit card,9.3,1.0,0.5,2.86,0.0,1.0,17.16,2.5,0.0,2024-07-01
VeriFone,2024-07-01T00:27:47.000+0000,2024-07-01T00:56:50.000+0000,1,14.16,Standard rate,N,138,50,Credit card,39.4,6.0,0.5,9.88,0.0,1.0,61.03,2.5,1.75,2024-07-01


In [0]:
# Persist the cleaned data to the silver layer as Snappy-compressed Parquet,
# replacing any previous output at that path.
SILVER_PATH = "/FileStore/silver/nyc_taxi_data"
(
    df_silver.write
    .mode("overwrite")
    .option("compression", "snappy")
    .parquet(SILVER_PATH)
)

In [0]:
# List the FileStore root to confirm the bronze/silver/gold folders exist.
display(dbutils.fs.ls("/FileStore/"))


path,name,size,modificationTime
dbfs:/FileStore/bronze/,bronze/,0,0
dbfs:/FileStore/gold/,gold/,0,0
dbfs:/FileStore/nyc_taxi.csv/,nyc_taxi.csv/,0,0
dbfs:/FileStore/silver/,silver/,0,0
dbfs:/FileStore/tables/,tables/,0,0
