# TaxiFare Jupyter Notebook Analysis
The dataset contains US yellow-taxi fare records from January to March of 2015 and 2016, totaling about 73 million rows and more than 10 GB in size.

PySpark is typically chosen for its RDD API and in-memory computation, which improve performance for large-scale data processing.
For initial ETL and data exploration, however, it can help to start with pandas on a small batch to inspect the data visually before scaling up with Spark.
This project prepares the data for potential machine learning applications; it does not itself implement any machine learning models or standardization steps.


## 📦 Package Installation
- **PySpark**: Installing PySpark so that large datasets can be processed with Spark.

In [1]:
!pip install pyspark



## 📚 Library Imports
- Importing essential libraries for data processing


In [2]:
from pyspark.sql import SparkSession, functions as F
import keras
import numpy as np
from sklearn.model_selection import train_test_split
import seaborn
import matplotlib.pyplot as plt
import plotly.express as px
# Note: importing min and max here shadows Python's built-in min() and max() for the rest of the notebook.
from pyspark.sql.functions import count, year, month, dayofmonth, col, to_timestamp, when, mean, stddev, min, max


## 🔧 Spark Session Initialization
- Configuring Spark to efficiently process and analyze large data volumes.


In [3]:

spark = SparkSession.builder \
    .appName("ETL with PySpark") \
    .config("spark.driver.memory", "5g") \
    .getOrCreate()


## 📂 Data Reading and Processing
- Loading six monthly taxi fare CSV files (January to March of 2015 and 2016) with Spark, letting it infer the schema, and combining them into a single DataFrame.

In [4]:
df15_1 = spark.read.csv("S:/Data/Us_taxi_fare/yellow_tripdata_2015-01.csv", header=True, inferSchema=True)
df15_2 = spark.read.csv("S:/Data/Us_taxi_fare/yellow_tripdata_2015-02.csv", header=True, inferSchema=True)
df15_3 = spark.read.csv("S:/Data/Us_taxi_fare/yellow_tripdata_2015-03.csv", header=True, inferSchema=True)
df16_1 = spark.read.csv("S:/Data/Us_taxi_fare/yellow_tripdata_2016-01.csv", header=True, inferSchema=True)   
df16_2 = spark.read.csv("S:/Data/Us_taxi_fare/yellow_tripdata_2016-02.csv", header=True, inferSchema=True)
df16_3 = spark.read.csv("S:/Data/Us_taxi_fare/yellow_tripdata_2016-03.csv", header=True, inferSchema=True)

In [5]:
# union() matches columns by position, not by name, so all six files must share the same column order.
df = (
    df15_1.union(df15_2)
    .union(df15_3)
    .union(df16_1)
    .union(df16_2)
    .union(df16_3)
)
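The six reads and the chained union can also be expressed over a list of paths. The following is a minimal sketch, not the notebook's own method: `monthly_paths` and `load_trips` are hypothetical helper names, and `load_trips` assumes an existing `SparkSession` is passed in. It uses `unionByName`, which matches columns by name rather than by position.

```python
from functools import reduce


def monthly_paths(base_dir, years, months):
    """Build one CSV path per (year, month), following the file naming used above."""
    return [
        f"{base_dir}/yellow_tripdata_{y}-{m:02d}.csv"
        for y in years
        for m in months
    ]


def load_trips(spark, paths):
    """Read each file separately, then union the results by column name.

    `spark` is assumed to be an existing SparkSession; nothing is read
    until this function is actually called.
    """
    frames = [spark.read.csv(p, header=True, inferSchema=True) for p in paths]
    return reduce(lambda left, right: left.unionByName(right), frames)


paths = monthly_paths("S:/Data/Us_taxi_fare", years=[2015, 2016], months=[1, 2, 3])
print(len(paths))  # 6
print(paths[0])    # S:/Data/Us_taxi_fare/yellow_tripdata_2015-01.csv
```

Calling `load_trips(spark, paths)` would then return a single DataFrame comparable to `df`, provided the files share the same column names.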


In [6]:
df.show(4)

+--------+--------------------+---------------------+---------------+-------------+------------------+-----------------+----------+------------------+------------------+-----------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|  pickup_longitude|  pickup_latitude|RateCodeID|store_and_fwd_flag| dropoff_longitude| dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+--------+--------------------+---------------------+---------------+-------------+------------------+-----------------+----------+------------------+------------------+-----------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|       2| 2015-01-15 19:05:39|  2015-01-15 19:23:42|              1|         1.59|  -73.993896484375| 40.7501106262207|         1|           

In [7]:
df.describe()  # returns a lazy DataFrame; call .show() on it to print the statistics

DataFrame[summary: string, VendorID: string, passenger_count: string, trip_distance: string, pickup_longitude: string, pickup_latitude: string, RateCodeID: string, store_and_fwd_flag: string, dropoff_longitude: string, dropoff_latitude: string, payment_type: string, fare_amount: string, extra: string, mta_tax: string, tip_amount: string, tolls_amount: string, improvement_surcharge: string, total_amount: string]

In [8]:
null_counts = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])
null_counts.show()

+--------+--------------------+---------------------+---------------+-------------+----------------+---------------+----------+------------------+-----------------+----------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|pickup_longitude|pickup_latitude|RateCodeID|store_and_fwd_flag|dropoff_longitude|dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+--------+--------------------+---------------------+---------------+-------------+----------------+---------------+----------+------------------+-----------------+----------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|       0|                   0|                    0|              0|            0|               0|              0|         0|                 0|              

In [9]:
# Row counts per category for each categorical column.
for c in ["VendorID", "passenger_count", "RateCodeID", "store_and_fwd_flag", "payment_type"]:
    df_count = df.groupBy(c).agg(count("*").alias("Count"))
    df_count.show()

+--------+--------+
|VendorID|   Count|
+--------+--------+
|       1|34575927|
|       2|38475048|
+--------+--------+

+---------------+--------+
|passenger_count|   Count|
+---------------+--------+
|              1|51890118|
|              6| 2516993|
|              3| 2944683|
|              5| 3967367|
|              9|      87|
|              4| 1392887|
|              2|10312593|
|              0|   26011|
|              7|     125|
|              8|     111|
+---------------+--------+

+----------+--------+
|RateCodeID|   Count|
+----------+--------+
|         1|71296899|
|         6|     730|
|         3|  113597|
|         5|  220462|
|         4|   27213|
|         2| 1389960|
|        99|    2114|
+----------+--------+

+------------------+--------+
|store_and_fwd_flag|   Count|
+------------------+--------+
|                 Y|  535118|
|                 N|72515857|
+------------------+--------+

+------------+--------+
|payment_type|   Count|
+------------+--------+
|   

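- Categories with very few rows are merged into larger groups for clearer analysis and representation: payment types 3, 4, and 5 are relabeled as "other", and trips recorded with 0 passengers are counted as 1-passenger trips.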


In [10]:

df = df.withColumn("payment_type", col("payment_type").cast("string"))
df = df.withColumn("passenger_count", col("passenger_count").cast("string"))

df = df.replace(["5", "4", "3"], ["other", "other", "other"], "payment_type")
df = df.replace("0", "1", "passenger_count")
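To make the effect of the two `replace` calls concrete, here is a small plain-Python sketch of the same recoding applied to a handful of values. The lookup tables and the `recode` helper are hypothetical names introduced for illustration, and the input lists are made-up samples rather than values taken from the dataset.

```python
# Hypothetical lookup tables mirroring the two replace() calls above.
PAYMENT_TYPE_MAP = {"3": "other", "4": "other", "5": "other"}
PASSENGER_COUNT_MAP = {"0": "1"}


def recode(value, mapping):
    """Return the replacement for `value`, or `value` itself if it is not listed."""
    return mapping.get(value, value)


# Illustrative inputs (already cast to strings, as in the cell above).
payment_types = ["1", "2", "3", "4", "5"]
recoded_payments = [recode(v, PAYMENT_TYPE_MAP) for v in payment_types]
print(recoded_payments)  # ['1', '2', 'other', 'other', 'other']

passenger_counts = ["0", "1", "2"]
recoded_passengers = [recode(v, PASSENGER_COUNT_MAP) for v in passenger_counts]
print(recoded_passengers)  # ['1', '1', '2']
```

Values that are not listed in a mapping pass through unchanged, which is also how `DataFrame.replace` treats unlisted values.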


- The pickup and dropoff timestamps are parsed, and year, month, and day columns are derived from the pickup time, along with the trip duration in seconds (`seconds_difference`).


In [11]:
df = df.withColumn("tpep_pickup_datetime", to_timestamp(col("tpep_pickup_datetime")))
df = df.withColumn("tpep_dropoff_datetime", to_timestamp(col("tpep_dropoff_datetime")))


df = df.withColumn("year", year(col("tpep_pickup_datetime")))
df = df.withColumn("month", month(col("tpep_pickup_datetime")))
df = df.withColumn("day", dayofmonth(col("tpep_pickup_datetime")))
df = df.withColumn("seconds_difference", F.unix_timestamp("tpep_dropoff_datetime") - F.unix_timestamp("tpep_pickup_datetime"))
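As a worked example of the derived columns, the sketch below computes the same four values in plain Python for the first row shown by `df.show(4)` (pickup `2015-01-15 19:05:39`, dropoff `2015-01-15 19:23:42`). The function name `derive_time_features` is hypothetical, and the timestamp format is assumed from that sample row.

```python
from datetime import datetime

# Timestamp layout assumed from the sample row printed earlier.
FMT = "%Y-%m-%d %H:%M:%S"


def derive_time_features(pickup, dropoff):
    """Return year, month, day of the pickup and the trip duration in seconds."""
    start = datetime.strptime(pickup, FMT)
    end = datetime.strptime(dropoff, FMT)
    return {
        "year": start.year,
        "month": start.month,
        "day": start.day,
        "seconds_difference": int((end - start).total_seconds()),
    }


features = derive_time_features("2015-01-15 19:05:39", "2015-01-15 19:23:42")
print(features)
# {'year': 2015, 'month': 1, 'day': 15, 'seconds_difference': 1083}
```

The trip lasts 18 minutes and 3 seconds, which is 1083 seconds.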


In [12]:
# Row counts per year, month, and day of month.
for c in ["year", "month", "day"]:
    df_count = df.groupBy(c).agg(count("*").alias("Count"))
    df_count.show()

+----+--------+
|year|   Count|
+----+--------+
|2015|38551116|
|2016|34499859|
+----+--------+

+-----+--------+
|month|   Count|
+-----+--------+
|    1|23655844|
|    2|23832570|
|    3|25562561|
+-----+--------+

+---+-------+
|day|  Count|
+---+-------+
| 31|1705541|
| 28|2484395|
| 26|2298971|
| 27|2233707|
| 12|2565592|
| 22|2413852|
|  1|2249622|
| 13|2622211|
|  6|2470265|
| 16|2439664|
|  3|2328611|
| 20|2559295|
|  5|2393572|
| 19|2520545|
| 15|2464061|
| 17|2450408|
|  9|2379458|
|  4|2325266|
|  8|2381373|
| 23|2150286|
+---+-------+
only showing top 20 rows



In [13]:
df = df.drop("tpep_pickup_datetime","tpep_dropoff_datetime","extra","mta_tax","tip_amount","tolls_amount","improvement_surcharge","total_amount")

## 📉 Statistical Analysis
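- Calculating basic statistics (mean, standard deviation, minimum, maximum) for fare amount, trip duration, and trip distance. The execution counter (`In [29]`) and the value ranges in the output suggest that this cell was last run after the filtering step in `In [28]`, so the figures describe the filtered data.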


In [29]:
# Same summary (mean, stddev, min, max) for each numeric column of interest.
for c in ["fare_amount", "seconds_difference", "trip_distance"]:
    print(f"--------{c}---------")
    stats = df.select(
        mean(col(c)).alias("mean"),
        stddev(col(c)).alias("stddev"),
        min(col(c)).alias("min"),
        max(col(c)).alias("max")
    )
    stats.show()

--------fare_amount---------
+-----------------+------------------+---+------+
|             mean|            stddev|min|   max|
+-----------------+------------------+---+------+
|14.75088588513609|10.744981087843685|3.1|999.98|
+-----------------+------------------+---+------+

--------seconds_difference---------
+-----------------+-----------------+---+-----+
|             mean|           stddev|min|  max|
+-----------------+-----------------+---+-----+
|963.1136963720166|637.4226150348796| 61|28794|
+-----------------+-----------------+---+-----+

--------trip_distance---------
+------------------+------------------+----+-----+
|              mean|            stddev| min|  max|
+------------------+------------------+----+-----+
|3.6439254534677517|3.8568471274377227|1.01|828.1|
+------------------+------------------+----+-----+
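When reading the `stddev` columns above, note that Spark's `stddev` is an alias for `stddev_samp`, the sample standard deviation (dividing by n - 1). The sketch below shows the difference on a small made-up list using only the standard library; the numbers are illustrative and unrelated to the dataset. Run it in a fresh interpreter, since the notebook imports Spark's `min` and `max` over the built-ins.

```python
import statistics

# Made-up values, chosen so the population standard deviation is exactly 2.
values = [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]

summary = {
    "mean": statistics.mean(values),
    "stddev": statistics.stdev(values),  # sample standard deviation (n - 1)
    "min": min(values),
    "max": max(values),
}
population_stddev = statistics.pstdev(values)  # population standard deviation (n)

print(summary)
print(population_stddev)
```

With tens of millions of rows the two definitions are practically identical, so the distinction matters mainly for small subsets.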



In [17]:
df_filtered = df.filter(df["fare_amount"] > 1000)

# Count the number of rows in the filtered DataFrame
df_count = df_filtered.agg(count("*").alias("Count"))

# Show the result
df_count.show()

+-----+
|Count|
+-----+
|   89|
+-----+



In [27]:
df_filtered = df.filter(df["trip_distance"] > 1000)

# Count the number of rows in the filtered DataFrame
df_count = df_filtered.agg(count("*").alias("Count"))

# Show the result
df_count.show()

+-----+
|Count|
+-----+
|  162|
+-----+



## 🔍 Data Filtering
- Filtering data to remove unlikely and extreme values, improving the reliability of further analyses.


In [28]:
df = df.where((col("fare_amount") < 1000) & (col("fare_amount") > 3))  # keep fares above $3 (used here as an approximate initial charge) and below $1000
df = df.where((col("seconds_difference") > 60) & (col("seconds_difference") < 28800))  # keep trips longer than 1 minute and shorter than 8 hours
df = df.where((col("trip_distance") > 1) & (col("trip_distance") < 1000))  # keep trips longer than 1 mile and shorter than 1000 miles
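The three filters use strict inequalities, so rows sitting exactly on a threshold are dropped. The following plain-Python sketch mirrors the same bounds on a few made-up trips; the constant names and `is_plausible_trip` are hypothetical, introduced only to illustrate the rule.

```python
# Bounds copied from the filters above; all comparisons are strict.
FARE_MIN, FARE_MAX = 3, 1000              # dollars
DURATION_MIN, DURATION_MAX = 60, 28800    # seconds (1 minute to 8 hours)
DISTANCE_MIN, DISTANCE_MAX = 1, 1000      # miles


def is_plausible_trip(fare_amount, seconds_difference, trip_distance):
    """Return True only if all three values fall strictly inside their bounds."""
    return (
        FARE_MIN < fare_amount < FARE_MAX
        and DURATION_MIN < seconds_difference < DURATION_MAX
        and DISTANCE_MIN < trip_distance < DISTANCE_MAX
    )


# Made-up (fare_amount, seconds_difference, trip_distance) tuples.
trips = [
    (9.5, 1083, 1.59),   # kept
    (2.5, 300, 1.2),     # fare too low
    (52.0, 30, 17.0),    # trip too short in time
    (12.0, 900, 1.0),    # distance exactly on the lower bound, so dropped
]
kept = [t for t in trips if is_plausible_trip(*t)]
print(kept)  # [(9.5, 1083, 1.59)]
```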
