# Pre-Processing

Code throughout assignment was adapted from tutes


In [3]:
# Downloads the training data from TLC

from urllib.request import urlretrieve
import os

YEAR = '2019'
MONTHS = range(1, 13)
URL_TEMPLATE = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_"

tlc_output_dir = '../data/raw/tlc_2019'
if not os.path.exists(tlc_output_dir):
    os.makedirs(tlc_output_dir)
    
for month in MONTHS:
    
    month = str(month).zfill(2) 
    print(f"Begin month {month}")
    
    # generate url
    url = f'{URL_TEMPLATE}{YEAR}-{month}.parquet'
    # generate output location and filename
    output_dir = f"{tlc_output_dir}/{YEAR}-{month}.parquet"
    # download
    urlretrieve(url, output_dir) 
    
    print(f"Completed month {month}")

Begin month 01
Completed month 01
Begin month 02
Completed month 02
Begin month 03
Completed month 03
Begin month 04
Completed month 04
Begin month 05
Completed month 05
Begin month 06
Completed month 06
Begin month 07
Completed month 07
Begin month 08
Completed month 08
Begin month 09
Completed month 09
Begin month 10
Completed month 10
Begin month 11
Completed month 11
Begin month 12
Completed month 12


In [4]:
from pyspark.sql import SparkSession

# Create a spark session
spark = (
    SparkSession.builder.appName("preprocessing")
    .config("spark.sql.repl.eagerEval.enabled", True) 
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config("spark.driver.memory", "2g")
    .config("spark.executor.memory", "6g")
    .getOrCreate()
)

22/08/29 06:46:17 WARN Utils: Your hostname, AryansLaptop resolves to a loopback address: 127.0.1.1; using 172.18.205.204 instead (on interface eth0)
22/08/29 06:46:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/08/29 06:46:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Reads in the weather train dataset

tempdf = spark.read.option("header", "true").csv("../data/raw/JRB-train.tsv",sep='\t')
print(tempdf.count(), len(tempdf.columns))
tempdf.limit(5)

8368 30
22/08/29 06:49:52 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


station,valid,tmpf,dwpf,relh,drct,sknt,p01i,alti,mslp,vsby,gust,skyc1,skyc2,skyc3,skyc4,skyl1,skyl2,skyl3,skyl4,wxcodes,ice_accretion_1hr,ice_accretion_3hr,ice_accretion_6hr,peak_wind_gust,peak_wind_drct,peak_wind_time,feel,metar,snowdepth
JRB,2019-01-01 00:56,44.1,43.0,95.88,90.0,7.0,0.18,30.08,1018.5,6.0,,OVC,,,,700.0,,,,-RA BR,,,,,,,39.53,KJRB 010056Z AUTO...,
JRB,2019-01-01 01:56,45.0,44.1,96.63,50.0,8.0,0.15,30.02,1016.7,3.0,,OVC,,,,700.0,,,,RA BR,,,,,,,40.13,KJRB 010156Z AUTO...,
JRB,2019-01-01 02:56,45.0,44.1,96.63,40.0,10.0,0.17,29.94,1013.9,8.0,,OVC,,,,700.0,,,,-RA,,,,,,,39.27,KJRB 010256Z AUTO...,
JRB,2019-01-01 03:56,46.0,45.0,96.28,40.0,7.0,0.11,29.88,1011.9,9.0,15.0,OVC,,,,700.0,,,,-RA,,,,,,,41.84,KJRB 010356Z AUTO...,
JRB,2019-01-01 04:56,48.9,48.0,96.69,,3.0,0.15,29.8,1009.2,7.0,,OVC,,,,500.0,,,,-RA,,,,,,,48.01,KJRB 010456Z AUTO...,


In [6]:
# Removes pointless features and checks if any null values

tempdf = tempdf.select("valid","tmpf","dwpf","relh","sknt")
print(tempdf.count())
tempdf.dropna("any")
print(tempdf.count())
tempdf.limit(5)

8368
8368


valid,tmpf,dwpf,relh,sknt
2019-01-01 00:56,44.1,43.0,95.88,7.0
2019-01-01 01:56,45.0,44.1,96.63,8.0
2019-01-01 02:56,45.0,44.1,96.63,10.0
2019-01-01 03:56,46.0,45.0,96.28,7.0
2019-01-01 04:56,48.9,48.0,96.69,3.0


In [7]:
# Transforms and renames dataset

from pyspark.sql.functions import *

# adds all the time related columns to dataset
tempdf = tempdf.withColumn("month-day-hr", date_format(col("valid"), "MMMM-dd-HH"))
tempdf = tempdf.withColumn("month", date_format(col("valid"), "MM").cast("long"))
tempdf = tempdf.withColumn("day", date_format(col("valid"), "dd").cast("long"))
tempdf = tempdf.withColumn("pickup_hour", date_format(col("valid"), "HH").cast("long"))
tempdf = tempdf.drop("valid")

# renames columns to less ambiguous names
tempdf = tempdf.withColumnRenamed("tmpf","temperature(f)")
tempdf = tempdf.withColumnRenamed("dwpf","dew_point_temp(f)")
tempdf = tempdf.withColumnRenamed("relh","relative_humidity")
tempdf = tempdf.withColumnRenamed("sknt","wind_speed")

# makes fields double so they can be used in model making
for field in ('temperature(f)',"dew_point_temp(f)","relative_humidity","wind_speed"):
    tempdf = tempdf.withColumn(
        field,
        col(field).cast('double')
    )

tempdf.limit(5)

