In [69]:
import gc
spark.stop()
gc.collect()

11548

In [1]:
import requests
import os
import subprocess

### Downloading Data files

In [9]:
# downloading data
def downloadData(path):
    for url in range(1,13):
        # construct file url for 2023 
        url = f"https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2022-{url:02}.parquet"
        file_path = path + os.path.basename(url)
        # Ensure the parent directory exists
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        if  os.path.exists(file_path):
            print(f"File {file_path} exist.")
            continue  # Skip the current iteration and go to the next file
        
        # get request to download the file
        with requests.get(url , stream=True)as r:
            r.raise_for_status()
            # write file in chunks
            print(f'Writting file to {file_path}')
            with open(file_path,"wb") as f:
                for chunk in r.iter_content(chunk_size=80000):
                    f.write(chunk)

In [None]:
path = "/home/itversity/itversity-material/NYC_TCL/data/" 
downloadData(path)

In [21]:
#download taxi_zone_lookup.csv
url="https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv"
file_path = path + os.path.basename(url)
with requests.get(url , stream=True)as r:
            r.raise_for_status()
            # write file in chunks
            with open(file_path,"wb") as f:
                for chunk in r.iter_content(chunk_size=80000):
                    f.write(chunk)



### Moving Data from Local to HDFS

In [17]:
local_dir = "/home/itversity/itversity-material/NYC_TCL/data/"
hdfs_dir = "/NYC/raw"
# create parent directory on HDFS if not exist
subprocess.run(["hdfs","dfs","-mkdir","-p",hdfs_dir])
# loop over files and move from local to HDFS
for file in range(1,13):
    file_path =local_dir + f"yellow_tripdata_2022-{file:02}.parquet"
    file_name = os.path.basename(f"{local_dir}yellow_tripdata_2022-{file:02}.parquet")
    hdfs_file_path = os.path.join(hdfs_dir,file_name)
    # check if file not found on local 
    if not os.path.exists(file_path):
        print(f"File {file_path} does not exist. Skipping this file.")
        continue  # Skip the current iteration and go to the next file
    print(f"moving file:{file_name}  to \n {hdfs_file_path} ")
    subprocess.run(["hdfs", "dfs", "-copyFromLocal", file_path, hdfs_file_path])
# moving taxi_zone_lookup.csv to HDFS
file_path =local_dir + f"taxi_zone_lookup.csv"
file_name = os.path.basename(f"{local_dir}taxi_zone_lookup.csv")
hdfs_file_path = os.path.join(hdfs_dir,file_name)
subprocess.run(["hdfs", "dfs", "-copyFromLocal", file_path, hdfs_file_path])

moving file:yellow_tripdata_2022-01.parquet  to 
 /NYC/raw/yellow_tripdata_2022-01.parquet 
moving file:yellow_tripdata_2022-02.parquet  to 
 /NYC/raw/yellow_tripdata_2022-02.parquet 
moving file:yellow_tripdata_2022-03.parquet  to 
 /NYC/raw/yellow_tripdata_2022-03.parquet 
moving file:yellow_tripdata_2022-04.parquet  to 
 /NYC/raw/yellow_tripdata_2022-04.parquet 
moving file:yellow_tripdata_2022-05.parquet  to 
 /NYC/raw/yellow_tripdata_2022-05.parquet 
moving file:yellow_tripdata_2022-06.parquet  to 
 /NYC/raw/yellow_tripdata_2022-06.parquet 
moving file:yellow_tripdata_2022-07.parquet  to 
 /NYC/raw/yellow_tripdata_2022-07.parquet 
moving file:yellow_tripdata_2022-08.parquet  to 
 /NYC/raw/yellow_tripdata_2022-08.parquet 
moving file:yellow_tripdata_2022-09.parquet  to 
 /NYC/raw/yellow_tripdata_2022-09.parquet 
moving file:yellow_tripdata_2022-10.parquet  to 
 /NYC/raw/yellow_tripdata_2022-10.parquet 
moving file:yellow_tripdata_2022-11.parquet  to 
 /NYC/raw/yellow_tripdata_2022-

CompletedProcess(args=['hdfs', 'dfs', '-copyFromLocal', '/home/itversity/itversity-material/NYC_TCL/data/taxi_zone_lookup.csv', '/NYC/raw/taxi_zone_lookup.csv'], returncode=0)

### Creating Spark Session

In [2]:
import pyspark 
from pyspark.sql import SparkSession
from pyspark.sql.functions import * # year,month,dayofyear,datediff,unix_timestamp,dayofmonth,to_date,date_format,round,col
from pyspark.storagelevel import StorageLevel
from pyspark.sql.types import StructType, StructField, LongType, TimestampType, DoubleType, StringType

In [3]:
spark = SparkSession.builder.appName("NYC_taxi").enableHiveSupport().getOrCreate()

### Reading files and saving as one table

In [71]:
df = spark.read.parquet("/NYC/raw/yellow_tripdata_2022*.parquet",compression="snappy" )
df.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)



In [70]:
df.summary()

