## Preprocessing

#### Starting a Spark Session

In [1]:
from pyspark.sql import SparkSession

# Session-level settings, applied one by one to the builder below.
spark_settings = {
    "spark.sql.repl.eagerEval.enabled": True,   # render DataFrames eagerly in the notebook
    "spark.sql.parquet.cacheMetadata": True,
    "spark.executor.memory": "3g",
    "spark.driver.memory": "3g",
    "spark.sql.session.timeZone": "Etc/UTC",
}

# Build (or reuse) the Spark session that runs every Spark job in this notebook.
session_builder = SparkSession.builder.appName("Project 1 Preprocessing")
for setting_key, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_key, setting_value)
spark = session_builder.getOrCreate()

22/08/21 19:38:43 WARN Utils: Your hostname, DESKTOP-AKL6QQR resolves to a loopback address: 127.0.1.1; using 172.24.95.98 instead (on interface eth0)
22/08/21 19:38:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/08/21 19:38:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


#### Reading the Parquets for Taxi Data

In [2]:
# One Spark DataFrame per monthly file, January (01) through August (08) 2018.
sdf1, sdf2, sdf3, sdf4, sdf5, sdf6, sdf7, sdf8 = [
    spark.read.parquet(f'../data/raw/2018-{month:02d}.parquet')
    for month in range(1, 9)
]

                                                                                

#### Add Month Column to Each Dataframe - Taxi

In [3]:
from pyspark.sql.functions import lit

# Tag every monthly frame with a constant 'Month' label so the month survives the union.
month_labels = ("Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug")
sdf1, sdf2, sdf3, sdf4, sdf5, sdf6, sdf7, sdf8 = [
    monthly_sdf.withColumn('Month', lit(label))
    for monthly_sdf, label in zip(
        (sdf1, sdf2, sdf3, sdf4, sdf5, sdf6, sdf7, sdf8), month_labels
    )
]

#### Concatenate Dataframes - Taxi

In [4]:
# stack the 8 monthly dataframes (January to August) into a single dataframe
monthly_sdfs = [sdf1, sdf2, sdf3, sdf4, sdf5, sdf6, sdf7, sdf8]
taxi_sdf = monthly_sdfs[0]
for monthly_sdf in monthly_sdfs[1:]:
    taxi_sdf = taxi_sdf.union(monthly_sdf)

# total number of rows across the individual monthly dataframes
totaln = sum(monthly_sdf.count() for monthly_sdf in monthly_sdfs)

# sanity check: the union must contain exactly that many rows
print(totaln == taxi_sdf.count())

taxi_sdf.limit(5)

True


                                                                                

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,Month
1,2018-01-01 00:21:05,2018-01-01 00:24:23,1.0,0.5,1.0,N,41,24,2,4.5,0.5,0.5,0.0,0.0,0.3,5.8,,,Jan
1,2018-01-01 00:44:55,2018-01-01 01:03:05,1.0,2.7,1.0,N,239,140,2,14.0,0.5,0.5,0.0,0.0,0.3,15.3,,,Jan
1,2018-01-01 00:08:26,2018-01-01 00:14:21,2.0,0.8,1.0,N,262,141,1,6.0,0.5,0.5,1.0,0.0,0.3,8.3,,,Jan
1,2018-01-01 00:20:22,2018-01-01 00:52:51,1.0,10.2,1.0,N,140,257,2,33.5,0.5,0.5,0.0,0.0,0.3,34.8,,,Jan
1,2018-01-01 00:09:18,2018-01-01 00:27:06,2.0,2.5,1.0,N,246,239,1,12.5,0.5,0.5,2.75,0.0,0.3,16.55,,,Jan


In [5]:
# (row count, column count) of the combined taxi dataframe
taxi_shape = (taxi_sdf.count(), len(taxi_sdf.columns))
print(taxi_shape)

(69636649, 20)


                                                                                

#### Reading the External Weather Data & Removing Irrelevant Columns - Weather

In [6]:
import pandas as pd

weather_df = pd.read_csv('../data/raw/74486094789.csv', low_memory=False)

# Keep only the columns used downstream. Everything else (including the station
# metadata STATION, NAME, ELEVATION, LATITUDE and LONGITUDE, which is the same for
# every instance) is discarded by this selection, so no separate drops are needed.
weather_df = weather_df.filter(['DATE','WND','VIS','TMP','AA1'])

