In [1]:
from pathlib import Path
from urllib.request import urlretrieve

import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Exploring Dataset #

In [2]:
# Spark session for the whole notebook. The settings live in one mapping so they
# can be reviewed and tuned in one place before the session is created.
spark_settings = {
    # Render DataFrames as tables when a DataFrame is a cell's last expression.
    "spark.sql.repl.eagerEval.enabled": True,
    "spark.executor.memory": "6G",
    "spark.driver.memory": "6G",
    "spark.sql.parquet.cacheMetadata": "true",
    "spark.sql.session.timeZone": "Etc/UTC",
    # Caps results collected to the driver (e.g. the toPandas() sample at the end).
    "spark.driver.maxResultSize": "2048m",
}
session_builder = SparkSession.builder.appName("project 1")
for setting_key, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_key, setting_value)
spark = session_builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/21 00:33:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/08/21 00:33:57 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
# January 2023 raw trip records for each service type, read in a fixed order:
# yellow taxi, green taxi, for-hire vehicles, high-volume for-hire vehicles.
Yellow_Taxi_Jan, Green_Taxi_Jan, FHV_Jan, FHV_HV_Jan = (
    spark.read.parquet(raw_path)
    for raw_path in (
        '../data/raw/2023-01.parquet',
        '../data/raw/green_tripdata_2023-01.parquet',
        '../data/raw/fhv_tripdata_2023-01.parquet',
        '../data/raw/fhvhv_tripdata_2023-01.parquet',
    )
)

                                                                                

In [4]:
# Row count (one row per recorded trip) for each raw dataset.
trip_sources = (
    ("Number of trips recorded by Yellow Taxi: ", Yellow_Taxi_Jan),
    ("Number of trips recorded by Green Taxi: ", Green_Taxi_Jan),
    ("Number of trips recorded by FHV: ", FHV_Jan),
    ("Number of trips recorded by FHV_hv: ", FHV_HV_Jan),
)
for source_label, source_frame in trip_sources:
    print(source_label, source_frame.count())

Number of trips recorded by Yellow Taxi:  3066766
Number of trips recorded by Green Taxi:  68211
Number of trips recorded by FHV:  1114320
Number of trips recorded by FHV_hv:  18479031


## Feature Engineering and Data Cleaning ##

In [5]:
# Add new features derived from the existing trip columns
def new_feature(sdf):
    """Append derived trip features to a yellow-taxi style Spark DataFrame.

    Reads ``tpep_pickup_datetime``, ``tpep_dropoff_datetime``, ``total_amount``,
    ``airport_fee`` and ``congestion_surcharge``. Adds (or replaces):

    - ``datetime``: pickup date as a 'yyyy-MM-dd' string (join key for the weather data)
    - ``duration``: dropoff minus pickup, in seconds
    - ``hourly_earn``: ``total_amount`` per hour of trip time; null when ``duration <= 0``
    - ``Month`` / ``Hour``: integer pickup month and hour
    - ``Weekend``: True when the pickup falls on Sunday or Saturday
    - ``Airport`` / ``Congestion``: True when the respective fee/surcharge is positive

    Returns a new DataFrame; ``sdf`` itself is not modified.
    """
    pickup = F.col('tpep_pickup_datetime')
    return (
        sdf.withColumn('datetime', F.date_format(pickup, 'yyyy-MM-dd'))
        .withColumn('duration', F.unix_timestamp('tpep_dropoff_datetime') - F.unix_timestamp(pickup))
        # Reuse the duration column instead of recomputing it, and guard the division:
        # a zero-second trip would otherwise divide by zero (an error under ANSI mode).
        # when() without otherwise() yields null, which feature_clean drops anyway.
        .withColumn(
            'hourly_earn',
            F.when(F.col('duration') > 0, F.col('total_amount') / F.col('duration') * 3600),
        )
        .withColumn('Month', F.month(pickup))
        .withColumn('Hour', F.hour(pickup))
        # dayofweek: 1 = Sunday ... 7 = Saturday
        .withColumn('Weekend', F.dayofweek(pickup).isin(1, 7))
        .withColumn('Airport', (F.col('airport_fee') > 0).cast('BOOLEAN'))
        .withColumn('Congestion', (F.col('congestion_surcharge') > 0).cast('BOOLEAN'))
    )

In [6]:
# Remove unusual (implausible) trips and rows with missing values
def feature_clean(sdf, min_duration=60, max_duration=3600 * 5, max_distance=280, payment_type=1):
    """Filter out implausible trips and every row that contains a null.

    Expects the ``duration`` and ``hourly_earn`` columns added by ``new_feature``
    plus the raw ``trip_distance``, ``total_amount``, ``congestion_surcharge``,
    ``passenger_count`` and ``payment_type`` columns.

    Parameters
    ----------
    sdf : pyspark.sql.DataFrame
        Frame to filter.
    min_duration, max_duration : number
        Exclusive bounds on ``duration`` in seconds (defaults: 1 minute, 5 hours).
    max_distance : number
        Exclusive upper bound on ``trip_distance``; the lower bound is fixed at > 0.
        NOTE(review): unit presumably miles per the TLC data dictionary — confirm.
    payment_type : int
        Only trips with exactly this ``payment_type`` code are kept (default 1).
        NOTE(review): 1 is the credit-card code in the TLC data dictionary —
        confirm this restriction is intended.

    Returns
    -------
    pyspark.sql.DataFrame
        The filtered frame. The final ``dropna()`` drops rows with a null in
        *any* column, not only the ones filtered on above.
    """
    return (
        sdf.where((F.col('duration') > min_duration) & (F.col('duration') < max_duration))
        .where((F.col('trip_distance') < max_distance) & (F.col('trip_distance') > 0))
        .where(F.col('hourly_earn') > 0)
        .where(F.col('total_amount') > 0)
        .where(F.col('congestion_surcharge') >= 0)
        .where(F.col('passenger_count') >= 0)
        .where(F.col('payment_type') == payment_type)
        .dropna()
    )
        