temperature(f),dew_point_temp(f),relative_humidity,wind_speed,month-day-hr,month,day,pickup_hour
44.1,43.0,95.88,7.0,January-01-00,1,1,0
45.0,44.1,96.63,8.0,January-01-01,1,1,1
45.0,44.1,96.63,10.0,January-01-02,1,1,2
46.0,45.0,96.28,7.0,January-01-03,1,1,3
48.9,48.0,96.69,3.0,January-01-04,1,1,4


In [25]:
tempdf.schema

StructType([StructField('temperature(f)', DoubleType(), True), StructField('dew_point_temp(f)', DoubleType(), True), StructField('relative_humidity', DoubleType(), True), StructField('wind_speed', DoubleType(), True), StructField('month-day-hr', StringType(), True), StructField('month', LongType(), True), StructField('day', LongType(), True), StructField('pickup_hour', LongType(), True)])

In [8]:
# reads in the taxi dataset

taxidf = spark.read.parquet('../data/raw/tlc_2019')
print(taxidf.count(), len(taxidf.columns))
taxidf.limit(5)

84598444 19


                                                                                

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
1,2019-03-01 00:24:41,2019-03-01 00:25:31,1.0,0.0,1.0,N,145,145,2,2.5,0.5,0.5,0.0,0.0,0.3,3.8,0.0,
1,2019-03-01 00:25:27,2019-03-01 00:36:37,2.0,3.7,1.0,N,95,130,1,13.0,0.5,0.5,0.7,0.0,0.3,15.0,0.0,
1,2019-03-01 00:05:21,2019-03-01 00:38:23,1.0,14.1,1.0,N,249,28,1,41.0,3.0,0.5,10.1,5.76,0.3,60.66,2.5,
1,2019-03-01 00:48:55,2019-03-01 01:06:03,1.0,9.6,1.0,N,138,98,2,27.0,0.5,0.5,0.0,0.0,0.3,28.3,0.0,
1,2019-03-01 00:11:42,2019-03-01 00:16:40,1.0,0.8,1.0,N,48,48,1,5.5,3.0,0.5,3.0,0.0,0.3,12.3,2.5,


In [9]:
# Transforms taxi data

from pyspark.sql.functions import *

# used to inner join later
taxidf = taxidf.withColumn("month-day-hr", date_format(col("tpep_pickup_datetime"), "MMMM-dd-HH"))

# used to remove small journeys
taxidf = taxidf.withColumn('tpep_pickup_datetime',to_timestamp(col('tpep_pickup_datetime')))\
  .withColumn('tpep_dropoff_datetime', to_timestamp(col('tpep_dropoff_datetime')))\
  .withColumn('journey_time',(col("tpep_dropoff_datetime").cast("long") - col('tpep_pickup_datetime').cast("double"))/60)
taxidf = taxidf.withColumn("journey_time", round(col("journey_time"), 2))

taxidf = taxidf.select("month-day-hr","PULocationID","journey_time","trip_distance")

taxidf.limit(10)

month-day-hr,PULocationID,journey_time,trip_distance
March-01-00,145,0.83,0.0
March-01-00,95,11.17,3.7
March-01-00,249,33.03,14.1
March-01-00,138,17.13,9.6
March-01-00,48,4.97,0.8
March-01-00,246,4.58,1.2
March-01-00,239,4.88,0.6
February-28-19,132,9.15,5.65
March-01-00,229,4.9,1.16
March-01-00,137,4.35,0.71


In [10]:
# taxi dataset filtering

print(taxidf.count())
taxidf = taxidf.filter(col('journey_time') > 10)
print(taxidf.count())
taxidf = taxidf.filter(col('trip_distance') > 0)
print(taxidf.count())

84598444


                                                                                

47769530




47647994


                                                                                

In [11]:
# Gets counts for ever hour-day-month-location combo

countdf = taxidf.groupBy("month-day-hr","PULocationID").count()
countdf

                                                                                

month-day-hr,PULocationID,count
February-28-23,148,215
March-01-01,37,2
March-01-02,142,8
March-01-09,221,1
March-01-12,228,2
March-01-14,151,56
April-12-22,132,430
March-02-00,223,2
March-02-04,236,7
March-02-05,205,1


In [9]:
countdf.count()

                                                                                

1080774

In [12]:
# Inner join

mergedf = countdf.join(tempdf, ["month-day-hr"])
mergedf.count()

                                                                                

1032570

In [13]:
mergedf = mergedf.drop("month-day-hr")
mergedf.limit(10)

                                                                                

PULocationID,count,temperature(f),dew_point_temp(f),relative_humidity,wind_speed,month,day,pickup_hour
148,215,35.1,19.0,51.47,9.0,2,28,23
37,2,33.1,18.0,53.41,10.0,3,1,1
142,8,32.0,17.1,53.7,13.0,3,1,2
221,1,32.0,21.9,65.92,10.0,3,1,9
228,2,30.0,28.0,92.15,9.0,3,1,12
151,56,33.1,28.9,84.36,10.0,3,1,14
132,430,62.1,55.9,80.11,4.0,4,12,22
223,2,35.1,30.0,81.46,9.0,3,2,0
236,7,34.0,28.9,81.37,12.0,3,2,4
205,1,34.0,30.9,88.27,13.0,3,2,5


In [42]:
mergedf.schema

StructType([StructField('PULocationID', LongType(), True), StructField('count', LongType(), False), StructField('temperature(f)', DoubleType(), True), StructField('dew_point_temp(f)', DoubleType(), True), StructField('relative_humidity', DoubleType(), True), StructField('wind_speed', DoubleType(), True), StructField('month', LongType(), True), StructField('day', LongType(), True), StructField('pickup_hour', LongType(), True)])

In [14]:
mergedf.write.parquet('../data/curated/mergedf.paraquet')

                                                                                