# Milestone 3 - Pre-processing and analysis with PySpark

## Deadline - Sunday, 10th of December @11.59 pm 

The goal of this milestone is to preprocess the dataset 'New York yellow taxis' by performing basic data preparation and basic analysis to gain a better understanding of the data using PySpark.

Use the same month and year you used for the green taxis in milestone 1. [Datasets](https://drive.google.com/drive/folders/1t8nBgbHVaA5roZY4z3RcAG1_JMYlSTqu?usp=sharing) (download the yellow taxis dataset).

Important Notes:
- You MUST use this notebook template/structure. Not doing so will result in a marks deduction.
- You MUST have the cells run and output shown similar to milestone 1. I will NOT RUN YOUR NOTEBOOK.

Submission guidelines: same as milestone 1.

The notebook name must follow the same format as the file you submitted in milestone 1, with M3 instead of M1.

IMPORTANT: You are only allowed to use PySpark unless explicitly told otherwise (i.e. the last task).

Useful resource/documentation (highly recommended) - [PySpark examples](https://sparkbyexamples.com/pyspark-tutorial/)


## Weight dist.
- Loading the dataset : 5%
- Basic cleaning: 30%
	- column renaming: 10%
	- detect missing: 35%
	- Handle missing: 35%
	- Check missing : 20%
- Analyses: 30%
- Encoding: 20%
- Lookup table: 10%
- Writing the cleaned and lookup table back as parquet and csv files: 5%.

# Tasks:

## Load the dataset.

In [1]:
## start the session
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder.appName("M3").config("spark.memory.offHeap.enabled","true").config("spark.memory.offHeap.size","10g").getOrCreate()
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
# spark context to interact with the driver
sc = spark.sparkContext

from pyspark.sql import DataFrame
from pyspark.sql.functions import col, count, datediff, dayofweek, weekofyear, date_format, avg, when, to_date
from pyspark.sql.types import StringType, NumericType
from pyspark.ml.feature import StringIndexer
import pandas as pd 

In [2]:
df = spark.read.parquet('yellow_tripdata_2016-01.parquet')

### Preview first 20 rows.

In [3]:
df.show(20,vertical=True)

-RECORD 0-------------------------------------
 Vendor                | Creative Mobile T... 
 tpep_pickup_datetime  | 2016-01-01 01:12:22  
 tpep_dropoff_datetime | 2016-01-01 01:29:14  
 passenger_count       | 1.0                  
 trip_distance         | 3.2                  
 Rate_type             | Standard rate        
 store_and_fwd_flag    | N                    
 PU_Location           | Manhattan,Clinton... 
 DO_Location           | Manhattan,Yorkvil... 
 payment_type          | Credit card          
 fare_amount           | 14.0                 
 extra                 | 0.5                  
 mta_tax               | 0.5                  
 tip_amount            | 3.06                 
 tolls_amount          | 0.0                  
 improvement_surcharge | 0.3                  
 total_amount          | 18.36                
 congestion_surcharge  | null                 
 airport_fee           | null                 
-RECORD 1-------------------------------------
 Vendor      

In [4]:
df.printSchema()

root
 |-- Vendor: string (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- Rate_type: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PU_Location: string (nullable = true)
 |-- DO_Location: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: integer (nullable = true)
 |-- airport_fee: integer (nullable = true)



### How many partitions is this dataframe split into?

In [5]:
df.rdd.getNumPartitions()

12

## Basic cleaning

### rename all columns (replacing a space with an underscore, and making it lowercase)

In [6]:
# Use `name` as the loop variable so it cannot be confused with the imported `col` function
new_column_names = {name: name.replace(" ", "_").lower() for name in df.columns}
for old_name, new_name in new_column_names.items():
    df = df.withColumnRenamed(old_name, new_name)

In [7]:
df.printSchema()

root
 |-- vendor: string (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- rate_type: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- pu_location: string (nullable = true)
 |-- do_location: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: integer (nullable = true)
 |-- airport_fee: integer (nullable = true)



### Detect and remove duplicates
- Duplicates are trips with the same pickup time, pickup location, dropoff time, dropoff location and trip distance.
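
To make the duplicate definition concrete, here is a minimal plain-Python sketch of dropping duplicates on a subset of columns. It is only an illustration of the rule, not the PySpark solution. The sample rows and the helper name `drop_duplicate_trips` are made up for this example. The sketch keeps the first row it sees for each key, whereas Spark may keep any one of the duplicate rows.

```python
# Plain-Python illustration of "drop duplicates on a subset of columns".
# The rows below are made-up sample data, not taken from the dataset.
KEY_COLUMNS = [
    "tpep_pickup_datetime", "pu_location",
    "tpep_dropoff_datetime", "do_location", "trip_distance",
]


def drop_duplicate_trips(rows, key_columns=KEY_COLUMNS):
    """Keep the first row seen for each combination of key column values."""
    seen = set()
    unique_rows = []
    for row in rows:
        key = tuple(row[c] for c in key_columns)
        if key not in seen:
            seen.add(key)
            unique_rows.append(row)
    return unique_rows


sample_rows = [
    {"tpep_pickup_datetime": "2016-01-01 01:12:22", "pu_location": "A",
     "tpep_dropoff_datetime": "2016-01-01 01:29:14", "do_location": "B",
     "trip_distance": 3.2, "fare_amount": 14.0},
    # Same key as the first row; only fare_amount differs, so it counts as a duplicate.
    {"tpep_pickup_datetime": "2016-01-01 01:12:22", "pu_location": "A",
     "tpep_dropoff_datetime": "2016-01-01 01:29:14", "do_location": "B",
     "trip_distance": 3.2, "fare_amount": 15.0},
    # Different trip distance, so this is a distinct trip.
    {"tpep_pickup_datetime": "2016-01-01 01:12:22", "pu_location": "A",
     "tpep_dropoff_datetime": "2016-01-01 01:29:14", "do_location": "B",
     "trip_distance": 4.0, "fare_amount": 14.0},
]

deduplicated = drop_duplicate_trips(sample_rows)
print(len(sample_rows), "rows before,", len(deduplicated), "rows after")
```

Columns outside the key (here `fare_amount`) play no part in deciding whether two rows are duplicates.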

In [8]:
df.count()

10905073

In [9]:
duplicate_columns = ["tpep_pickup_datetime", "pu_location", "tpep_dropoff_datetime", "do_location", "trip_distance"]

In [10]:
df_no_duplicates = df.dropDuplicates(subset=duplicate_columns)

In [11]:
df_no_duplicates.count()

10900876

### Check that there are no duplicates

In [12]:
duplicates_df_after_droping_duplicates = df_no_duplicates.groupBy(
    "tpep_pickup_datetime", "pu_location", 
    "tpep_dropoff_datetime", "do_location", 
    "trip_distance"
).count().where('count > 1')

num_duplicates = duplicates_df_after_droping_duplicates.count()
print(f"The DataFrame has {num_duplicates} duplicate rows based on the given criteria.")

The DataFrame has 0 duplicate rows based on the given criteria.


### Detect missing
- Create a function that takes in the df and returns any data structure of your choice (df, dict, list, tuple, etc.) containing each column name and the percentage of its entries that are missing, relative to the whole dataset.
- Tip: storing the missing info as a dict, where the key is the column name and the value is the percentage, would be the easiest.
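
As a rough illustration of the dict shape suggested in the tip, the following plain-Python sketch computes missing percentages over a list of row dicts. It is not the PySpark solution. The helper name `missing_percentage` and the sample rows are assumptions made for this example, and "missing" is taken to mean `None` or an empty string.

```python
def missing_percentage(rows, columns):
    """Return {column: percentage of rows whose value is None or ''}."""
    total = len(rows)
    result = {}
    for column in columns:
        missing = sum(1 for row in rows if row.get(column) in (None, ""))
        # Guard against an empty dataset to avoid dividing by zero.
        result[column] = (missing / total) * 100 if total else 0.0
    return result


# Made-up sample rows for illustration only.
sample_rows = [
    {"payment_type": "Cash", "extra": 0.5},
    {"payment_type": None, "extra": None},
    {"payment_type": "Credit card", "extra": ""},
    {"payment_type": "Cash", "extra": 0.0},
]

percentages = missing_percentage(sample_rows, ["payment_type", "extra"])
for column, pct in percentages.items():
    print(f"{column:<15}: {pct:6.2f}%")
```

Note that a legitimate value of `0.0` is not counted as missing; only `None` and `""` are.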

In [13]:
def calculate_missing_percentage(df: DataFrame) -> dict:
    total_rows = df.count()
    missing_data = {}

    for column in df.columns:
        # Count the number of missing or null entries in each column
        missing_count = df.filter((col(column).isNull()) | (col(column) == "")).count()
        # Calculate the percentage of missing entries
        missing_percentage = (missing_count / total_rows) * 100
        missing_data[column] = missing_percentage

    return missing_data

### Print out the missing info

In [14]:
def print_missing_data(missing_data: dict):
    print("Missing Data Percentage by Column:")
    print("-----------------------------------")
    for column, percentage in missing_data.items():
        print(f"{column:<30} : {percentage:>6.2f}%")

# Example usage
missing_percentages = calculate_missing_percentage(df_no_duplicates)
print_missing_data(missing_percentages)

Missing Data Percentage by Column:
-----------------------------------
vendor                         :   0.00%
tpep_pickup_datetime           :   0.00%
tpep_dropoff_datetime          :   0.00%
passenger_count                :   0.00%
trip_distance                  :   0.00%
rate_type                      :   0.00%
store_and_fwd_flag             :   0.00%
pu_location                    :   0.00%
do_location                    :   0.00%
payment_type                   :   7.42%
fare_amount                    :   0.00%
extra                          :  52.35%
mta_tax                        :   0.00%
tip_amount                     :   0.00%
tolls_amount                   :   0.00%
improvement_surcharge          :   0.00%
total_amount                   :   0.00%
congestion_surcharge           : 100.00%
airport_fee                    : 100.00%


### Handle missing
- For numerical features replace with 0.
- For categorical/strings replace with 'Unknown'
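
The two replacement rules can be sketched in plain Python as follows. This is an illustration under stated assumptions, not the PySpark solution: the helper `fill_missing`, the `column_kinds` mapping and the sample rows are all made up here. Only `None` is treated as missing in this sketch. Empty strings are left untouched, so a detection step that also counts `""` as missing would still flag them.

```python
def fill_missing(rows, column_kinds):
    """Replace None with 0 for numeric columns and 'Unknown' for string columns.

    column_kinds maps a column name to either "numeric" or "string".
    Columns not listed in column_kinds are left unchanged.
    """
    filled = []
    for row in rows:
        new_row = dict(row)  # copy so the input rows are not modified
        for column, kind in column_kinds.items():
            if new_row.get(column) is None:
                new_row[column] = 0 if kind == "numeric" else "Unknown"
        filled.append(new_row)
    return filled


# Made-up sample rows for illustration only.
sample_rows = [
    {"payment_type": None, "extra": 0.5, "airport_fee": None},
    {"payment_type": "Cash", "extra": None, "airport_fee": None},
]
kinds = {"payment_type": "string", "extra": "numeric", "airport_fee": "numeric"}

filled_rows = fill_missing(sample_rows, kinds)
print(filled_rows)
```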


In [15]:
def replace_missing_values(df: DataFrame) -> DataFrame:
    # Initialize dictionaries for replacement values
    replacements = {}

    for column in df.columns:
        # Check the data type of the column
        if isinstance(df.schema[column].dataType, NumericType):
            # Set replacement value to 0 for numerical columns
            replacements[column] = 0
        elif isinstance(df.schema[column].dataType, StringType):
            # Set replacement value to 'Unknown' for string columns
            replacements[column] = "Unknown"

    # Use fillna with the dictionary of replacements
    df_imputed = df.fillna(value=replacements)

    return df_imputed

### check that there are no missing values

In [16]:
df_imputed = replace_missing_values(df_no_duplicates)
missing_percentages = calculate_missing_percentage(df_imputed)
print_missing_data(missing_percentages)

Missing Data Percentage by Column:
-----------------------------------
vendor                         :   0.00%
tpep_pickup_datetime           :   0.00%
tpep_dropoff_datetime          :   0.00%
passenger_count                :   0.00%
trip_distance                  :   0.00%
rate_type                      :   0.00%
store_and_fwd_flag             :   0.00%
pu_location                    :   0.00%
do_location                    :   0.00%
payment_type                   :   0.00%
fare_amount                    :   0.00%
extra                          :   0.00%
mta_tax                        :   0.00%
tip_amount                     :   0.00%
tolls_amount                   :   0.00%
improvement_surcharge          :   0.00%
total_amount                   :   0.00%
congestion_surcharge           :   0.00%
airport_fee                    :   0.00%


## Feature engineering
Write a function that adds the following 3 features. Use the built-in functions in PySpark (from the functions library; check lab 8) and avoid writing UDFs from scratch.
- trip duration (the format/unit is up to you)
- is_weekend: whether the trip occurred on Saturday or Sunday.
- week number (relative to the month and not the year, i.e. 1, 2, 3, 4 and not 31, 32, 33...)
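
The three features can be sanity-checked on a single trip with the standard library. The sketch below is plain Python, not the PySpark solution, and its helper names are made up for this example. It assumes one possible definition of week-of-month (days 1-7 are week 1, days 8-14 are week 2, and so on). It also shows why subtracting ISO week numbers is fragile in January: 2016-01-01 falls in ISO week 53 of 2015.

```python
from datetime import datetime
from math import ceil


def trip_features(pickup: datetime, dropoff: datetime) -> dict:
    """Compute the three features for one trip."""
    duration_minutes = (dropoff - pickup).total_seconds() / 60
    # Spark's dayofweek numbers days 1 (Sunday) .. 7 (Saturday);
    # Python's isoweekday numbers them 1 (Monday) .. 7 (Sunday).
    spark_style_dow = pickup.isoweekday() % 7 + 1
    is_weekend = int(spark_style_dow in (1, 7))
    # Assumed definition: days 1-7 -> week 1, days 8-14 -> week 2, ...
    week_of_month = ceil(pickup.day / 7)
    return {
        "trip_duration": duration_minutes,
        "is_weekend": is_weekend,
        "week_number": week_of_month,
    }


def naive_week_of_month(pickup: datetime) -> int:
    """ISO week of the date minus ISO week of the 1st of the month, plus 1."""
    first_of_month = pickup.replace(day=1)
    return pickup.isocalendar()[1] - first_of_month.isocalendar()[1] + 1


friday = trip_features(datetime(2016, 1, 1, 1, 22, 8), datetime(2016, 1, 1, 1, 27, 13))
saturday = trip_features(datetime(2016, 1, 30, 10, 0, 0), datetime(2016, 1, 30, 10, 15, 0))
print(friday)
print(saturday)
# The naive approach yields a negative week for Monday 2016-01-04.
print(naive_week_of_month(datetime(2016, 1, 4)))
```

A day-of-month based definition stays within 1 to 5 for every month, which is why it is used in this sketch.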

In [17]:
from pyspark.sql.functions import ceil, dayofmonth

def add_new_features(df: DataFrame) -> DataFrame:

    # Trip duration in minutes
    df = df.withColumn("trip_duration", (col("tpep_dropoff_datetime").cast("long") - col("tpep_pickup_datetime").cast("long")) / 60)

    # Check if the trip is on a weekend (dayofweek: 1 = Sunday, 7 = Saturday)
    df = df.withColumn("is_weekend", dayofweek("tpep_pickup_datetime").isin([1, 7]).cast("int"))

    # Week number within the month (days 1-7 -> 1, 8-14 -> 2, ...).
    # Subtracting weekofyear values breaks in January when the 1st falls in the
    # last ISO week of the previous year (2016-01-01 is in ISO week 53 of 2015).
    df = df.withColumn("week_number", ceil(dayofmonth("tpep_pickup_datetime") / 7).cast("int"))

    return df

In [18]:
df_with_features = add_new_features(df_imputed)

### Preview the first 20 rows (select only the pickup and dropoff times and the 3 features you added).

In [19]:
df_with_features.select("tpep_pickup_datetime", "tpep_dropoff_datetime", "trip_duration", "is_weekend", "week_number").show(20)

+--------------------+---------------------+------------------+----------+-----------+
|tpep_pickup_datetime|tpep_dropoff_datetime|     trip_duration|is_weekend|week_number|
+--------------------+---------------------+------------------+----------+-----------+
| 2016-01-01 01:22:08|  2016-01-01 01:27:13| 5.083333333333333|         0|          1|
| 2016-01-01 01:59:55|  2016-01-01 02:30:21|30.433333333333334|         0|          1|
| 2016-01-01 01:23:36|  2016-01-01 01:30:39|              7.05|         0|          1|
| 2016-01-01 01:51:57|  2016-01-01 01:56:03|               4.1|         0|          1|
| 2016-01-01 01:56:09|  2016-01-01 02:04:53| 8.733333333333333|         0|          1|
| 2016-01-01 01:57:08|  2016-01-01 02:30:19| 33.18333333333333|         0|          1|
| 2016-01-01 01:30:29|  2016-01-01 01:37:37| 7.133333333333334|         0|          1|
| 2016-01-01 01:52:56|  2016-01-01 01:58:09| 5.216666666666667|         0|          1|
| 2016-01-01 01:14:09|  2016-01-01 01:33:37

## Analyses - Answer the following 6 questions (by showing the output and a short 1-2 sentence observation/answer for each)

MUST Use the PySpark SQL API.

DO NOT explicitly write SQL queries. Doing so will result in 50% deduction (for the question). Check lab 7.

You are free to add columns if it will help in answering a question and add useful info to the dataset.

### 1- What is the average fare amount per payment type?

In [20]:
average_fare_per_payment_type = df_with_features.groupBy("payment_type").agg(avg("fare_amount").alias("average_fare"))

average_fare_per_payment_type.show()

+------------+------------------+
|payment_type|      average_fare|
+------------+------------------+
|     Unknown|12.311591304186663|
|        Cash|11.512400503682002|
|     Dispute|10.695270258583408|
|   No charge|11.184720309810677|
| Credit card|13.017946893502431|
+------------+------------------+



- Credit card payments show the highest average fare at approximately `$13.02`, while disputes have the lowest at about `$10.70`.

### 2- Do people tend to go on longer trips during the weekend or on weekdays?

In [21]:
df_with_features.groupBy("is_weekend").avg("trip_duration").show()

+----------+------------------+
|is_weekend|avg(trip_duration)|
+----------+------------------+
|         1|14.632815909836818|
|         0| 15.54322676289103|
+----------+------------------+



- Trips during weekdays have a higher average duration (about 15.54 minutes) compared to weekends (about 14.63 minutes), indicating that people tend to take longer trips on weekdays.

### 3- Which day recorded the most trips?

In [22]:
trips_by_date = df_with_features.groupBy(to_date(col("tpep_pickup_datetime")).alias("date")).agg(count("*").alias("trip_count"))

most_trips_date = trips_by_date.orderBy(col("trip_count").desc()).limit(1)

In [23]:
most_trips_date.show()


+----------+----------+
|      date|trip_count|
+----------+----------+
|2016-01-30|    434579|
+----------+----------+



- The day that recorded the most trips was January 30, 2016, with a total of 434,579 trips.

### 4- What is the average "total amount" of trips with more than 2 passengers?

In [24]:
df_more_than_two_passengers = df_with_features.filter(col("passenger_count") > 2)

average_total_amount = df_more_than_two_passengers.agg(avg("total_amount").alias("average_total_amount"))

# Show the result
average_total_amount.show()

+--------------------+
|average_total_amount|
+--------------------+
|  15.721455626940072|
+--------------------+



- The average total amount for trips with more than 2 passengers is approximately $15.72.

### 5- On average, when is the tip more likely to be higher: when there are multiple passengers or just one?

In [25]:
average_tip_single_passenger = df_with_features.filter(col("passenger_count") == 1) \
                                 .agg(avg("tip_amount").alias("average_tip_single_passenger"))

# Calculate the average tip amount for multiple passenger trips
average_tip_multiple_passengers = df_with_features.filter(col("passenger_count") > 1) \
                                    .agg(avg("tip_amount").alias("average_tip_multiple_passengers"))

# Show the results
average_tip_single_passenger.show()
average_tip_multiple_passengers.show()

+----------------------------+
|average_tip_single_passenger|
+----------------------------+
|          1.7570142760509586|
+----------------------------+

+-------------------------------+
|average_tip_multiple_passengers|
+-------------------------------+
|             1.7379321409699922|
+-------------------------------+



- On average, single-passenger trips have a slightly higher tip amount at about `$1.76` compared to trips with multiple passengers, which average around `$1.74` in tips.

### 6- What is the most frequent route on the weekend?

In [26]:
weekend_trips = df_with_features.filter(col("is_weekend") == 1)

# Group by pickup and dropoff location, and count the number of trips for each route
route_counts = weekend_trips.groupBy("pu_location", "do_location").agg(count("*").alias("trip_count"))

# Find the most frequent route
most_frequent_route = route_counts.orderBy(col("trip_count").desc()).limit(1)

# Show the result
most_frequent_route.show()

+-----------+-----------+----------+
|pu_location|do_location|trip_count|
+-----------+-----------+----------+
| Unknown,NV| Unknown,NV|     40617|
+-----------+-----------+----------+



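- The most frequent route on the weekend was from "Unknown,NV" to "Unknown,NV", with a total of 40,617 trips. Both endpoints appear to be the dataset's placeholder for an unrecorded location, so this result likely reflects trips with unknown zones rather than a real route.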
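
Since the top route above consists of a placeholder location, a follow-up ranking that skips such locations may be more informative. The plain-Python sketch below illustrates the idea with `collections.Counter`. It is not the PySpark solution. The helper `most_frequent_route`, the `placeholder_prefix` parameter and the sample trips are all assumptions made for this example.

```python
from collections import Counter


def most_frequent_route(trips, placeholder_prefix="Unknown"):
    """Return ((pickup, dropoff), count) for the most common route.

    Routes where either endpoint starts with placeholder_prefix are skipped.
    Returns None if no route remains after filtering.
    """
    counts = Counter(
        (pickup, dropoff)
        for pickup, dropoff in trips
        if not pickup.startswith(placeholder_prefix)
        and not dropoff.startswith(placeholder_prefix)
    )
    return counts.most_common(1)[0] if counts else None


# Made-up sample trips (pickup, dropoff) for illustration only.
sample_trips = (
    [("Unknown,NV", "Unknown,NV")] * 3
    + [("Zone A", "Zone B")] * 2
    + [("Zone B", "Zone A")]
)

top_route = most_frequent_route(sample_trips)
print(top_route)
```

The same filter-then-count idea carries over to a DataFrame: filter out the placeholder rows before grouping by pickup and dropoff location.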

## Encoding
- Label encode all categorical features.
- Create a lookup table for these label encoded features. You can use the same format/example as the lookup table in Milestone 1 description.

(You are allowed to store and manipulate the lookup table as a pandas dataframe, it does not have to be a PySpark df).
- Remove the original unencoded categorical features from the df after encoding.
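
To show what a label encoding and its lookup table look like, here is a minimal plain-Python sketch. It is an illustration rather than the PySpark solution. It assigns codes by descending frequency, which mirrors the default ordering of Spark's `StringIndexer` (`stringOrderType="frequencyDesc"`). Breaking ties alphabetically is an assumption of this sketch. The helper `build_label_lookup` and the sample values are made up.

```python
from collections import Counter


def build_label_lookup(feature, values):
    """Return (mapping, lookup_rows) for one categorical feature.

    Labels are numbered 0, 1, 2, ... from most to least frequent;
    ties are broken alphabetically (an assumption of this sketch).
    """
    counts = Counter(values)
    ordered_labels = sorted(counts, key=lambda label: (-counts[label], label))
    mapping = {label: index for index, label in enumerate(ordered_labels)}
    lookup_rows = [
        {"feature": feature, "old_value": label, "new_value": index}
        for label, index in mapping.items()
    ]
    return mapping, lookup_rows


# Made-up sample values for illustration only.
payment_types = ["Credit card", "Cash", "Credit card", "Dispute", "Cash", "Credit card"]

mapping, lookup_rows = build_label_lookup("payment_type", payment_types)
encoded = [mapping[value] for value in payment_types]
print(lookup_rows)
print(encoded)
```

Keeping the lookup rows alongside the encoded column is what makes the encoding reversible later.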

In [27]:
def label_encode_and_create_lookup(df: DataFrame, categorical_columns: list) -> (DataFrame, pd.DataFrame):
    # DataFrame to store all lookup information
    all_lookups = pd.DataFrame(columns=['feature', 'old_value', 'new_value'])

    for column in categorical_columns:
        # Create the indexer
        indexer = StringIndexer(inputCol=column, outputCol=column + "_encoded")

        # Fit the indexer to the DataFrame and transform the column
        model = indexer.fit(df)
        df = model.transform(df)

        # Create a lookup table for this column
        labels = model.labels
        lookup_table = pd.DataFrame({
            'feature': column,
            'old_value': labels,
            'new_value': range(len(labels))
        })

        # Append this lookup table to the all_lookups DataFrame
        all_lookups = pd.concat([all_lookups, lookup_table], ignore_index=True)

        # Drop the original column
        df = df.drop(column)

    return df, all_lookups

In [28]:
categorical_columns = ['vendor', 'rate_type', 'store_and_fwd_flag', 'pu_location', 'do_location', 'payment_type']
df_encoded, lookup_table = label_encode_and_create_lookup(df_with_features, categorical_columns)

### Preview first 20 rows of the label encoded features

In [29]:
df_encoded.select([f'{feature}_encoded' for feature in categorical_columns]).show(vertical=True)

-RECORD 0--------------------------
 vendor_encoded             | 0.0  
 rate_type_encoded          | 0.0  
 store_and_fwd_flag_encoded | 0.0  
 pu_location_encoded        | 27.0 
 do_location_encoded        | 7.0  
 payment_type_encoded       | 1.0  
-RECORD 1--------------------------
 vendor_encoded             | 1.0  
 rate_type_encoded          | 0.0  
 store_and_fwd_flag_encoded | 0.0  
 pu_location_encoded        | 13.0 
 do_location_encoded        | 11.0 
 payment_type_encoded       | 2.0  
-RECORD 2--------------------------
 vendor_encoded             | 1.0  
 rate_type_encoded          | 0.0  
 store_and_fwd_flag_encoded | 0.0  
 pu_location_encoded        | 32.0 
 do_location_encoded        | 15.0 
 payment_type_encoded       | 2.0  
-RECORD 3--------------------------
 vendor_encoded             | 1.0  
 rate_type_encoded          | 0.0  
 store_and_fwd_flag_encoded | 0.0  
 pu_location_encoded        | 4.0  
 do_location_encoded        | 29.0 
 payment_type_encoded       

### Preview first 20 rows of your lookup table

In [30]:
print(lookup_table.head(20))

               feature                               old_value new_value
0               vendor                           VeriFone Inc.         0
1               vendor       Creative Mobile Technologies, LLC         1
2            rate_type                           Standard rate         0
3            rate_type                                     JFK         1
4            rate_type                         Negotiated fare         2
5            rate_type                                  Newark         3
6            rate_type                   Nassau or Westchester         4
7            rate_type                                 Unknown         5
8            rate_type                              Group ride         6
9   store_and_fwd_flag                                       N         0
10  store_and_fwd_flag                                       Y         1
11         pu_location         Manhattan,Upper East Side South         0
12         pu_location                Manhattan,Mid

### Load the cleaned PySpark df to a parquet file and the lookup table to a csv file.

In [31]:
parquet_file_path = "yellow_trip_data_2016-01clean.parquet"
df_encoded.write.parquet(parquet_file_path, mode="overwrite")

In [32]:
csv_file_path = "lookup_table_yellow_taxis.csv"
lookup_table.to_csv(csv_file_path, index=False)

## Bonus - Load the cleaned parquet file and lookup table into a Postgres database. 

Note that if you decide to do the bonus, you must include not only your notebook but also the docker-compose.yaml file.

### Screenshot of the table existing in the database and a simple query such as `select count(*) from table_name` or `select * from table_name limit 10`

(You can just copy paste the screenshots in the markdown cells below)