# Drop every observation after the last one of 31 August 2018, the end of the
# taxi date range.
last_observation = '2018-08-31T23:51:00'
cutoff_labels = weather_df.index[weather_df['DATE'] == last_observation]
if len(cutoff_labels) == 0:
    raise ValueError(f"No weather observation found with DATE == {last_observation!r}")

# `.loc` slices by index label and includes the end label, matching the label
# obtained above. `.copy()` gives an independent frame so that later cells can add
# or overwrite columns without writing into a slice of the original.
weather_df = weather_df.loc[:cutoff_labels[0]].copy()

weather_df.tail(5)

Unnamed: 0,DATE,WND,VIS,TMP,AA1
9080,2018-08-31T20:51:00,"100,5,N,0082,5","016093,5,N,5",2285,1000025
9081,2018-08-31T21:00:00,"100,1,N,0082,1",016000199,2281,3000021
9082,2018-08-31T21:51:00,"110,5,N,0067,5","016093,5,N,5",2285,1000095
9083,2018-08-31T22:51:00,"090,5,N,0072,5","016093,5,N,5",2225,1000095
9084,2018-08-31T23:51:00,"100,5,N,0062,5","016093,5,N,5",2175,1000095


#### Separating the Combination Columns into Individual Columns - Weather

In [7]:
import numpy as np

def _subfield(packed, position, missing_code):
    """
    Extract one comma-separated sub-field from a packed string column.

    packed:       pandas Series of comma-separated strings (e.g. the WND column)
    position:     zero-based position of the sub-field to keep
    missing_code: string that marks a missing value in that sub-field; replaced by NaN

    Returns a Series of strings (NaN where the value was missing).
    """
    # np.nan is used rather than np.NaN: the upper-case alias was removed in NumPy 2.0
    return packed.str.split(',', expand=True)[position].replace(missing_code, np.nan)


# WND, VIS, TMP and AA1 each combine several values in one string. Only one value per
# column is kept, selected by its position, and the combined columns are then dropped:
#   WND -> wind speed (4th value)          VIS -> visibility distance (1st value)
#   TMP -> air temperature (1st value)     AA1 -> precipitation depth (2nd value)
weather_df = weather_df.assign(
    Wind_Speed=_subfield(weather_df['WND'], 3, '9999'),
    Visibility_Distance=_subfield(weather_df['VIS'], 0, '999999'),
    Air_Temp=_subfield(weather_df['TMP'], 0, '+9999'),
    Precipitation_Depth=_subfield(weather_df['AA1'], 1, '9999'),
).drop(columns=['WND', 'VIS', 'TMP', 'AA1'])

weather_df.head(5)


Unnamed: 0,DATE,Wind_Speed,Visibility_Distance,Air_Temp,Precipitation_Depth
0,2018-01-01T00:00:00,93,16000,-111,
1,2018-01-01T00:51:00,67,16093,-117,0.0
2,2018-01-01T01:51:00,93,16093,-117,0.0
3,2018-01-01T02:51:00,93,16093,-122,0.0
4,2018-01-01T03:00:00,93,16000,-122,


In [8]:
# Dimensions (rows, columns) of the weather frame before any rows with missing
# values are removed; a baseline for the shape reported after cleaning.
weather_df.shape

(9085, 5)

#### Interpolating the Data to fill NaN Values & Correcting dtypes - Weather

In [9]:
measurement_cols = ['Wind_Speed', 'Visibility_Distance', 'Air_Temp', 'Precipitation_Depth']

# missing values per column before any cleaning
print(weather_df.isna().sum())

# the DATE column contains repeated timestamps - keep only the first row for each
weather_df = weather_df.drop_duplicates(subset=['DATE'], keep='first')

# give every column its proper dtype: DATE becomes a datetime, measurements become numeric
weather_df['DATE'] = pd.to_datetime(weather_df['DATE'])
for measurement in measurement_cols:
    weather_df[measurement] = pd.to_numeric(weather_df[measurement])

# rescale Air_Temp by a factor of ten
weather_df['Air_Temp'] = weather_df['Air_Temp'].div(10).round(4)

# fill gaps by linear interpolation, forward only, at most 2 consecutive NaNs per gap
weather_df[measurement_cols] = weather_df[measurement_cols].interpolate(
    limit=2, limit_direction='forward'
)

# missing values per column after interpolation
print(weather_df.isna().sum())

# rows that still have a NaN are removed - interpolating further is too risky
weather_df = weather_df.dropna()

