### Since I'm using an M1 CPU, I worked in Colab

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

# Milestone 3 - Pre-processing and analysis with PySpark

## Deadline - Sunday, 10th of December @11.59 pm

The goal of this milestone is to preprocess the dataset 'New York yellow taxis' by performing basic data preparation and basic analysis to gain a better understanding of the data using PySpark.

Use the same month and year you used for the green taxis in milestone 1. [Datasets](https://drive.google.com/drive/folders/1t8nBgbHVaA5roZY4z3RcAG1_JMYlSTqu?usp=sharing) (download the yellow taxis dataset).

Important Notes:
- You MUST use this notebook template/structure. Not doing so will result in a marks deduction.
- You MUST have the cells run and output shown similar to milestone 1. I will NOT RUN YOUR NOTEBOOK.

Submission guidelines: same as milestone 1.

The notebook name must follow the same format as the file you submitted in milestone 1, with M3 instead of M1.

IMPORTANT: You are only allowed to use PySpark unless explicitly told otherwise (i.e. the last task).

Useful resource/documentation (highly recommended) - [PySpark examples](https://sparkbyexamples.com/pyspark-tutorial/)


## Weight distribution
- Loading the dataset: 5%
- Basic cleaning: 30%
	- Column renaming: 10%
	- Detect missing: 35%
	- Handle missing: 35%
	- Check missing: 20%
- Analyses: 30%
- Encoding: 20%
- Lookup table: 10%
- Writing the cleaned and lookup table back as parquet and csv files: 5%

# Tasks:

## Load the dataset.

In [None]:
import re

import findspark
findspark.init()  # makes the pyspark package under SPARK_HOME importable

import pandas as pd
from pyspark.ml.feature import StringIndexer
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
from pyspark.sql.functions import (
    avg, col, concat, count, dayofmonth, dayofweek, desc, isnan, lit,
    regexp_replace, sum, unix_timestamp, weekofyear, when,
)
from pyspark.sql.types import *

spark = SparkSession.builder.appName("M3").getOrCreate()
sc = spark.sparkContext

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
df = spark.read.parquet("/content/drive/MyDrive/yellow_tripdata_2018-08.parquet")

### Preview first 20 rows.

In [None]:
df.show(vertical=True)

-RECORD 0-------------------------------------
 Vendor                | Creative Mobile T... 
 tpep_pickup_datetime  | 2018-08-01 00:44:35  
 tpep_dropoff_datetime | 2018-08-01 01:03:22  
 passenger_count       | 1.0                  
 trip_distance         | 5.6                  
 Rate_type             | Standard rate        
 store_and_fwd_flag    | N                    
 PU_Location           | Manhattan,Upper W... 
 DO_Location           | Manhattan,East Vi... 
 payment_type          | Credit card          
 fare_amount           | 19.0                 
 extra                 | 0.5                  
 mta_tax               | 0.5                  
 tip_amount            | 4.0                  
 tolls_amount          | 0.0                  
 improvement_surcharge | 0.3                  
 total_amount          | 24.3                 
 congestion_surcharge  | null                 
 airport_fee           | null                 
-RECORD 1-------------------------------------
 Vendor      

### How many partitions is this dataframe split into?

In [None]:
df.rdd.getNumPartitions()

2

## Basic cleaning

### Rename all columns (replace spaces with underscores and convert to lowercase)

In [None]:
def clean_column_name(name):
    return name.lower().replace(" ", "_")

In [None]:
df2 = df.select([fn.col(x).alias(clean_column_name(x)) for x in df.columns])

In [None]:
df2.columns

['vendor',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'rate_type',
 'store_and_fwd_flag',
 'pu_location',
 'do_location',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'airport_fee']

### Detect and remove duplicates
- Duplicates are trips with the same pickup time, pickup location, dropoff time, dropoff location and trip distance.

In [None]:
distinct_df = df2.distinct()

In [None]:
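# repartition() returns a new RDD; the result is not assigned here, so distinct_df itself is unchanged.
distinct_df.rdd.repartition(8)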

MapPartitionsRDD[1775] at coalesce at NativeMethodAccessorImpl.java:0

In [None]:
distinct_df.count()

7855040

In [None]:
df2.rdd.repartition(8)

MapPartitionsRDD[1796] at coalesce at NativeMethodAccessorImpl.java:0

In [None]:
df2_count = df2.count()

In [None]:
df3 = df2.dropDuplicates(subset=["tpep_pickup_datetime" , "tpep_dropoff_datetime" , "pu_location" , "do_location" , "trip_distance"])


### Check that there are no duplicates

In [None]:
df3.rdd.repartition(8)

MapPartitionsRDD[1819] at coalesce at NativeMethodAccessorImpl.java:0

In [None]:
df2.select(["tpep_pickup_datetime" , "tpep_dropoff_datetime" , "pu_location" , "do_location" , "trip_distance"]).distinct().count()

7849259

In [None]:
df3.distinct().count()

7849259
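
The counts printed above can be compared directly. As a small worked check that uses only the numbers shown in this notebook, the difference gives how many fully distinct rows still shared the same five-column trip key (the variable names are illustrative, not from the original cells):

```python
# Counts copied from the notebook outputs above.
fully_distinct_rows = 7855040   # distinct_df.count()
distinct_trip_keys = 7849259    # distinct count over the five key columns

# Rows that differ in some other column but share the same trip key.
collapsed_rows = fully_distinct_rows - distinct_trip_keys
print(collapsed_rows)
```

Since `df3.distinct().count()` matches the number of distinct trip keys, no key appears more than once in `df3`.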

### Detect missing
- Create a function that takes in the df and returns any data structure of your choice (df, dict, list, tuple, etc.) which has the name of each column and its percentage of missing entries out of the whole dataset.
- Tip: storing the missing info as a dict where the key is the column name and the value is the percentage would be the easiest.

In [None]:
amount_missing_df = df3.select([(count(when(col(c).isNull(), c))/count(lit(1))).alias(c) for c in df.columns])


### Print out the missing info

In [None]:
amount_missing_df.show(vertical=True)


-RECORD 0-------------------------------------
 Vendor                | 0.0                  
 tpep_pickup_datetime  | 0.0                  
 tpep_dropoff_datetime | 0.0                  
 passenger_count       | 0.008991167191705613 
 trip_distance         | 0.0                  
 Rate_type             | 0.0                  
 store_and_fwd_flag    | 6.580238975424304E-4 
 PU_Location           | 0.0                  
 DO_Location           | 0.0                  
 payment_type          | 0.06645595463215062  
 fare_amount           | 0.0                  
 extra                 | 0.5169430643070894   
 mta_tax               | 0.0                  
 tip_amount            | 0.0                  
 tolls_amount          | 0.0                  
 improvement_surcharge | 0.0                  
 total_amount          | 0.0                  
 congestion_surcharge  | 1.0                  
 airport_fee           | 1.0                  
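
The values above are fractions between 0 and 1, while the task asks for percentages stored per column. A minimal sketch of that last step is below; `missing_percentages` is a hypothetical helper, and the sample values are copied from the output above. In the notebook, the input dict could presumably be obtained with `amount_missing_df.first().asDict()` (not run here).

```python
def missing_percentages(fractions):
    """Convert {column: fraction_missing} into {column: percent_missing}."""
    return {name: round(frac * 100, 4) for name, frac in fractions.items()}

# Sample fractions copied from the printed output above.
sample = {
    "passenger_count": 0.008991167191705613,
    "extra": 0.5169430643070894,
    "airport_fee": 1.0,
}
percentages = missing_percentages(sample)
print(percentages)
```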



### Handle missing
- For numerical features replace with 0.
- For categorical/strings replace with 'Unknown'


In [None]:
numerical_cols = [col_name for col_name, data_type in df.dtypes if data_type == 'int']
categorical_cols = [col_name for col_name, data_type in df.dtypes if not data_type == 'int']

In [None]:
df4 = df3.fillna(0, subset=numerical_cols)
df5 = df4.fillna('Unknown', subset=categorical_cols)

### Check that there are no missing values

In [None]:
amount_missing_df_new = df5.select([(count(when(col(c).isNull(), c))/count(lit(1))).alias(c) for c in df.columns])

In [None]:
amount_missing_df_new.show(vertical = True)

-RECORD 0-------------------------------------
 Vendor                | 0.0                  
 tpep_pickup_datetime  | 0.0                  
 tpep_dropoff_datetime | 0.0                  
 passenger_count       | 0.008991167191705613 
 trip_distance         | 0.0                  
 Rate_type             | 0.0                  
 store_and_fwd_flag    | 0.0                  
 PU_Location           | 0.0                  
 DO_Location           | 0.0                  
 payment_type          | 0.0                  
 fare_amount           | 0.0                  
 extra                 | 0.5169430643070894   
 mta_tax               | 0.0                  
 tip_amount            | 0.0                  
 tolls_amount          | 0.0                  
 improvement_surcharge | 0.0                  
 total_amount          | 0.0                  
 congestion_surcharge  | 0.0                  
 airport_fee           | 0.0                  
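
`passenger_count` and `extra` still show missing values after the fill. The schema printed at the end of this notebook lists both as `double`, so a filter on `data_type == 'int'` would not have selected them, and Spark ignores a string fill value for non-string columns. The sketch below shows one way to split columns by type, working on the `(name, type)` pairs that `df.dtypes` returns; `split_columns_by_type`, `NUMERIC_TYPES` and the sample list are illustrative assumptions.

```python
# Type names as they appear in the second element of each df.dtypes pair.
NUMERIC_TYPES = {"tinyint", "smallint", "int", "bigint", "float", "double"}

def split_columns_by_type(dtypes):
    """Return (numeric_columns, string_columns) from (name, type) pairs."""
    numeric = [name for name, dtype in dtypes
               if dtype in NUMERIC_TYPES or dtype.startswith("decimal")]
    strings = [name for name, dtype in dtypes if dtype == "string"]
    return numeric, strings

# A few columns with the types shown in this notebook's schema output.
sample_dtypes = [
    ("passenger_count", "double"),
    ("extra", "double"),
    ("congestion_surcharge", "int"),
    ("payment_type", "string"),
    ("tpep_pickup_datetime", "timestamp"),
]
numeric, strings = split_columns_by_type(sample_dtypes)
print(numeric, strings)
```

The two lists could then be passed as `subset` to `fillna(0, ...)` and `fillna('Unknown', ...)` respectively; timestamp columns fall into neither list.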



## Feature engineering
Write a function that adds the following 3 features. Use built-in functions in PySpark (from the functions library); check lab 8. Avoid writing UDFs from scratch.
- trip duration (the format/unit is up to you)
- is_weekend: whether the trip occurred on Saturday or Sunday.
- week number (relative to the month and not the year, i.e. 1, 2, 3, 4, not 31, 32, 33, ...)

In [None]:
def add_trip_features(df):
    df = df.withColumn("pickup_timestamp", unix_timestamp("tpep_pickup_datetime", "yyyy-MM-dd HH:mm:ss").cast("timestamp"))
    df = df.withColumn("dropoff_timestamp", unix_timestamp("tpep_dropoff_datetime", "yyyy-MM-dd HH:mm:ss").cast("timestamp"))
    df = df.withColumn("trip_duration", (col("dropoff_timestamp") - col("pickup_timestamp")))
    df = df.withColumn("trip_duration", df["trip_duration"].cast(StringType()))
    df = df.withColumn("day_of_week", dayofweek("pickup_timestamp"))
    df = df.withColumn("is_weekend", (col("day_of_week") >= 6).cast("int"))
    df = df.withColumn("week_number",  (weekofyear("tpep_pickup_datetime") / 4).cast("int") + 1)
    return df

In [None]:
df6 = add_trip_features(df5)

### Preview the first 20 rows (only select the following features: pickup and droptime, and the 3 features you added).

In [None]:
selected_features = ["tpep_pickup_datetime", "tpep_dropoff_datetime" , "trip_duration" , "is_weekend" , "week_number"]
df6.select(selected_features).show()

+--------------------+---------------------+--------------------+----------+-----------+
|tpep_pickup_datetime|tpep_dropoff_datetime|       trip_duration|is_weekend|week_number|
+--------------------+---------------------+--------------------+----------+-----------+
| 2018-08-01 00:03:35|  2018-08-01 00:07:34|3 minutes 59 seconds|         0|          8|
| 2018-08-01 00:06:51|  2018-08-01 00:11:28|4 minutes 37 seconds|         0|          8|
| 2018-08-01 00:04:08|  2018-08-01 00:11:43|7 minutes 35 seconds|         0|          8|
| 2018-08-01 00:12:37|  2018-08-01 00:16:08|3 minutes 31 seconds|         0|          8|
| 2018-08-01 00:05:42|  2018-08-01 00:16:58|11 minutes 16 sec...|         0|          8|
| 2018-08-01 00:10:42|  2018-08-01 00:17:22|6 minutes 40 seconds|         0|          8|
| 2018-08-01 00:07:23|  2018-08-01 00:19:13|11 minutes 50 sec...|         0|          8|
| 2018-08-01 00:03:45|  2018-08-01 00:20:38|16 minutes 53 sec...|         0|          8|
| 2018-08-01 00:15:05
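
The preview shows `week_number` as 8 for trips on 1 August, whereas the task asks for a week number relative to the month. Also, Spark's `dayofweek` numbers days from 1 (Sunday) to 7 (Saturday), so a `>= 6` test selects Friday and Saturday rather than Saturday and Sunday. The plain-Python sketch below illustrates the intended logic; the helper names are hypothetical and it is not the notebook's code.

```python
from datetime import datetime

def spark_dayofweek(ts):
    """Mimic Spark's dayofweek numbering: 1 = Sunday, ..., 7 = Saturday."""
    return ts.isoweekday() % 7 + 1

def is_weekend(ts):
    """1 if the timestamp falls on Saturday or Sunday, else 0."""
    return int(spark_dayofweek(ts) in (1, 7))

def week_of_month(ts):
    """Days 1-7 are week 1, days 8-14 are week 2, and so on."""
    return (ts.day - 1) // 7 + 1

pickup = datetime(2018, 8, 1, 0, 3, 35)  # first row of the preview above
print(spark_dayofweek(pickup), is_weekend(pickup), week_of_month(pickup))
```

In PySpark, the same idea could presumably be written as `dayofweek(...).isin(1, 7)` for the weekend flag and `floor((dayofmonth(...) - 1) / 7) + 1` for the week of the month.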

## Analyses - Answer the following 5 questions (by showing the output and a short 1-2 sentence observation/answer)

MUST Use the PySpark SQL API.

DO NOT explicitly write SQL queries. Doing so will result in 50% deduction (for the question). Check lab 7.

You are free to add columns if it will help in answering a question and add useful info to the dataset.

### 1- What is the average fare amount per payment type

In [None]:
avg_fare = df6.groupBy("payment_type").agg(avg("fare_amount").alias("avg_fare"))

In [None]:
avg_fare.show()

+------------+------------------+
|payment_type|          avg_fare|
+------------+------------------+
|     Unknown|13.037452557446443|
|        Cash|12.336380802085786|
|     Dispute| 9.910525966641393|
|   No charge| 50.93430596487098|
| Credit card|13.521796426677769|
+------------+------------------+



### 2- Do people tend to go on longer trips during the weekend or on weekdays?

In [None]:
average_trip_distance_by_is_weekend = df6.groupBy("is_weekend").agg(avg("trip_distance").alias("avg_trip_distance"))

In [None]:
average_trip_distance_by_is_weekend.show()

+----------+------------------+
|is_weekend| avg_trip_distance|
+----------+------------------+
|         1| 3.022138163603692|
|         0|3.0340995541490976|
+----------+------------------+



### 3- Which day recorded the most trips?


In [None]:
df6 = df6.withColumn("day_of_month", dayofmonth("tpep_pickup_datetime"))
daily_trip_counts = df6.groupBy("day_of_month").count()
max_day_count = daily_trip_counts.orderBy(col("count").desc()).first()

In [None]:
max_day_count

Row(day_of_month=8, count=290041)

### 4- What is the average "total amount" of trips with more than 2 passengers?

In [None]:
average_total_amount = df6.filter(col("passenger_count") > 2).agg(avg("total_amount").alias("avg_total_amount")).first()["avg_total_amount"]


In [None]:
average_total_amount

16.966685446889006

### 5- On average, when is it more likely that the tip is higher, when there are multiple passengers or just 1?

In [None]:
single_average_tip_amount = df6.filter(col("passenger_count") == 1).agg(avg("tip_amount").alias("avg_tip_amount")).first()["avg_tip_amount"]
multi_average_tip_amount = df6.filter(col("passenger_count") > 1).agg(avg("tip_amount").alias("avg_tip_amount")).first()["avg_tip_amount"]

In [None]:
single_average_tip_amount

1.8278228197812318

In [None]:
multi_average_tip_amount

1.8062610705652205

### 6- What is the most frequent route on the weekend?

In [None]:
df7 = df6.withColumn("route", concat(col("pu_location"), lit("+"), col("do_location")))


In [None]:
weekend_trips_by_route = df7.filter(col("is_weekend") == 1).groupBy("route").count()

In [None]:
most_frequent = weekend_trips_by_route.orderBy(desc("count")).first()["route"]

In [None]:
most_frequent

'Unknown,NV+Unknown,NV'
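
The top route above runs between two unknown zones, which may not be the most informative answer. A minimal plain-Python sketch of ranking only routes with known endpoints follows; the helper name and the sample routes (`Zone A`, `Zone B`) are hypothetical and not taken from the dataset.

```python
from collections import Counter

def most_frequent_known_route(routes):
    """Return the most common route that does not mention 'Unknown'."""
    known = [route for route in routes if "Unknown" not in route]
    if not known:
        return None
    return Counter(known).most_common(1)[0][0]

# Hypothetical sample: the unknown route is the most common overall.
sample_routes = (
    ["Unknown,NV+Unknown,NV"] * 3
    + ["Zone A+Zone B"] * 2
    + ["Zone B+Zone A"]
)
best = most_frequent_known_route(sample_routes)
print(best)
```

In PySpark, the equivalent filter could presumably be expressed with `~col("route").contains("Unknown")` before the `groupBy`.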

## Encoding
- Label encode all categorical features.
- Create a lookup table for these label encoded features. You can use the same format/example as the lookup table in Milestone 1 description.

(You are allowed to store and manipulate the lookup table as a pandas dataframe, it does not have to be a PySpark df).
- Remove the original unencoded categorical features from the df after encoding.
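
By default, `StringIndexer` orders labels by descending frequency, so the most common value receives index 0. The plain-Python sketch below mimics that ordering to show how the labels come about; the sample counts are hypothetical, and the alphabetical tie-break is an assumption about how ties are resolved.

```python
from collections import Counter

def frequency_labels(values):
    """Assign 0.0 to the most frequent value, 1.0 to the next, and so on."""
    counts = Counter(values)
    # Sort by descending count; ties are broken alphabetically (assumption).
    ordered = sorted(counts, key=lambda value: (-counts[value], value))
    return {value: float(index) for index, value in enumerate(ordered)}

# Hypothetical sample with made-up frequencies.
sample = ["Credit card"] * 5 + ["Cash"] * 3 + ["Unknown"] * 1
labels = frequency_labels(sample)
print(labels)
```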

In [None]:
categorical_columns = [item[0] for item in df2.dtypes if item[1].startswith('string')]

In [None]:
categorical_columns

['vendor',
 'rate_type',
 'store_and_fwd_flag',
 'pu_location',
 'do_location',
 'payment_type']

In [None]:
unique_values_dict = pd.DataFrame(columns=categorical_columns)
for column in categorical_columns:
    string_indexer = StringIndexer(inputCol=column, outputCol=f'{column}_label')
    df7 = string_indexer.fit(df7).transform(df7)
    unique_values = df7.select(column, f'{column}_label').distinct().toPandas()
    unique_values["feature_name"] = column
    unique_values_dict = pd.concat([unique_values_dict, unique_values], ignore_index=True)
    df7 = df7.drop(column)


In [None]:
unique_values_dict

Unnamed: 0,vendor,rate_type,store_and_fwd_flag,pu_location,do_location,payment_type,vendor_label,feature_name,rate_type_label,store_and_fwd_flag_label,pu_location_label,do_location_label,payment_type_label
0,"Creative Mobile Technologies, LLC",,,,,,1.0,vendor,,,,,
1,VeriFone Inc.,,,,,,0.0,vendor,,,,,
2,Unknown,,,,,,2.0,vendor,,,,,
3,,JFK,,,,,,rate_type,1.0,,,,
4,,Standard rate,,,,,,rate_type,0.0,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...
532,,,,,,Credit card,,payment_type,,,,,0.0
533,,,,,,Cash,,payment_type,,,,,1.0
534,,,,,,Dispute,,payment_type,,,,,4.0
535,,,,,,Unknown,,payment_type,,,,,2.0


### Preview first 20 rows of the label encoded features

In [None]:
modified_list = [item + "_label" for item in categorical_columns]

df7.select(modified_list).show(vertical=True)

-RECORD 0------------------------
 vendor_label             | 0.0  
 rate_type_label          | 0.0  
 store_and_fwd_flag_label | 0.0  
 pu_location_label        | 5.0  
 do_location_label        | 22.0 
 payment_type_label       | 0.0  
-RECORD 1------------------------
 vendor_label             | 0.0  
 rate_type_label          | 0.0  
 store_and_fwd_flag_label | 0.0  
 pu_location_label        | 1.0  
 do_location_label        | 2.0  
 payment_type_label       | 0.0  
-RECORD 2------------------------
 vendor_label             | 0.0  
 rate_type_label          | 0.0  
 store_and_fwd_flag_label | 0.0  
 pu_location_label        | 1.0  
 do_location_label        | 21.0 
 payment_type_label       | 0.0  
-RECORD 3------------------------
 vendor_label             | 0.0  
 rate_type_label          | 0.0  
 store_and_fwd_flag_label | 0.0  
 pu_location_label        | 2.0  
 do_location_label        | 6.0  
 payment_type_label       | 1.0  
-RECORD 4------------------------
 vendor_label 

### Preview first 20 rows of your lookup table

In [None]:
unique_values_dict.head(20)

Unnamed: 0,vendor,rate_type,store_and_fwd_flag,pu_location,do_location,payment_type,vendor_label,feature_name,rate_type_label,store_and_fwd_flag_label,pu_location_label,do_location_label,payment_type_label
0,"Creative Mobile Technologies, LLC",,,,,,1.0,vendor,,,,,
1,VeriFone Inc.,,,,,,0.0,vendor,,,,,
2,Unknown,,,,,,2.0,vendor,,,,,
3,,JFK,,,,,,rate_type,1.0,,,,
4,,Standard rate,,,,,,rate_type,0.0,,,,
5,,Newark,,,,,,rate_type,3.0,,,,
6,,Unknown,,,,,,rate_type,5.0,,,,
7,,Negotiated fare,,,,,,rate_type,2.0,,,,
8,,Nassau or Westchester,,,,,,rate_type,4.0,,,,
9,,Group ride,,,,,,rate_type,6.0,,,,
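
The lookup table above uses one value column and one label column per feature, so most cells are empty. One alternative is a long layout with three columns. The sketch below builds such rows in plain Python and writes them as CSV; the function name and column names are assumptions, and the sample mappings are copied from the preview above.

```python
import csv
import io

def build_lookup_rows(mappings):
    """Flatten {feature: {original_value: label}} into one row per value."""
    rows = []
    for feature, value_to_label in mappings.items():
        # Order the rows of each feature by their encoded label.
        for original, label in sorted(value_to_label.items(), key=lambda kv: kv[1]):
            rows.append({
                "feature_name": feature,
                "original_value": original,
                "label": int(label),
            })
    return rows

# Values and labels copied from the lookup preview above.
mappings = {
    "vendor": {"VeriFone Inc.": 0.0, "Creative Mobile Technologies, LLC": 1.0, "Unknown": 2.0},
    "payment_type": {"Credit card": 0.0, "Cash": 1.0, "Unknown": 2.0, "Dispute": 4.0},
}
rows = build_lookup_rows(mappings)

buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=["feature_name", "original_value", "label"])
writer.writeheader()
writer.writerows(rows)
print(buffer.getvalue())
```

The same rows could also be passed to `pd.DataFrame(rows)` if a pandas lookup table is preferred.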


### Load the cleaned PySpark df to a parquet file and the lookup table to a csv file.

In [None]:
df7.write.parquet("/content/drive/My Drive/yellow_tripdata_2018-08_cleaned.parquet" , mode="overwrite")

In [None]:
df7.printSchema()

root
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: integer (nullable = true)
 |-- airport_fee: integer (nullable = true)
 |-- pickup_timestamp: timestamp (nullable = true)
 |-- dropoff_timestamp: timestamp (nullable = true)
 |-- trip_duration: string (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- is_weekend: integer (nullable = true)
 |-- week_number: integer (nullable = true)
 |-- day_of_month: integer (nullable = true)
 |-- route: string (nullable = false)
 |-- vendor_label: double (null

In [None]:
unique_values_dict.to_csv("/content/drive/My Drive/lookup.csv", index=False)

## Bonus - Load the cleaned parquet file and lookup table into a Postgres database.

Note that if you decide to do the bonus, you must include not only your notebook but the docker-compose.yaml file as well.

In [None]:
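from sqlalchemy import create_engine

# Assumption: the lookup CSV written above is readable from this environment.
lookup_df = pd.read_csv("/content/drive/My Drive/lookup.csv")
engine = create_engine("postgresql://abdullahahmadfouad:password123@db:5432/yellow_taxi_8_18_postgres")
lookup_df.to_sql("lookup_yellow_taxi_8_18", engine, if_exists="append", index=False)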

In [None]:
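# Assumption: the cleaned parquet written above is readable from this environment.
encoded_df = spark.read.parquet("/content/drive/My Drive/yellow_tripdata_2018-08_cleaned.parquet")
jdbc_url = "jdbc:postgresql://db:5432/yellow_taxi_8_18_postgres"
postgres_properties = {
    "driver": "org.postgresql.Driver",  # the PostgreSQL JDBC jar must be on Spark's classpath
    "user": "abdullahahmadfouad",
    "password": "password123",
}
encoded_df.write.jdbc(url=jdbc_url, table="yellow_taxi_8_18_postgres_cleaned", mode="append", properties=postgres_properties)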

### Screenshot of the table existing in the database and a simple query such as `select count(*) from table_name` or `select * from table_name limit 10`

(You can just copy paste the screenshots in the markdown cells below)