__We will follow the same steps we used for the 2020 taxi data. For an explanation of each step, please review `Spark_preprocess_2020`.__

# Preprocessing Stage
### Processing 2022 Taxi data 

__This notebook covers the following steps:__

1- Reading the taxi data stored in an AWS S3 bucket.

2- Analyzing and cleaning the data.

3- Grouping the data on an hourly basis and making it ready for modeling.

4- Saving the transformed data to an S3 bucket.



===============================================================================================




# 1- Loading the data

In [1]:
from pyspark.sql import SparkSession


SparkSession available as 'spark'.



In [2]:
spark = SparkSession.builder.appName('cleaning').getOrCreate()


* __The 2022 taxi data consists of 4 parquet files, each containing one month of taxi trip records. The files are uploaded to the `taxi-project-thesis` AWS bucket inside the `taxi2022` folder. The `*` wildcard in the path reads all 4 parquet files into one dataframe, and `.coalesce(1)` then merges the resulting partitions into a single partition.__

In [3]:
df=spark.read.parquet("s3://taxi-project-thesis/taxi2022/*").coalesce(1)


In [4]:
df.count()


12671164

__The dataframe contains `12671164` taxi trips. Let's analyze these trips.__

In [5]:
from pyspark.sql.functions import *


# 2- Analyzing and cleaning the data
## 2.1 Cleaning incorrect trips using pickup_datetime and dropoff_datetime

### Cleaning the trips using the pick-up year
* We need to extract the unique pick-up years to see whether the dataframe contains trips that belong to incorrect years.

In [6]:
# adding the pick up year
df_new=df.withColumn('Pick_Year',year(df['tpep_pickup_datetime']))


In [7]:
# adding the drop-off year
df_new=df_new.withColumn('Drop_Year',year(df_new['tpep_dropoff_datetime']))


In [8]:
# check the pick-up year
df_new.select('Pick_Year').distinct().collect()


[Row(Pick_Year=2022), Row(Pick_Year=2021), Row(Pick_Year=2008), Row(Pick_Year=2009), Row(Pick_Year=2003), Row(Pick_Year=2012)]

In [9]:
# adding pick up month
df_new=df_new.withColumn('Pick_month',month(df_new['tpep_pickup_datetime']))


In [10]:
# adding the drop-off month
df_new=df_new.withColumn('drop_month',month(df_new['tpep_dropoff_datetime']))


In [11]:
#We need to extract the pick year to see if there are incorrect years
pick_year = df_new.select(collect_set('Pick_Year').alias('Pick_Year')).first()['Pick_Year']


In [12]:
for i in pick_year:
    print('year ',i,df_new.filter(df_new['Pick_Year']==i).count())


year  2022 12671100
year  2008 13
year  2009 24
year  2021 24
year  2003 2
year  2012 1
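The loop above runs one `count()` job per year. Spark's `df_new.groupBy('Pick_Year').count()` would likely return the same per-year counts in a single pass. The counting logic itself is sketched below in plain Python, using a few hypothetical pick-up timestamps rather than the real data:

```python
from collections import Counter
from datetime import datetime

# Hypothetical pick-up timestamps (not taken from the real dataset)
pickups = [
    datetime(2022, 1, 3, 8, 15),
    datetime(2022, 2, 10, 17, 40),
    datetime(2021, 12, 31, 23, 55),
    datetime(2008, 12, 31, 23, 10),
]

# One pass over the data, grouped by pick-up year (like groupBy('Pick_Year').count())
year_counts = Counter(ts.year for ts in pickups)

for year, n in sorted(year_counts.items(), reverse=True):
    print('year', year, n)
```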

In [13]:
# we will keep only the trips picked up in 2021 or 2022
year_list=[2021,2022]


In [14]:
df_new=df_new.filter(df_new.Pick_Year.isin(year_list))


In [15]:
df.count()-df_new.count()


40

In [16]:
# to check the pick up year in our dataframe
df_new.select('Pick_Year').distinct().collect()


[Row(Pick_Year=2022), Row(Pick_Year=2021)]

### Cleaning the trips using the drop-off year

* We need to extract the unique drop-off years to see whether there are incorrect years.

In [17]:
df_new.select('Drop_Year').distinct().collect()


[Row(Drop_Year=2022), Row(Drop_Year=2021)]

Some trips have a drop-off year of 2021; these should be removed.

In [18]:
# count the rows whose drop-off year is 2021 (these will be removed)
df_new.filter(df_new['Drop_Year']==2021).count()


12

In [19]:
df_new= df_new.filter('Drop_Year=2022')


In [20]:
df_new.select('Drop_Year').distinct().collect()


[Row(Drop_Year=2022)]
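Together, the two year checks keep a trip only if it was picked up in 2021 or 2022 and dropped off in 2022. A plain-Python sketch of that rule, using hypothetical timestamps (the function name `keep_by_year` is illustrative):

```python
from datetime import datetime

VALID_PICK_YEARS = {2021, 2022}  # same values as year_list above
VALID_DROP_YEAR = 2022

def keep_by_year(pickup, dropoff):
    """Return True if the trip passes both year checks from section 2.1."""
    return pickup.year in VALID_PICK_YEARS and dropoff.year == VALID_DROP_YEAR

trips = [
    (datetime(2022, 1, 5, 9, 0), datetime(2022, 1, 5, 9, 20)),         # kept
    (datetime(2021, 12, 31, 23, 50), datetime(2022, 1, 1, 0, 10)),     # kept: crosses New Year
    (datetime(2021, 12, 31, 22, 0), datetime(2021, 12, 31, 22, 30)),   # removed: drop-off in 2021
    (datetime(2009, 1, 1, 0, 5), datetime(2009, 1, 1, 0, 25)),         # removed: pick-up in 2009
]
kept = [t for t in trips if keep_by_year(*t)]
```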

## 2.2 Analyzing and cleaning the trip distance

In [22]:
zero_distance=df_new.filter(df_new['trip_distance']==0)


In [23]:
print('Number of trips with zero trip distance ',zero_distance.count(),' trips')


Number of trips with zero trip distance  144066  trips

In [24]:
# trips which have zero distance and whose pick-up location is the same as the drop-off location
zero_distance.filter(zero_distance['PULocationID']==zero_distance['DOLocationID']).count()


90980

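__A zero-distance trip that starts and ends in the same zone is plausible, but a zero-distance trip between two different zones is inconsistent. So we will remove the trips that have zero trip distance and whose pick-up location is different from the drop-off location.__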

In [25]:
# trips which have zero distance and pick location is different than drop location
zero_distance.filter(zero_distance['PULocationID']!=zero_distance['DOLocationID']).count()


53086

In [26]:
# expected row count after removing these 53086 trips
df_new.count() - 53086


12618026

### Steps:

* We will create a dataframe with the zero-distance trips (already done above).

* We will create a dataframe that doesn't have zero-distance trips (`no_zero_distance`).

* From the zero-distance dataframe, we will filter out the trips whose pick-up location is different from the drop-off location.

* Then we will merge the filtered zero-distance dataframe with the non-zero-distance dataframe.
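These four steps amount to one rule: keep a trip if its distance is non-zero, or if it is zero and the pick-up and drop-off locations are the same. In PySpark this could likely be written as a single filter, `df_new.filter((col('trip_distance') != 0) | (col('PULocationID') == col('DOLocationID')))`, which avoids the union; it should give the same result as long as `trip_distance` contains no nulls (an assumption, since SQL `OR` treats nulls differently from two separate filters). A plain-Python sketch of the rule on hypothetical rows:

```python
def keep_trip(trip_distance, pu_location, do_location):
    """Zero-distance trips are kept only when they start and end in the same zone."""
    return trip_distance != 0 or pu_location == do_location

# Hypothetical (trip_distance, PULocationID, DOLocationID) rows
rows = [
    (2.4, 138, 236),  # non-zero distance: kept
    (0.0, 161, 161),  # zero distance, same zone: kept
    (0.0, 48, 50),    # zero distance, different zones: removed
]
cleaned = [r for r in rows if keep_trip(*r)]
```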

In [27]:
# 2- create a dataframe without the zero-distance trips
no_zero_distance=df_new.filter(df_new['trip_distance']!=0)


In [29]:
# 1- the zero-distance dataframe was already created above
zero_distance.count()


144066

In [30]:
# 3- filter out the zero-distance trips with different pick-up and drop-off locations
# i.e. keep only the zero-distance trips whose pick-up and drop-off locations are the same
Zero_distance_cleared=zero_distance.filter(zero_distance['PULocationID']==zero_distance['DOLocationID'])


In [31]:
# to verify the expected number of rows
no_zero_distance.count() +Zero_distance_cleared.count()


12618026

In [32]:
# merge the non-zero-distance trips with the zero-distance trips that have the same pick-up and drop-off location
df_new_1 = no_zero_distance.union(Zero_distance_cleared)


In [33]:
# this dataset contains all the observations except the zero-distance trips whose drop-off location differs from the pick-up location
df_new_1.count()


12618026

In [34]:
# check whether any trips with zero distance and different pick-up and drop-off locations remain
df_new_1.filter((df_new_1['trip_distance']==0) & (df_new_1['PULocationID']!=df_new_1['DOLocationID'])).count()


0

In [35]:
df_new_1.select('trip_distance').describe().show()


+-------+-----------------+
|summary|    trip_distance|
+-------+-----------------+
|  count|         12618026|
|   mean|5.704783304457681|
| stddev|584.8984083034265|
|    min|              0.0|
|    max|        348798.53|
+-------+-----------------+

__We will remove unrealistic trips by dropping any trip of 1000 miles or more.__

In [36]:
# Number of trips above or equal 1000 miles
df_new_1.filter(df_new_1['trip_distance']>=1000).count()


334

In [37]:
df_new_1=df_new_1.filter(df_new_1['trip_distance']<1000)


## 2.3 Fare amount
As per https://www1.nyc.gov/site/tlc/passengers/taxi-fare.page, the initial charge is $2.50, so we will remove any trip with a fare amount below $2.50.

In [38]:
#removing the trips that have fare amount less than 2.5
df_new_1=df_new_1.filter(df_new_1['fare_amount']>=2.5)


In [39]:
# remove trips with fares above $500: in my view they are not realistic, and there are only 113 of them, so removing them will not affect our data
df_new_1=df_new_1.filter(df_new_1['fare_amount']<=500)


In [40]:
df_new_1.count()


12544947
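Together with the 1000-mile cap from section 2.2, the fare rules keep a trip only if its distance is below 1000 miles and its fare is between $2.50 and $500 inclusive. A plain-Python sketch of these bounds; the constant and function names are illustrative:

```python
MAX_DISTANCE_MILES = 1000   # trips at or above this distance are removed (section 2.2)
MIN_FARE = 2.5              # initial charge per the TLC fare page
MAX_FARE = 500              # author's cap for unrealistic fares

def passes_distance_and_fare(trip_distance, fare_amount):
    """Mirror the filters: trip_distance < 1000, fare_amount >= 2.5 and fare_amount <= 500."""
    return trip_distance < MAX_DISTANCE_MILES and MIN_FARE <= fare_amount <= MAX_FARE

print(passes_distance_and_fare(3.1, 14.5))     # True
print(passes_distance_and_fare(3.1, 2.0))      # False: below the initial charge
print(passes_distance_and_fare(1200.0, 40.0))  # False: unrealistic distance
```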

## 2.4 Trip duration
As per the NYC Taxi & Limousine Commission regulations, the maximum time a taxi driver can drive per day is 12 hours, so we will assume that the maximum trip duration is 12 hours and remove any records exceeding 12 hours.

Steps to remove the trips above 12 hours:

* We will create a new column containing the trip duration in seconds, `trip_duration_seconds` (the UNIX pick-up time subtracted from the UNIX drop-off time)

* We will compute the trip duration in hours (`duration_In_Hours`) from the newly created `trip_duration_seconds`

* Then we will filter out any trip above 12 hours
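A plain-Python sketch of the duration check, using hypothetical timestamps plus the March trip shown in section 2.5. It follows the cells below: the difference in seconds is converted to hours, rounded to one decimal, and compared with 12. Two side effects seem worth noting: since rounding happens before the comparison, a trip slightly over 12 hours (up to about 12.05) still rounds to 12.0 and is kept, and a negative duration (drop-off before pick-up) also passes `<= 12`.

```python
from datetime import datetime

MAX_HOURS = 12

def duration_hours(pickup, dropoff):
    """Trip duration in hours, rounded to one decimal like round(col / 3600, 1)."""
    seconds = (dropoff - pickup).total_seconds()
    return round(seconds / 3600, 1)

normal = duration_hours(datetime(2022, 3, 1, 8, 0), datetime(2022, 3, 1, 8, 48))
too_long = duration_hours(datetime(2022, 3, 1, 0, 0), datetime(2022, 3, 1, 13, 0))
# The March trip from section 2.5: drop-off earlier than pick-up
negative = duration_hours(datetime(2022, 3, 18, 7, 53, 14), datetime(2022, 1, 27, 11, 53, 18))

print(normal, normal <= MAX_HOURS)      # 0.8 True
print(too_long, too_long <= MAX_HOURS)  # 13.0 False
print(negative <= MAX_HOURS)            # True: negative durations are not removed
```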

In [41]:
df_new_1=df_new_1.withColumn('trip_duration_seconds',unix_timestamp("tpep_dropoff_datetime") - unix_timestamp('tpep_pickup_datetime'))


In [42]:
# create the trip duration in hours, which is `trip_duration_seconds / 3600` rounded to one decimal
df_new_1=df_new_1.withColumn('duration_In_Hours',round(col('trip_duration_seconds')/3600,1))


In [43]:
df_new_1=df_new_1.filter(df_new_1['duration_In_Hours']<=12)


In [44]:
df_new_1.filter(df_new_1['duration_In_Hours']>12).count()


0

## 2.5 Check the trips in each month

The trips picked up in each month should be dropped off in the same month, or occasionally in the next month. For example, January pick-ups should be dropped off in January, with a few dropped off in February: a trip picked up during the last hour of 31 January might be dropped off during the first hour of 1 February.
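This expectation can be written as a simple check: the drop-off month must equal the pick-up month or the month right after it. A plain-Python sketch on hypothetical (pick-up month, drop-off month) pairs; the December to January wrap is not handled, which seems acceptable here because the data only covers January to April:

```python
def month_is_consistent(pick_month, drop_month):
    """Drop-off must fall in the pick-up month or the following month (Jan-Apr data)."""
    return drop_month in (pick_month, pick_month + 1)

# Hypothetical (Pick_month, drop_month) pairs
pairs = [(1, 1), (1, 2), (2, 3), (3, 1), (4, 5)]
suspicious = [p for p in pairs if not month_is_consistent(*p)]
print(suspicious)  # [(3, 1)]
```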

In [45]:
for i in range(1,5):
    print('Pick up month ',i,df_new_1.filter(df_new_1['Pick_month']==i).select('drop_month').distinct().collect())


Pick up month  1 [Row(drop_month=1), Row(drop_month=2)]
Pick up month  2 [Row(drop_month=3), Row(drop_month=2)]
Pick up month  3 [Row(drop_month=1), Row(drop_month=3), Row(drop_month=4)]
Pick up month  4 [Row(drop_month=4), Row(drop_month=5)]

__We notice that there is a March pick-up dropped off in January, which is incorrect.__

In [46]:
df_new_1.filter((df_new_1['Pick_month']==3) & (df_new_1['drop_month']==1)).select(['tpep_pickup_datetime','tpep_dropoff_datetime']).show()


+--------------------+---------------------+
|tpep_pickup_datetime|tpep_dropoff_datetime|
+--------------------+---------------------+
| 2022-03-18 07:53:14|  2022-01-27 11:53:18|
+--------------------+---------------------+

This is an invalid trip (its drop-off time is earlier than its pick-up time), so it should be removed.

__Steps to remove this trip:__


* Create a dataframe without the March trips

* Create a dataframe with only the March trips

* Filter out this row and union the two dataframes
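Splitting and re-joining works, but the same row could likely be removed with one filter that keeps every trip except those picked up in March and dropped off in January, for example `df_new_1.filter(~((col('Pick_month') == 3) & (col('drop_month') == 1)))` in PySpark. This assumes the month columns contain no nulls, since the split-and-union approach drops null months while the single filter may keep them. The same predicate in plain Python, on hypothetical month pairs:

```python
def keep_month_pair(pick_month, drop_month):
    """Drop only the invalid combination: picked up in March, dropped off in January."""
    return not (pick_month == 3 and drop_month == 1)

# Hypothetical (Pick_month, drop_month) pairs
trips = [(3, 3), (3, 4), (3, 1), (2, 3)]
kept = [t for t in trips if keep_month_pair(*t)]
```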

In [48]:
# create mar_df
mar_df=df_new_1.filter(df_new_1['Pick_month']==3)


In [50]:
# create no_mar_df
no_mar_df = df_new_1.filter(df_new_1['Pick_month']!=3)


In [51]:
# filter out the row dropped off in January
mar_df=mar_df.filter(mar_df['drop_month']!=1)


In [52]:
mar_df.select('drop_month').distinct().collect()


[Row(drop_month=3), Row(drop_month=4)]

In [53]:
df_new_final = mar_df.union(no_mar_df)


In [54]:
df_new_final.count()


12530988

# 3- Grouping the data per location ID on an hourly basis and making it ready for modeling

In [55]:
df_new_final.select(countDistinct("PULocationID")).show()


+----------------------------+
|count(DISTINCT PULocationID)|
+----------------------------+
|                         261|
+----------------------------+

In [56]:
LocationID2022 = df_new_final.select(collect_set('PULocationID').alias('locationID_list')).first()['locationID_list']


We have 261 location IDs, and we will aggregate the trips on an hourly basis. January to April 2022 spans 120 days, so we should have 120 X 24 X 261 = 751680 rows.
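The 120 days are January to April 2022 (31 + 28 + 31 + 30). A quick check of the expected number of rows with Python's `datetime`:

```python
from datetime import date

days = (date(2022, 5, 1) - date(2022, 1, 1)).days  # Jan 1 (inclusive) to May 1 (exclusive)
hours = days * 24
locations = 261  # distinct PULocationID values found above
expected_rows = hours * locations
print(days, hours, expected_rows)  # 120 2880 751680
```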

### Steps:
We will change the timestamp to this format `"yyyy-MM-dd HH:00"` to be able to group the trips hourly
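The `date_format` pattern `yyyy-MM-dd HH:00` keeps the date and the hour and writes the minutes as a literal `00`, so every trip within the same hour gets the same label. The equivalent formatting with Python's `strftime`, shown on hypothetical pick-up times (the function name is illustrative):

```python
from datetime import datetime

def hourly_bucket(ts):
    """Label a timestamp with its hour, like date_format(ts, 'yyyy-MM-dd HH:00')."""
    return ts.strftime("%Y-%m-%d %H:00")

print(hourly_bucket(datetime(2022, 3, 1, 14, 37, 5)))  # 2022-03-01 14:00
print(hourly_bucket(datetime(2022, 3, 1, 14, 2, 59)))  # 2022-03-01 14:00
```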

In [61]:
df_new_final=df_new_final.withColumn("Pickup_datetime_hourly", date_format(col("tpep_pickup_datetime").cast("timestamp"), "yyyy-MM-dd HH:00"))


In [62]:
# we will create trip count column
df_new_final=df_new_final.withColumn("Trip_count", lit(1))


In [66]:
hourly_aggregated=df_new_final.groupby(['Pickup_datetime_hourly','PULocationID']).agg({'Trip_count':'count'})\
.withColumnRenamed("count(Trip_count)", "Trips_count")


In [67]:
hourly_aggregated.show()


+----------------------+------------+-----------+
|Pickup_datetime_hourly|PULocationID|Trips_count|
+----------------------+------------+-----------+
|      2022-03-01 14:00|         233|         71|
|      2022-03-01 16:00|          10|          7|
|      2022-03-02 12:00|          50|         43|
|      2022-03-02 13:00|         237|        479|
|      2022-03-03 22:00|         189|          5|
|      2022-03-04 13:00|         158|         32|
|      2022-03-04 23:00|         113|        111|
|      2022-03-05 04:00|         231|         31|
|      2022-03-06 10:00|          82|          1|
|      2022-03-07 11:00|         138|        192|
|      2022-03-08 13:00|          65|         10|
|      2022-03-10 11:00|         144|         29|
|      2022-03-10 18:00|          75|         63|
|      2022-03-11 18:00|         231|        148|
|      2022-03-13 03:00|          68|        103|
|      2022-03-13 16:00|         236|        258|
|      2022-03-13 19:00|          24|         27|


In [68]:
hourly_aggregated.count()


267126

The aggregated data has 267126 rows instead of the expected 751680. The likely cause is that some location IDs have no pick-up records in certain hours: a group-by only produces rows for the (hour, location) combinations that actually occur, so those missing hours do not appear after aggregation.

#### To solve this problem:
- Create a dataframe of hourly timestamps from 2022-01-01 (inclusive) to 2022-05-01 (exclusive), which gives 2880 rows.
- Generate this dataframe for each locationID and merge all of them (751680 rows).
- Create a unique column that contains the __timestamp plus the locationID__ in both dataframes so we can join them.
- Left join the generated dataframe with our hourly aggregated dataframe.
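The idea behind these steps, sketched in plain Python with a few illustrative hours, location IDs and counts: build every (hour, location) combination, look each one up in the aggregated counts (the left join), and use 0 where no trips were recorded. In Spark, the grid itself could also be built in one step with `DataFrame.crossJoin` between the timestamp dataframe and a dataframe of location IDs instead of a loop of unions; that is an alternative, not what this notebook does.

```python
from itertools import product

hours = ["2022-01-01 00:00", "2022-01-01 01:00", "2022-01-01 02:00"]
location_ids = [137, 249]

# Sparse result of the group-by: only (hour, location) pairs that had pick-ups
hourly_counts = {
    ("2022-01-01 00:00", 137): 59,
    ("2022-01-01 00:00", 249): 159,
    ("2022-01-01 02:00", 137): 4,
}

# Dense grid (cross product), left-joined to the counts with 0 for missing hours
dense = [
    (hour, loc, hourly_counts.get((hour, loc), 0))
    for hour, loc in product(hours, location_ids)
]

for row in dense:
    print(row)
```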

### 1- Creating a unique column in hourly_aggregated
The unique column will combine the timestamp with the locationID. We first convert the locationID from a number (`long`) to a string, then use `concat_ws` to join the two string values together.
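The key is the hourly timestamp and the location ID joined with `_`, matching the `UniqueColumn` values shown in the output below. A plain-Python sketch (the function name `unique_key` is illustrative). As a design note, Spark can also join on both columns directly with a compound condition (timestamp equal and location ID equal), which would avoid building the string key and the casts; the concatenated key works as well.

```python
def unique_key(hourly_timestamp, location_id):
    """Build '<yyyy-MM-dd HH:00>_<locationID>', like concat_ws('_', ts, id_as_string)."""
    return "_".join([hourly_timestamp, str(location_id)])

print(unique_key("2022-03-01 14:00", 233))  # 2022-03-01 14:00_233
```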

In [70]:
hourly_aggregated.printSchema()


root
 |-- Pickup_datetime_hourly: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- Trips_count: long (nullable = false)

In [71]:
from pyspark.sql.types import TimestampType,StructField,StringType,IntegerType,StructType


In [72]:
#Create new column which convert the location ID to string
hourly_aggregated= hourly_aggregated.withColumn("PULocationID_string", hourly_aggregated["PULocationID"].cast(StringType()))


In [73]:
#Create the unique column using concat_ws
hourly_aggregated=hourly_aggregated.select(concat_ws('_',hourly_aggregated.Pickup_datetime_hourly,
                                   hourly_aggregated.PULocationID_string).alias("UniqueColumn"),"Pickup_datetime_hourly","PULocationID","Trips_count")


In [74]:
hourly_aggregated.show(5)


+--------------------+----------------------+------------+-----------+
|        UniqueColumn|Pickup_datetime_hourly|PULocationID|Trips_count|
+--------------------+----------------------+------------+-----------+
|2022-03-01 14:00_233|      2022-03-01 14:00|         233|         71|
| 2022-03-01 16:00_10|      2022-03-01 16:00|          10|          7|
| 2022-03-02 12:00_50|      2022-03-02 12:00|          50|         43|
|2022-03-02 13:00_237|      2022-03-02 13:00|         237|        479|
|2022-03-03 22:00_189|      2022-03-03 22:00|         189|          5|
+--------------------+----------------------+------------+-----------+
only showing top 5 rows

In [75]:
hourly_aggregated.printSchema()


root
 |-- UniqueColumn: string (nullable = false)
 |-- Pickup_datetime_hourly: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- Trips_count: long (nullable = false)

### Generate a timestamp dataframe from 2022-01-01 to 2022-05-01
The below function will generate timestamp spark dataframe

In [76]:
def generate_series(start, stop, interval):
    """
    Generate a single-column dataframe of timestamps.

    :param start    - lower bound, inclusive (string castable to timestamp)
    :param stop     - upper bound, exclusive
    :param interval - increment interval in seconds (int)
    """
    # Convert both bounds to epoch seconds (the string-to-timestamp cast uses the Spark session time zone)
    start, stop = spark.createDataFrame(
        [(start, stop)], ("start", "stop")
    ).select(
        [col(c).cast("timestamp").cast("long") for c in ("start", "stop")
    ]).first()
    # Create range with increments and cast back to timestamp
    return spark.range(start, stop, interval).select(
        col("id").cast("timestamp").alias("timestamp")
    )


In [77]:
# an interval of 60 * 60 seconds gives one timestamp per hour
timestamp_df=generate_series("2022-01-01", "2022-05-01", 60 * 60)


We will format the generated timestamps the same way as `Pickup_datetime_hourly` in `hourly_aggregated`.

In [78]:
timestamp_df=timestamp_df.withColumn("hourly_timestamp", date_format(col("timestamp").cast("timestamp"), "yyyy-MM-dd HH:00"))


In [79]:
timestamp_df.printSchema()


root
 |-- timestamp: timestamp (nullable = false)
 |-- hourly_timestamp: string (nullable = false)

In [80]:
timestamp_df.count()


2880
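`generate_series` converts both bounds to epoch seconds, steps through them with `spark.range`, and casts back to timestamps; the upper bound is excluded, which is why 2022-05-01 00:00 is not in the result and the count is 2880. A plain-Python sketch of the same idea, assuming UTC for the epoch conversion (the Spark version uses the session time zone; the function name here is illustrative):

```python
from datetime import datetime, timezone

def generate_hourly_series(start, stop, interval=60 * 60):
    """Timestamps from start (inclusive) to stop (exclusive), interval in seconds, in UTC."""
    start_s = int(start.replace(tzinfo=timezone.utc).timestamp())
    stop_s = int(stop.replace(tzinfo=timezone.utc).timestamp())
    return [
        datetime.fromtimestamp(s, tz=timezone.utc)
        for s in range(start_s, stop_s, interval)
    ]

series = generate_hourly_series(datetime(2022, 1, 1), datetime(2022, 5, 1))
print(len(series))  # 2880
print(series[0], series[-1])
```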

This dataframe now contains the hourly timestamps for January to April 2022, so we need to create the same rows for each location ID that exists in `hourly_aggregated`.

In [81]:
# extract the pickup location id from the dataset and save it in list
Pick_up_LocationID = df_new_final.select(collect_set('PULocationID').alias('locationID_list')).first()['locationID_list']


To get the 751680 rows (120 X 24 X 261), we add each locationID to the generated timestamps and merge all the resulting dataframes by rows.

__Define a function that adds the location ID to the generated timestamps__

In [83]:
def locations_timestampGenerator(dataframe,locationID):
    dataframe=dataframe.withColumn("locationID", lit(locationID))
    return dataframe


In [84]:
data_schema = [StructField("timestamp",TimestampType(), True),StructField("hourly_timestamp", StringType(), True),
               StructField("locationID", IntegerType(), True)]


In [85]:
final_struct = StructType(fields=data_schema)


In [86]:
ID_plus_timestamp = spark.createDataFrame([], schema=final_struct)


In [87]:
for i in Pick_up_LocationID:
    dataframe=locations_timestampGenerator(timestamp_df,i)
    ID_plus_timestamp = ID_plus_timestamp.union(dataframe)


In [88]:
ID_plus_timestamp.count()


751680

#### Create a unique column that contains the __timestamp plus the locationID__ in `ID_plus_timestamp` so we can join it to `hourly_aggregated`.

In [89]:
# Change the location ID from integer to string
ID_plus_timestamp= ID_plus_timestamp.withColumn("locationID_string", ID_plus_timestamp["locationID"].cast(StringType()))


In [90]:
# Create the unique column using concat_ws
ID_plus_timestamp=ID_plus_timestamp.select(concat_ws('_',ID_plus_timestamp.hourly_timestamp,
                                   ID_plus_timestamp.locationID_string).alias("UniqueColumn_G"),"timestamp","hourly_timestamp","locationID")


In [91]:
ID_plus_timestamp.show(3)


+--------------------+-------------------+----------------+----------+
|      UniqueColumn_G|          timestamp|hourly_timestamp|locationID|
+--------------------+-------------------+----------------+----------+
|2022-01-01 00:00_256|2022-01-01 00:00:00|2022-01-01 00:00|       256|
|2022-01-01 01:00_256|2022-01-01 01:00:00|2022-01-01 01:00|       256|
|2022-01-01 02:00_256|2022-01-01 02:00:00|2022-01-01 02:00|       256|
+--------------------+-------------------+----------------+----------+
only showing top 3 rows

#### Join both dataframes using the unique column

In [92]:
# join both dataframes using the unique column
Pick_aggregated=hourly_aggregated.join(ID_plus_timestamp,hourly_aggregated.UniqueColumn ==  ID_plus_timestamp.UniqueColumn_G,"right")


Replace the null values in the `PULocationID` and `Trips_count` columns of `Pick_aggregated` with 0.

For hours with no pick-ups, `PULocationID` is set to 0; the real location ID is still available in the `locationID` column from the generated dataframe.

In [93]:
Pick_aggregated=Pick_aggregated.na.fill(0,subset=["PULocationID"])


In [94]:
Pick_aggregated=Pick_aggregated.na.fill(0,subset=["Trips_count"])


#### Select the columns we need: timestamp, hourly_timestamp, locationID, Trips_count and UniqueColumn_G
I have selected `UniqueColumn_G` as the unique identifier, which will be used to join this dataframe with the drop-off aggregated dataframe.

In [95]:
Pick_aggregated=Pick_aggregated.select(['timestamp','hourly_timestamp','locationID','Trips_count','UniqueColumn_G'])


In [96]:
Pick_aggregated.show()


+-------------------+----------------+----------+-----------+--------------------+
|          timestamp|hourly_timestamp|locationID|Trips_count|      UniqueColumn_G|
+-------------------+----------------+----------+-----------+--------------------+
|2022-01-01 00:00:00|2022-01-01 00:00|       120|          0|2022-01-01 00:00_120|
|2022-01-01 00:00:00|2022-01-01 00:00|       133|          0|2022-01-01 00:00_133|
|2022-01-01 00:00:00|2022-01-01 00:00|       137|         59|2022-01-01 00:00_137|
|2022-01-01 00:00:00|2022-01-01 00:00|       160|          0|2022-01-01 00:00_160|
|2022-01-01 00:00:00|2022-01-01 00:00|       166|         11|2022-01-01 00:00_166|
|2022-01-01 00:00:00|2022-01-01 00:00|       178|          0|2022-01-01 00:00_178|
|2022-01-01 00:00:00|2022-01-01 00:00|       179|          0|2022-01-01 00:00_179|
|2022-01-01 00:00:00|2022-01-01 00:00|       186|         37|2022-01-01 00:00_186|
|2022-01-01 00:00:00|2022-01-01 00:00|       249|        159|2022-01-01 00:00_249|

## Aggregating the drop-off trips per location ID on an hourly basis

- We will follow the same steps we used to aggregate the pick-up trips above, but now for the drop-offs, so that we have an hourly count per location ID for both pick-up and drop-off trips.

In [97]:
df_new_final.select(countDistinct("DOLocationID")).show()


+----------------------------+
|count(DISTINCT DOLocationID)|
+----------------------------+
|                         261|
+----------------------------+

__We have 261 drop-off location IDs and 261 pick-up location IDs; we will make sure these are the same location IDs.__

In [98]:
# extract the drop location id from the dataset and save it in list
Drop_LocationID = df_new_final.select(collect_set('DOLocationID').alias('DOLocationID')).first()['DOLocationID']


In [99]:
unique_list = []
 
for item in Pick_up_LocationID: 
    if item not in Drop_LocationID: 
        unique_list.append(item)


In [100]:
unique_list


[]
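Since both lists hold 261 distinct IDs, an empty one-way difference already means they contain the same IDs. Comparing Python sets makes this explicit and avoids scanning the list once per item; a sketch with short hypothetical lists (the function names are illustrative):

```python
def same_location_ids(pick_ids, drop_ids):
    """True when both collections contain exactly the same location IDs."""
    return set(pick_ids) == set(drop_ids)

def ids_only_in_one(pick_ids, drop_ids):
    """IDs that appear in one collection but not the other (symmetric difference)."""
    return set(pick_ids) ^ set(drop_ids)

print(same_location_ids([1, 2, 3], [3, 2, 1]))  # True
print(ids_only_in_one([1, 2, 3], [2, 3, 4]))    # {1, 4}
```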

#### Now we are going to aggregate the data based on hourly drop count per locationID.
We will change the timestamp to this format `"yyyy-MM-dd HH:00"` to be able to group the trips hourly

In [101]:
Drop_df=df_new_final.withColumn("Drop_datetime_hourly", date_format(col("tpep_dropoff_datetime").cast("timestamp"), "yyyy-MM-dd HH:00"))


In [102]:
# Grouping and aggregating the dropped trips
drop_aggregated = Drop_df.groupby(['Drop_datetime_hourly','DOLocationID']).agg({'Trip_count':'count'})\
.withColumnRenamed("count(Trip_count)", "Drop_Trips_count")


In [103]:
drop_aggregated.show()

+--------------------+------------+----------------+
|Drop_datetime_hourly|DOLocationID|Drop_Trips_count|
+--------------------+------------+----------------+
|    2022-01-01 03:00|         225|              17|
|    2022-01-01 03:00|         144|              24|
|    2022-01-01 03:00|         192|               1|
|    2022-01-01 06:00|         249|               5|
|    2022-01-01 15:00|         164|              86|
|    2022-01-03 05:00|         170|              12|
|    2022-01-03 12:00|         189|               4|
|    2022-01-04 21:00|         140|              55|
|    2022-01-05 12:00|         142|             128|
|    2022-01-05 14:00|         210|               2|
|    2022-01-07 01:00|          74|               9|
|    2022-01-08 16:00|          49|               5|
|    2022-01-08 22:00|          93|               1|
|    2022-01-09 06:00|         237|              11|
|    2022-01-09 16:00|         254|               1|
|    2022-01-09 20:00|          33|           

In [106]:
drop_aggregated.count()

449482
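`drop_aggregated` has one row per distinct (hour, drop-off location) pair that occurs in the data, so the `449482` above is the number of such pairs; an hour in which a location received no drop-offs simply has no row. The grouping and counting can be sketched in plain Python with `collections.Counter`; the trip records below are hypothetical.

```python
from collections import Counter
from datetime import datetime

# Hypothetical (tpep_dropoff_datetime, DOLocationID) records
drops = [
    (datetime(2022, 1, 1, 3, 10), 225),
    (datetime(2022, 1, 1, 3, 45), 225),
    (datetime(2022, 1, 1, 3, 50), 144),
    (datetime(2022, 1, 1, 4, 5), 225),
]

# Count trips per (hourly key, location) pair, like groupby(...).agg(count)
drop_counts = Counter((ts.strftime("%Y-%m-%d %H:00"), loc) for ts, loc in drops)

print(len(drop_counts))                        # 3 non-empty (hour, location) pairs
print(drop_counts[("2022-01-01 03:00", 225)])  # 2
print(drop_counts[("2022-01-01 04:00", 144)])  # 0: pair never occurs, so Spark has no row for it
```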

__Create a unique column, `UniqueColumn`, that combines the hourly timestamp and the location ID, so we can join it to `Pick_aggregated`__

First, convert `DOLocationID` to string type.

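The key has the form `<hourly timestamp>_<location ID>`, for example `2022-01-01 03:00_225`. Because the hourly string contains only digits, dashes, a space and a colon (never an underscore), each key maps back to exactly one (hour, location) pair. A plain-Python sketch of the same concatenation; `unique_key` and `split_key` are hypothetical helpers mirroring `concat_ws('_', ...)`.

```python
def unique_key(hourly_ts: str, location_id: int) -> str:
    """Join hourly timestamp and location ID with '_', like concat_ws('_', ...)."""
    return "_".join([hourly_ts, str(location_id)])


def split_key(key: str) -> tuple:
    """Recover (hourly_ts, location_id); safe because hourly_ts has no '_'."""
    hourly_ts, location_id = key.split("_")
    return hourly_ts, int(location_id)


key = unique_key("2022-01-01 03:00", 225)
print(key)             # 2022-01-01 03:00_225
print(split_key(key))  # ('2022-01-01 03:00', 225)
```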
In [107]:
from pyspark.sql.types import StringType

drop_aggregated=drop_aggregated.withColumn("DOLocationID_string", drop_aggregated["DOLocationID"].cast(StringType()))

In [108]:
# Create the unique join key "<hourly timestamp>_<location ID>" using `concat_ws`
drop_aggregated=drop_aggregated.select(concat_ws('_',drop_aggregated.Drop_datetime_hourly,
                                   drop_aggregated.DOLocationID_string).alias("UniqueColumn"),"Drop_datetime_hourly","DOLocationID","Drop_Trips_count")

### Joining Pick_aggregated & drop_aggregated dataframes using the UniqueColumn
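The right join keeps every row of `Pick_aggregated`: a key with no matching drop-off row gets a null `Drop_Trips_count`, which `In [110]` replaces with 0, and drop-off keys that do not appear in `Pick_aggregated` are discarded. The same semantics in plain Python, with hypothetical keys and counts:

```python
# Hypothetical pick-up and drop-off counts keyed by "<hour>_<locationID>"
pick_counts = {"2022-01-01 00:00_10": 4, "2022-01-01 00:00_20": 0}
drop_counts = {"2022-01-01 00:00_20": 3, "2022-01-01 00:00_30": 7}

# Right join on the pick-up side: keep every pick-up key, fill missing drop-offs with 0
joined = {key: (pick, drop_counts.get(key, 0)) for key, pick in pick_counts.items()}

print(joined)
# {'2022-01-01 00:00_10': (4, 0), '2022-01-01 00:00_20': (0, 3)}
```

The drop-off-only key `2022-01-01 00:00_30` is dropped, just as the right join drops unmatched rows of `drop_aggregated`.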

In [109]:
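# Right join: keep every (hour, location) row of Pick_aggregated and attach drop-off counts where they exist
final_df=drop_aggregated.join(Pick_aggregated,drop_aggregated.UniqueColumn == Pick_aggregated.UniqueColumn_G,"right")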

In [110]:
# Filling the null Drop_Trips_count with zero
final_df=final_df.na.fill(0,subset=["Drop_Trips_count"])

### Now we select only the columns we want:

`timestamp`, `hourly_timestamp`, `locationID`, `Trips_count` (renamed to `Pick_Trips_count` in the next cell) and `Drop_Trips_count`

In [111]:
final_df=final_df.select(['timestamp','hourly_timestamp','locationID','Trips_count','Drop_Trips_count'])

In [112]:
final_df=final_df.withColumnRenamed('Trips_count','Pick_Trips_count')

In [113]:
final_df.show()

+-------------------+----------------+----------+----------------+----------------+
|          timestamp|hourly_timestamp|locationID|Pick_Trips_count|Drop_Trips_count|
+-------------------+----------------+----------+----------------+----------------+
|2022-01-01 00:00:00|2022-01-01 00:00|       120|               0|               1|
|2022-01-01 00:00:00|2022-01-01 00:00|       133|               0|               0|
|2022-01-01 00:00:00|2022-01-01 00:00|       137|              59|              65|
|2022-01-01 00:00:00|2022-01-01 00:00|       160|               0|               0|
|2022-01-01 00:00:00|2022-01-01 00:00|       166|              11|              22|
|2022-01-01 00:00:00|2022-01-01 00:00|       178|               0|               1|
|2022-01-01 00:00:00|2022-01-01 00:00|       179|               0|               8|
|2022-01-01 00:00:00|2022-01-01 00:00|       186|              37|              33|
|2022-01-01 00:00:00|2022-01-01 00:00|       249|             159|          

# Feature Engineering

The dataframe is now ready for modeling; the final step is to create time-based features: year, month, day of month, hour and day of week.


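Year, month, day of month and hour are plain field extractions. The one convention worth checking is `dayofweek`: Spark numbers days from 1 (Sunday) to 7 (Saturday), which is why 2022-01-01, a Saturday, gets `7` in the output of `In [119]`. A plain-Python sketch of the same features; `time_features` is a hypothetical helper, not part of the notebook.

```python
from datetime import datetime


def time_features(ts: datetime) -> dict:
    """Plain-Python counterparts of year(), month(), dayofmonth(), hour(), dayofweek()."""
    return {
        "Year": ts.year,
        "Month": ts.month,
        "DayOfMonth": ts.day,
        "Hour": ts.hour,
        # Spark's dayofweek: 1 = Sunday ... 7 = Saturday.
        # isoweekday(): 1 = Monday ... 7 = Sunday, so Sunday wraps to 0 and everything shifts by one.
        "dayofweek": ts.isoweekday() % 7 + 1,
    }


print(time_features(datetime(2022, 1, 1, 0, 0)))
# {'Year': 2022, 'Month': 1, 'DayOfMonth': 1, 'Hour': 0, 'dayofweek': 7}
```

If these features are later used outside Spark (for example with pandas, where Monday is 0), the numbering must be converted consistently.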
In [114]:
# Create Year column
final_df=final_df.withColumn('Year',year(final_df['timestamp']))

In [115]:
# Create Month column
final_df=final_df.withColumn('Month',month(final_df['timestamp']))

In [116]:
# create day of month column
final_df=final_df.withColumn('DayOfMonth',dayofmonth(final_df['timestamp']))

In [117]:
# Create hour column
final_df=final_df.withColumn('Hour',hour(final_df['timestamp']))

In [118]:
# Create Day of week column
final_df=final_df.withColumn("dayofweek", dayofweek(col("timestamp")))

In [119]:
final_df.show()

+-------------------+----------------+----------+----------------+----------------+----+-----+----------+----+---------+
|          timestamp|hourly_timestamp|locationID|Pick_Trips_count|Drop_Trips_count|Year|Month|DayOfMonth|Hour|dayofweek|
+-------------------+----------------+----------+----------------+----------------+----+-----+----------+----+---------+
|2022-01-01 00:00:00|2022-01-01 00:00|       120|               0|               1|2022|    1|         1|   0|        7|
|2022-01-01 00:00:00|2022-01-01 00:00|       133|               0|               0|2022|    1|         1|   0|        7|
|2022-01-01 00:00:00|2022-01-01 00:00|       137|              59|              65|2022|    1|         1|   0|        7|
|2022-01-01 00:00:00|2022-01-01 00:00|       160|               0|               0|2022|    1|         1|   0|        7|
|2022-01-01 00:00:00|2022-01-01 00:00|       166|              11|              22|2022|    1|         1|   0|        7|
|2022-01-01 00:00:00|2022-01-01 

In [120]:
final_df.count()

751680
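The row count is consistent with a complete hour-by-location grid. Assuming the four monthly files are January to April 2022 (the notebook only says four months, and the output starts on 2022-01-01), there are 31 + 28 + 31 + 30 = 120 days, i.e. 2,880 hours, and 2,880 × 261 locations = 751,680 rows, which matches `final_df.count()`. A quick check of that arithmetic:

```python
# Assumption: the four monthly files cover January-April 2022 (2022 is not a leap year)
days_per_month = {"Jan": 31, "Feb": 28, "Mar": 31, "Apr": 30}
hours = sum(days_per_month.values()) * 24
locations = 261  # distinct location IDs found above

expected_rows = hours * locations
print(hours, expected_rows)  # 2880 751680
```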

### Save the final dataframe as parquet file in S3

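__Now we save this Spark dataframe to the S3 bucket, inside the `EMR-project` folder. `coalesce(1)` reduces the dataframe to a single partition, so Spark writes a single parquet part file.__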

In [121]:
final_df.coalesce(1).write.parquet("s3://taxi-project-thesis/EMR-project/final_2022")

In [122]:
# Load the final dataframe from S3; reading the folder picks up the part file without hard-coding its generated name
df_taxi=spark.read.parquet("s3://taxi-project-thesis/EMR-project/final_2022")

In [123]:
df_taxi.show(5)

+-------------------+----------------+----------+----------------+----------------+----+-----+----------+----+---------+
|          timestamp|hourly_timestamp|locationID|Pick_Trips_count|Drop_Trips_count|Year|Month|DayOfMonth|Hour|dayofweek|
+-------------------+----------------+----------+----------------+----------------+----+-----+----------+----+---------+
|2022-01-01 00:00:00|2022-01-01 00:00|       120|               0|               1|2022|    1|         1|   0|        7|
|2022-01-01 00:00:00|2022-01-01 00:00|       133|               0|               0|2022|    1|         1|   0|        7|
|2022-01-01 00:00:00|2022-01-01 00:00|       137|              59|              65|2022|    1|         1|   0|        7|
|2022-01-01 00:00:00|2022-01-01 00:00|       160|               0|               0|2022|    1|         1|   0|        7|
|2022-01-01 00:00:00|2022-01-01 00:00|       166|              11|              22|2022|    1|         1|   0|        7|
+-------------------+-----------

In [124]:
df_taxi.count()

751680

DONE!