DATE                      0
Wind_Speed              251
Visibility_Distance     251
Air_Temp                252
Precipitation_Depth    2056
dtype: int64
DATE                    0
Wind_Speed              0
Visibility_Distance     0
Air_Temp                0
Precipitation_Depth    79
dtype: int64


In [10]:
# Dimensions (rows, columns) of the weather frame after de-duplication,
# interpolation and removal of the rows that still contained NaN values.
weather_df.shape

(8993, 5)

22/08/22 02:41:27 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 287563 ms exceeds timeout 120000 ms
22/08/22 02:41:28 WARN SparkContext: Killing executors is not supported by current scheduler.


#### Removing Instances that did not Pay with Credit Card

In [8]:
# keep only the trips whose payment_type is 1 (the credit-card trips this section targets)
paid_by_card = taxi_sdf['payment_type'] == 1
taxi_sdf = taxi_sdf.filter(paid_by_card)

#### Dropping Columns that are not Required for Analysis - Taxi

In [9]:
# Columns that are not needed for the analysis. Names are spelled exactly as they
# appear in the taxi schema ('store_and_fwd_flag', not 'Store_and_fwd_flag'):
# DataFrame.drop silently ignores names it cannot resolve, so a casing mismatch would
# leave the column in place whenever Spark is configured to be case-sensitive.
# Airport fee & congestion surcharge would have been useful but they are empty.
columns_to_drop = ['store_and_fwd_flag', 'VendorID', 'fare_amount', 'RatecodeID',
    'airport_fee', 'congestion_surcharge', 'payment_type']
taxi_sdf = taxi_sdf.drop(*columns_to_drop)

# (row count, column count) after the credit-card filter and the column drop
print((taxi_sdf.count(), len(taxi_sdf.columns)))



(48224456, 13)


                                                                                

#### Creating a New Column for Total Extra Fees Paid

In [10]:
from pyspark.sql.functions import col

# Column expression (not a DataFrame, despite the name) adding up the three fee
# columns. The names are written exactly as in the taxi schema (lower case) so the
# lookup does not depend on Spark's default case-insensitive column resolution.
fees_sdf = taxi_sdf['extra'] + taxi_sdf['mta_tax'] + taxi_sdf['improvement_surcharge']

# store the sum as a new column, then remove the three columns it was built from
taxi_sdf = taxi_sdf.withColumn('Extra_fees_sum', fees_sdf)
columns_to_drop = ['extra', 'mta_tax', 'improvement_surcharge']
taxi_sdf = taxi_sdf.drop(*columns_to_drop)

#### Outlier Detection & Removal - Taxi

In [11]:
# bounds of the selected date range (both ends inclusive)
range_start = '2018-01-01 00:00:00'
range_end = '2018-08-31 23:59:59'

# A trip is kept only if every one of the conditions below holds.
valid_trip = (
    # pickup and drop-off location IDs must lie between 1 and 263 (inclusive)
    taxi_sdf['PULocationID'].between(1, 263)
    & taxi_sdf['DOLocationID'].between(1, 263)
    # at least one passenger
    & (taxi_sdf['passenger_count'] > 0)
    # a positive trip distance
    & (taxi_sdf['trip_distance'] > 0)
    # drop-off strictly after pickup (removes trips with non-positive duration)
    & (taxi_sdf['tpep_dropoff_datetime'] > taxi_sdf['tpep_pickup_datetime'])
    # a positive total amount paid
    & (taxi_sdf['total_amount'] > 0)
    # both drop-off and pickup inside the selected date range
    & taxi_sdf['tpep_dropoff_datetime'].between(range_start, range_end)
    & taxi_sdf['tpep_pickup_datetime'].between(range_start, range_end)
)

taxi_sdf = taxi_sdf.filter(valid_trip)


In [12]:
# (row count, column count) of the taxi dataframe after outlier removal
filtered_shape = (taxi_sdf.count(), len(taxi_sdf.columns))
print(filtered_shape)



(46884259, 11)


                                                                                

#### Joining Taxi & Weather Dataframes together

In [13]:
from pyspark.sql import functions as F

# reduce the weather observations to one row per calendar day holding the mean conditions
weather_df['DATE'] = pd.to_datetime(weather_df['DATE'])
weather_df['DATE'] = weather_df['DATE'].dt.date
weather_df = weather_df.groupby('DATE', as_index=False).mean()

