# Milestone 3 - Pre-processing and analysis with PySpark

## Deadline - Sunday, 10th of December @11.59 pm 

The goal of this milestone is to preprocess the dataset 'New York yellow taxis' by performing basic data preparation and basic analysis to gain a better understanding of the data using PySpark.

Use the same month and year you used for the green taxis in milestone 1. [Datasets](https://drive.google.com/drive/folders/1t8nBgbHVaA5roZY4z3RcAG1_JMYlSTqu?usp=sharing) (download the yellow taxis dataset).

Important Notes:
- You MUST use this notebook template/structure. not doing so will result in marks deduction.
- You MUST have the cells run and output shown similar to milestone 1. I will NOT RUN YOUR NOTEBOOK.

Submission guidelines: same as milestone 1.

Notebook name must be same format as the file you named in miletsone 1. Just M3 instead of M1.

IMPORTANT: You are only allowed to use PySpark unless explicitly told otherwise(i.e last task).

Useful resource/documentation (highly recommended) - [PySpark examples](https://sparkbyexamples.com/pyspark-tutorial/)


## Weight dist.
- Loading the dataset : 5%
- Basic cleaning: 30%
	- column renaming: 10%
	- detect missing: 35%
	- Handle missing: 35%
	- Check missing : 20%
- Analyses: 30%
- Encoding: 20%
- Lookup table: 10%
- Writing the cleaned and lookup table back as parquet and csv files: 5%.

# Tasks:

## Load the dataset.

In [1]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import NumericType, DateType
from pyspark.sql.functions import when, col, unix_timestamp, dayofweek, dayofmonth, floor, count, avg, desc
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
spark = SparkSession.builder.appName("NYC_Yellow_Taxi").config("spark.executor.memory", "10g").config("spark.driver.memory", "10g").config("spark.driver.memoryOverhead", "512M").getOrCreate()
sc = spark.sparkContext

In [2]:
path = './yellow_tripdata_2015-01.parquet'
yellow_df = spark.read.parquet(path)

### Preview first 20 rows.

In [3]:
num_of_rows = yellow_df.count()
print(f"Rows count is {num_of_rows}.")

Rows count is 12741040.


In [4]:
yellow_df.show(20, vertical=True)

-RECORD 0-------------------------------------
 Vendor                | Creative Mobile T... 
 tpep_pickup_datetime  | 2015-01-01 01:11:33  
 tpep_dropoff_datetime | 2015-01-01 01:16:48  
 passenger_count       | 1.0                  
 trip_distance         | 1.0                  
 Rate_type             | Standard rate        
 store_and_fwd_flag    | N                    
 PU_Location           | Manhattan,Central... 
 DO_Location           | Manhattan,Morning... 
 payment_type          | Credit card          
 fare_amount           | 5.7                  
 extra                 | 0.5                  
 mta_tax               | 0.5                  
 tip_amount            | 1.4                  
 tolls_amount          | 0.0                  
 improvement_surcharge | 0.0                  
 total_amount          | 8.4                  
 congestion_surcharge  | null                 
 airport_fee           | null                 
-RECORD 1-------------------------------------
 Vendor      

### How many partitions is this dataframe split into?

In [5]:
num_of_partitions = yellow_df.rdd.getNumPartitions()

In [6]:
print(f"The DataFrame is split into {num_of_partitions} partitions.")

The DataFrame is split into 24 partitions.


## Basic cleaning

### rename all columns (replacing a space with an underscore, and making it lowercase)

In [7]:
for column in yellow_df.columns:
    yellow_df = yellow_df.withColumnRenamed(column, column.replace(' ', '_').lower())

In [8]:
yellow_df.columns

['vendor',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'rate_type',
 'store_and_fwd_flag',
 'pu_location',
 'do_location',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'airport_fee']

### Detect and remove duplicates
- Duplicates are trips with same pickup time,pickup location, dropoff time,drop off location and trip distance

In [9]:
columns_to_check_duplicates = ["tpep_pickup_datetime", "pu_location", "tpep_dropoff_datetime", "do_location", "trip_distance"]
num_of_duplicates = yellow_df.groupBy(columns_to_check_duplicates).count().filter("count > 1").count()
print(f"Number of duplicate rows: {num_of_duplicates}")

Number of duplicate rows: 4071


In [10]:
yellow_df_drop_duplicates = yellow_df.dropDuplicates(subset=columns_to_check_duplicates)

### check that there is are no duplicates

In [11]:
num_of_duplicates = yellow_df_drop_duplicates.groupBy(columns_to_check_duplicates).count().filter("count > 1").count()
print(f"Number of duplicate rows: {num_of_duplicates}")

Number of duplicate rows: 0


In [12]:
num_of_rows = yellow_df_drop_duplicates.count()
print(f"Rows count is {num_of_rows}.")

Rows count is 12736758.


### Detect missing
- Create a function that takes in the df and returns any data structrue of your choice(df/dict,list,tuple,etc) which has the name of the column and percentage of missing entries from the whole dataset.
- Tip : storing the missing info as dict where the key is the column name and value is the percentage would be the easiest.  

In [13]:
def detect_missing(df):
    total_rows = df.count()
    missing_info = {}
    for column in df.columns:
        missing_count = df.filter(df[column].isNull()).count()
        percentage_missing = (missing_count / total_rows) * 100
        missing_info[column] = percentage_missing
    return missing_info

### Prinout the missing info

In [14]:
missing_info_dict = detect_missing(yellow_df_drop_duplicates)

In [15]:
print("Missing information:")
for column, percentage_missing in missing_info_dict.items():
    print(f"{column}: {percentage_missing:.2f}%")

Missing information:
vendor: 0.00%
tpep_pickup_datetime: 0.00%
tpep_dropoff_datetime: 0.00%
passenger_count: 0.05%
trip_distance: 0.00%
rate_type: 0.00%
store_and_fwd_flag: 0.00%
pu_location: 0.00%
do_location: 0.00%
payment_type: 9.05%
fare_amount: 0.00%
extra: 52.62%
mta_tax: 0.00%
tip_amount: 0.00%
tolls_amount: 0.00%
improvement_surcharge: 0.00%
total_amount: 0.00%
congestion_surcharge: 100.00%
airport_fee: 100.00%


### Handle missing
- For numerical features replace with 0.
- For categorical/strings replace with 'Unknown'


In [16]:
def handle_missing(df):
    for col_name in df.columns:
        col_type = df.schema[col_name].dataType
        if isinstance(col_type, NumericType):
            df = df.withColumn(col_name, when(col(col_name).isNull(), 0).otherwise(col(col_name)))
        else:
            df = df.withColumn(col_name, when(col(col_name).isNull(), 'Unknown').otherwise(col(col_name)))
    return df

In [17]:
yellow_df_handled_missing = handle_missing(yellow_df_drop_duplicates)

### check that there are no missing values

In [18]:
missing_info_dict = detect_missing(yellow_df_handled_missing)

In [19]:
print("Missing information:")
for column, percentage_missing in missing_info_dict.items():
    print(f"{column}: {percentage_missing:.2f}%")

Missing information:
vendor: 0.00%
tpep_pickup_datetime: 0.00%
tpep_dropoff_datetime: 0.00%
passenger_count: 0.00%
trip_distance: 0.00%
rate_type: 0.00%
store_and_fwd_flag: 0.00%
pu_location: 0.00%
do_location: 0.00%
payment_type: 0.00%
fare_amount: 0.00%
extra: 0.00%
mta_tax: 0.00%
tip_amount: 0.00%
tolls_amount: 0.00%
improvement_surcharge: 0.00%
total_amount: 0.00%
congestion_surcharge: 0.00%
airport_fee: 0.00%


## Feature engineering - 
Write a function that adds the 3 following features. Use built in fucntions in PySpark (from the functions library) check lab 8, Avoid writing UDFs from scratch.
- trip duration (the format/unit is up to you)
- is_weekend. whether the trip occurred on Saturday or Sunday.
- week number (relevant to the month and not year, i.e 1,2,3,4 nto 31,32,33...) 

In [20]:
def add_trip_duration(df):
    df = df.withColumn("trip_duration_minutes", (unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime")) / 60.0)
    return df

In [21]:
def add_is_weekend(df):
    df = df.withColumn("is_weekend", when((dayofweek("tpep_pickup_datetime") == 1) | (dayofweek("tpep_pickup_datetime") == 7), 1).otherwise(0))
    return df

In [22]:
def add_week_number(df):
    df = df.withColumn("day_of_month", dayofmonth("tpep_pickup_datetime"))
    df = df.withColumn("week_number", floor((col("day_of_month") - 1) / 7) + 1).drop("day_of_month")
    return df

In [23]:
yellow_df_trip_duration = add_trip_duration(yellow_df_handled_missing)
yellow_df_is_weekend = add_is_weekend(yellow_df_trip_duration)
yellow_df_week_number = add_week_number(yellow_df_is_weekend)

### Preview the first 20 rows (only select the following features: pickup and droptime, and the 3 features you added). 

In [24]:
selected_features = ["tpep_pickup_datetime", "tpep_dropoff_datetime", "trip_duration_minutes", "is_weekend", "week_number"]

In [25]:
preview_df = yellow_df_week_number.select(selected_features)
preview_df.show(20, vertical=True)

-RECORD 0------------------------------------
 tpep_pickup_datetime  | 2015-01-01 01:25:19 
 tpep_dropoff_datetime | 2015-01-01 01:38:41 
 trip_duration_minutes | 13.366666666666667  
 is_weekend            | 0                   
 week_number           | 1                   
-RECORD 1------------------------------------
 tpep_pickup_datetime  | 2015-01-01 01:45:23 
 tpep_dropoff_datetime | 2015-01-01 01:49:25 
 trip_duration_minutes | 4.033333333333333   
 is_weekend            | 0                   
 week_number           | 1                   
-RECORD 2------------------------------------
 tpep_pickup_datetime  | 2015-01-01 01:28:16 
 tpep_dropoff_datetime | 2015-01-01 01:38:17 
 trip_duration_minutes | 10.016666666666667  
 is_weekend            | 0                   
 week_number           | 1                   
-RECORD 3------------------------------------
 tpep_pickup_datetime  | 2015-01-01 01:59:42 
 tpep_dropoff_datetime | 2015-01-01 02:18:16 
 trip_duration_minutes | 18.566666

## Analyses - Answer the following 5 questions (by showing the output and and a short 1-2 sentences regarding your observation/answer) 

MUST Use the PySpark SQL API.

DO NOT explicitly write SQL queries. Doing so will result in 50% deduction (for the question). Check lab 7.

You are free to add columns if it will help in answering a question and add useful info to the dataset.

### 1- What is the average fare amount per payment type 

In [26]:
average_fare_per_payment_type = yellow_df_week_number.groupBy("payment_type").agg(avg("fare_amount").alias("average_fare"))

In [27]:
average_fare_per_payment_type.show()

+------------+------------------+
|payment_type|      average_fare|
+------------+------------------+
|     Unknown|11.805914899858447|
|        Cash| 10.96699928944869|
|     Dispute| 9.171704940848985|
|   No charge|10.188251557137416|
| Credit card|12.505705603533212|
+------------+------------------+



### 2- Do people tend to go on a longer trips during the weekend or weekdays?

In [28]:
average_duration_by_weekday = yellow_df_week_number.groupBy("is_weekend").agg(f.avg("trip_duration_minutes").alias("average_duration"))

In [29]:
average_duration_by_weekday.show()

+----------+------------------+
|is_weekend|  average_duration|
+----------+------------------+
|         1|15.120724932353053|
|         0|13.744531820292934|
+----------+------------------+



### 3 - which day recorded the most trips?

In [30]:
most_trips_day = yellow_df_week_number.groupBy(dayofweek("tpep_pickup_datetime").alias("day_of_week")).agg(count("*").alias("trip_count")).orderBy(desc("trip_count")).limit(1)

In [31]:
most_trips_day.show()

+-----------+----------+
|day_of_week|trip_count|
+-----------+----------+
|          7|   2374382|
+-----------+----------+



### 4- What is the average "total amount" of trips with more than 2 passengers?

In [32]:
average_total_amount_for_more_than_2_passengers = yellow_df_week_number.filter(col("passenger_count") > 2).agg(avg("total_amount").alias("average_total_amount"))

In [33]:
average_total_amount_for_more_than_2_passengers.show()

+--------------------+
|average_total_amount|
+--------------------+
|  14.850233581991185|
+--------------------+



### 5- On average, when is it more likely that the tip is higher, when there are multiple passengers or just 1.?

In [34]:
average_tip_amount_for_passenger_count = yellow_df_week_number.groupBy("passenger_count").agg(avg("tip_amount").alias("average_tip_amount"))

In [35]:
average_tip_amount_for_passenger_count.show()

+---------------+------------------+
|passenger_count|average_tip_amount|
+---------------+------------------+
|            8.0|             1.209|
|            0.0|1.4423953559425604|
|            7.0| 2.088888888888889|
|            1.0| 1.999657552001739|
|            4.0|1.3399863250295432|
|            3.0|1.4408743556432586|
|            2.0|1.5419974289131988|
|            6.0|1.5064134995091336|
|            5.0|1.5425411227032313|
|            9.0| 6.836363636363637|
+---------------+------------------+



### 6- What is the most frequent route on the weekend. 

In [36]:
most_frequent_route_weekend = yellow_df_week_number.filter("is_weekend = 1").groupBy("pu_location", "do_location").agg(count("*").alias("trip_count")).orderBy(desc("trip_count")).limit(1)

In [37]:
most_frequent_route_weekend.show()

+-----------+-----------+----------+
|pu_location|do_location|trip_count|
+-----------+-----------+----------+
| Unknown,NV| Unknown,NV|     67236|
+-----------+-----------+----------+



## Encoding
- Label encode all categorical fetaures.
- Create a lookup table for these label encoded features. You can use the same format/example as the lookup table in Milestone 1 description.

(You are allowed to store and manipulate the lookup table as a pandas dataframe, it does not have to be a PySpark df).
- Remove the original unencoded categorical features from the df after encoding.

In [None]:
yellow_df_week_number = yellow_df_week_number.withColumn("tpep_pickup_datetime", col("tpep_pickup_datetime").cast(DateType()))
yellow_df_week_number = yellow_df_week_number.withColumn("tpep_dropoff_datetime", col("tpep_dropoff_datetime").cast(DateType()))

In [38]:
def label_encode(df):
    categorical_columns = [col_name for col_name, col_type in df.dtypes if not isinstance(col_type, NumericType)]
    indexers = [
        StringIndexer(inputCol=column, outputCol=f"{column}_encoded")
        for column in categorical_columns
    ]
    pipeline = Pipeline(stages=indexers)
    model = pipeline.fit(df)
    encoded_df = model.transform(df)

    lookup_table = {
        column: {original: encoded for original, encoded in zip(model.stages[i].labels, range(len(model.stages[i].labels)))}
        for i, column in enumerate(categorical_columns)
    }
    encoded_df.drop(*categorical_columns)
    return encoded_df, lookup_table

In [None]:
yellow_df_encoded, lookup_table = label_encode(yellow_df_week_number)

### Preview first 20 rows of the label encoded features

In [39]:
label_encoded_preview = yellow_df_encoded.select([col_name for col_name, col_type in df.dtypes if not isinstance(col_type, NumericType)])
label_encoded_preview.show(20, vertical=True)

-RECORD 0----------------------------------------
 tpep_pickup_datetime       | 2015-01-01         
 tpep_dropoff_datetime      | 2015-01-01         
 passenger_count            | 1.0                
 trip_distance              | 0.48               
 fare_amount                | 4.0                
 extra                      | 0.5                
 mta_tax                    | 0.5                
 tip_amount                 | 0.0                
 tolls_amount               | 0.0                
 improvement_surcharge      | 0.3                
 total_amount               | 5.3                
 congestion_surcharge       | 0                  
 airport_fee                | 0                  
 trip_duration_minutes      | 2.3666666666666667 
 is_weekend                 | 0                  
 week_number                | 1                  
 vendor_encoded             | 0.0                
 rate_type_encoded          | 0.0                
 store_and_fwd_flag_encoded | 0.0                


### Preview first 20 rows of your lookup table

In [40]:
lookup_table_df = pd.DataFrame(lookup_table)
lookup_table_df.head(20)

Unnamed: 0,vendor,rate_type,store_and_fwd_flag,pu_location,do_location,payment_type
VeriFone Inc.,0.0,,,,,
"Creative Mobile Technologies, LLC",1.0,,,,,
Standard rate,,0.0,,,,
JFK,,1.0,,,,
Negotiated fare,,2.0,,,,
Newark,,3.0,,,,
Nassau or Westchester,,4.0,,,,
Unknown,,5.0,,,,2.0
Group ride,,6.0,,,,
N,,,0.0,,,


### Load the cleaned PySpark df to a parquet file and the lookup table to a csv file.

In [41]:
cleaned_parquet_path = "yellow_trip_data_2015-01clean.parquet"
yellow_df_encoded.write.parquet(cleaned_parquet_path, mode="overwrite")
lookup_table_csv_path = "lookup_table_yellow_taxis.csv"
lookup_table_df.to_csv(lookup_table_csv_path, index=False)