In [7]:
# Read the curated trip data. The original '../data/curated/*' glob also matched the
# files this notebook itself writes into that directory (merged_sdf.parquet and
# sample_data.parquet in the export cells), so a Restart & Run All would ingest its
# own outputs. List the inputs explicitly, skipping those outputs and any hidden or
# metadata entries (names starting with '.' or '_').
CURATED_DIR = Path('../data/curated')
NOTEBOOK_OUTPUTS = {'merged_sdf.parquet', 'sample_data.parquet'}
curated_inputs = sorted(
    str(entry)
    for entry in CURATED_DIR.iterdir()
    if entry.name not in NOTEBOOK_OUTPUTS and not entry.name.startswith(('.', '_'))
)
assert curated_inputs, f'no curated input files found in {CURATED_DIR}'
sdf = spark.read.parquet(*curated_inputs)

In [8]:
# Each .count() is a full scan, so count once and reuse the result.
raw_count = sdf.count()
complete_count = sdf.dropna().count()
print('Num of raw data :', raw_count)
# Share of rows with no null in any column (1 minus this is the share of rows with a null).
# The previous label called this the "null" ratio, which is the opposite of what it measures.
print('Non-null row ratio: ', complete_count / raw_count if raw_count else float('nan'))

Num of raw data : 19585935




Null data ratio:  0.9716568037216503


                                                                                

In [9]:
# Derive the trip features first (feature_clean filters on duration/hourly_earn),
# then drop implausible trips and incomplete rows.
sdf = feature_clean(new_feature(sdf))

In [10]:
# Rows remaining after feature derivation and cleaning.
filtered_count = sdf.count()
print('Num of data after being filtered: ', filtered_count)

23/08/21 00:34:16 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

Num of data after being filtered:  15121628


                                                                                

## Merge New Dataset ##

In [11]:
# Daily New York weather; the first CSV line holds column names. No schema
# inference, so every column is read as a string (numeric casts happen after the join).
weather_dataset = spark.read.option("header", True).csv("../data/newyork_weather.csv")

In [12]:
# Preview the weather table (rendered as a table because eagerEval is enabled on the session).
weather_dataset

datetime,temp,precip,windspeedmean,visibility
2022-12-01,18.9,0,7.0,4.1
2022-12-02,19.7,0,7.6,5.0
2022-12-03,20.7,0,4.9,4.7
2022-12-04,21.0,0,6.2,4.4
2022-12-05,22.0,0,9.3,5.3
2022-12-06,20.6,0,8.5,4.3
2022-12-07,19.1,0,10.0,6.5
2022-12-08,18.1,0,9.5,7.0
2022-12-09,18.1,0,11.4,5.4
2022-12-10,18.5,0,9.4,6.8


In [13]:
# Attach the day's weather to each trip via the 'yyyy-MM-dd' pickup date. A left
# (outer) join keeps trips whose date has no weather row; their weather columns are null.
merged_sdf = sdf.join(weather_dataset, 'datetime', 'left')

In [14]:
# The weather CSV is read without schema inference, so these columns arrive as
# strings; cast them to float. One select() replaces four chained withColumn()
# calls: every withColumn adds another projection to an already large plan (Spark
# truncated its string form when counting the filtered data). Column order and
# names are unchanged.
WEATHER_FLOAT_COLS = ('temp', 'precip', 'windspeedmean', 'visibility')
missing_weather_cols = set(WEATHER_FLOAT_COLS) - set(merged_sdf.columns)
assert not missing_weather_cols, f'weather columns missing after join: {missing_weather_cols}'
merged_sdf = merged_sdf.select(*[
    F.col(col_name).cast('float').alias(col_name) if col_name in WEATHER_FLOAT_COLS else F.col(col_name)
    for col_name in merged_sdf.columns
])

## Export the merged dataset to Parquet ##

In [15]:
# Persist the full merged dataset, replacing any previous export at this path.
merged_sdf.write.parquet('../data/curated/merged_sdf.parquet', mode='overwrite')

                                                                                

In [16]:
# Draw a reproducible ~1% sample (seed 0, without replacement) of the merged data,
# collect it to the driver as pandas, and save it for lightweight local analysis.
sample_sdf = merged_sdf.sample(fraction=0.01, seed=0)
df = sample_sdf.toPandas()
df.to_parquet('../data/curated/sample_data.parquet')

  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)