summary,passenger_count,trip_distance,RatecodeID,PULocationID,DOLocationID,payment_type,total_amount
count,37315145.0,37315145.0,37315145.0,37315145.0,37315145.0,37315145.0,37315145.0
mean,1.442238077863559,5.691385983689089,1.344849845819975,164.9714783635438,162.73626528853097,1.2206806378482518,21.602135243734228
stddev,0.947547130371869,7.162479882767672,5.278042280257353,64.82981528797151,70.13817852868611,0.4566393943738757,16.83831250905367
min,1.0,0.512,1.0,1.0,1.0,1.0,0.3
25%,1.0,1.84,1.0,132.0,113.0,1.0,12.3
50%,1.0,3.04,1.0,162.0,162.0,1.0,15.96
75%,2.0,5.696000000000001,1.0,234.0,234.0,1.0,22.8
max,9.0,149.872,99.0,265.0,265.0,5.0,408.329002287754


### Exploring and Cleaning data

In [72]:
# to focus on necessary features for the analysis some columns will be excluded 
# assuming trips with distance greater than 150 or less than 0.5 miles are outliers 
# assuming trips with total_amount greater than 350 or less than 
# considering only the absolute value for total_amount 
required_columns = ["tpep_pickup_datetime","tpep_dropoff_datetime","passenger_count","trip_distance","RatecodeID",
                    "PULocationID","DOLocationID","payment_type","total_amount"]
df = df.withColumn("total_amount",abs(format_number(col("total_amount")+col("airport_fee"),2).cast("double"))).select(required_columns) \
        .withColumn("trip_distance" , col("trip_distance")*1.6) \
        .where(
                (col("trip_distance")<=150)&(col("trip_distance") >= 0.5)
                ) \
        .where(
                (col("total_amount") <=350)
                )
# df.count()

In [75]:
# function to count null values for each column containing nulls
def null_count(df):
    null_columns_count=[]
    row_count=df.count()
    for c in df.columns:
        null_rows=df.where(df[f"{c}"].isNull()).count()
        if null_rows > 0:
            temp= c, null_rows , (null_rows/row_count)*100
            null_columns_count.append(temp)
            
    return null_columns_count

In [76]:
null_columns = null_count(df)
print(null_columns)

[]


##### Substituting  records containing nulls for "passenger_count" column with AVG number of passenger per trip
##### Substituting  records containing nulls for "total_amount" column using a factor of "fare_per_distance"

In [77]:
# Get average passenger count per trip  
avg_passenger_count = df.select("passenger_count").where((col("passenger_count").isNotNull()) & (col("passenger_count")!= 0) ). \
        select(ceil(avg("passenger_count"))).collect()[0][0]
# Calculate factor for fare per distance
avg_fare_trip = df.select(["trip_distance","total_amount"]).where(
            (col("trip_distance").isNotNull())|(col("trip_distance")!=0) | (col("total_amount").isNotNull())|(col("total_amount")!=0)) \
            .withColumn("fare_per_trip", col("total_amount")/col("trip_distance")).agg(avg("fare_per_trip")).collect()[0][0]

# handling nulls in columns passenger_count & total_amount
df=df.withColumn("passenger_count", \
                when(
    (col("passenger_count").isNull())|(col("passenger_count")==0),lit(avg_passenger_count)).otherwise(col("passenger_count"))) \
    .withColumn("total_amount" ,
               when((col("total_amount").isNull()) | (col("total_amount")==0), col("trip_distance")*avg_fare_trip ).otherwise(col("total_amount")) )
# df.show(15)

##### Chechk if any integer field contains negative values

In [78]:
integr_fields =[field.name for field in df.schema if isinstance(field.dataType , (LongType, DoubleType))  ]
negative_values = {column: df.select(column).where(col(column) < 0).count() > 0 for column in integr_fields  }
negative_values=[column  for column , has_negative in negative_values.items() if has_negative]
negative_values

[]

In [79]:
# take the absolute for any negative value 
for column in negative_values:
    df = df.withColumn(column,abs(col(column)))


In [83]:
columns_arranged=["tpep_pickup_datetime","tpep_dropoff_datetime","passenger_count","trip_distance",
                  "RatecodeID","PULocationID","DOLocationID","duration","payment_type","total_amount","year","month","day","day_name"]
df_final =df.withColumn("trip_distance",round(col("trip_distance"),2)) \
        .withColumn("year", year("tpep_pickup_datetime").cast("string")) \
        .withColumn("month",month("tpep_pickup_datetime").cast("string")) \
        .withColumn("day",dayofmonth("tpep_pickup_datetime").cast("string")) \
        .withColumn("day_name",date_format("tpep_pickup_datetime" , "E")) \
        .withColumn("duration",(unix_timestamp("tpep_dropoff_datetime")-unix_timestamp("tpep_pickup_datetime"))/60) \
        .withColumn("duration", round("duration",2)) \
            .select(columns_arranged).where(col("year")=='2022')  # filtring for year 2022

In [86]:
df_final.write.partitionBy("year","month").mode("overwrite").format("parquet").saveAsTable("Yellow_tripdata_cleaned",path="/NYC/processed")

In [None]:
# df.persist(StorageLevel.MEMORY_AND_DISK)
# df.unpersist()