# move the daily weather table from pandas to pyspark and mark its columns as averages
w_sdf = spark.createDataFrame(weather_df)
averaged_names = {
    'Wind_Speed': 'Avg_Wind_Speed',
    'Visibility_Distance': 'Avg_Visibility_Distance',
    'Air_Temp': 'Avg_Air_Temp',
    'Precipitation_Depth': 'Avg_Precipitation_Depth',
}
for raw_name, averaged_name in averaged_names.items():
    w_sdf = w_sdf.withColumnRenamed(raw_name, averaged_name)

# give each trip the date (without time) of its pickup, to serve as the join key
pickup_date = F.to_date(F.col('tpep_pickup_datetime'))
taxi_sdf = taxi_sdf.withColumn('DATE', pickup_date)

# left join: every trip is kept and receives the weather of its pickup date.
# Both frames contribute a DATE column; neither is needed afterwards.
same_date = taxi_sdf['DATE'] == w_sdf['DATE']
joint_sdf = taxi_sdf.join(w_sdf, same_date, 'left').drop('DATE', 'DATE')

joint_sdf.limit(5)

                                                                                

tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,PULocationID,DOLocationID,tip_amount,tolls_amount,total_amount,Month,Extra_fees_sum,Avg_Wind_Speed,Avg_Visibility_Distance,Avg_Air_Temp,Avg_Precipitation_Depth
2018-01-01 00:08:26,2018-01-01 00:14:21,2.0,0.8,262,141,1.0,0.0,8.3,Jan,1.3,82.359375,16072.65625,-11.246875,0.0
2018-01-01 00:09:18,2018-01-01 00:27:06,2.0,2.5,246,239,2.75,0.0,16.55,Jan,1.3,82.359375,16072.65625,-11.246875,0.0
2018-01-01 00:38:08,2018-01-01 00:48:24,2.0,1.7,50,239,2.05,0.0,12.35,Jan,1.3,82.359375,16072.65625,-11.246875,0.0
2018-01-01 00:49:29,2018-01-01 00:51:53,1.0,0.7,239,238,1.0,0.0,6.3,Jan,1.3,82.359375,16072.65625,-11.246875,0.0
2018-01-01 00:56:38,2018-01-01 01:01:05,1.0,1.0,238,24,1.7,0.0,8.5,Jan,1.3,82.359375,16072.65625,-11.246875,0.0


#### Checking for Null Values

In [14]:
import pyspark.sql.functions as F

def count_missings(spark_df, sort=True): # Function from user "gench" - https://stackoverflow.com/questions/44627386/
                                         # how-to-find-count-of-null-and-nan-values-for-each-column-in-a-pyspark-dataframe
    """
    Count the null and NaN entries of each column of a Spark DataFrame.

    Columns whose dtype is 'timestamp', 'string' or 'date' are skipped.

    spark_df: the Spark DataFrame to inspect
    sort:     when True (default), return the counts transposed (one row per column,
              a single 'count' column) and sorted in descending order; when False,
              return the collected one-row-wide pandas DataFrame unchanged

    Returns a pandas DataFrame, or None (after printing a message) when the collected
    frame has no rows.
    """
    skipped_types = ('timestamp', 'string', 'date')
    checked_columns = [name for (name, dtype) in spark_df.dtypes if dtype not in skipped_types]

    # one aggregate per checked column: count the rows where the value is NaN or null
    missing_counts = [
        F.count(F.when(F.isnan(name) | F.isnull(name), name)).alias(name)
        for name in checked_columns
    ]
    df = spark_df.select(missing_counts).toPandas()

    # NOTE(review): this tests the number of rows in the collected frame, not whether
    # the counts are all zero - confirm that this is the intended condition.
    if len(df) == 0:
        print("There are no any missing values!")
        return None

    if not sort:
        return df

    return df.rename(index={0: 'count'}).T.sort_values("count", ascending=False)

# Null/NaN count for every column of the joined frame that is not a string, date
# or timestamp, sorted with the largest count first.
count_missings(joint_sdf)

                                                                                

Unnamed: 0,count
passenger_count,0
trip_distance,0
PULocationID,0
DOLocationID,0
tip_amount,0
tolls_amount,0
total_amount,0
Extra_fees_sum,0
Avg_Wind_Speed,0
Avg_Visibility_Distance,0


#### Exporting Data to Curated Folder

In [15]:
# write the joined taxi + weather data as parquet, replacing any previous export
curated_writer = joint_sdf.write.mode('overwrite')
curated_writer.parquet('../data/curated/processed_data')

                                                                                