In [1]:
#!/usr/bin/env python3

from pyspark.sql import SparkSession
from pyspark.sql.functions import when, row_number, monotonically_increasing_id, col
from pyspark.sql.window import Window
import psycopg2

In [2]:
# Set up a Spark session

spark = SparkSession.builder \
    .appName("Yellow Taxi Trip Data Transformation") \
    .config("spark.jars", "postgresql-42.7.3.jar") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()


In [None]:
file_path = "yellow_tripdata_2009-01DATASET (1).parquet"
df = spark.read.option("mode", "DROPMALFORMED").parquet(file_path)
 

In [4]:
df.show(5)

+-----------+--------------------+---------------------+---------------+-------------+----------+---------+---------+-----------------+------------------+---------+------------+--------+---------+-------+-------+---------+---------+
|vendor_name|Trip_Pickup_DateTime|Trip_Dropoff_DateTime|Passenger_Count|Trip_Distance| Start_Lon|Start_Lat|Rate_Code|store_and_forward|           End_Lon|  End_Lat|Payment_Type|Fare_Amt|surcharge|mta_tax|Tip_Amt|Tolls_Amt|Total_Amt|
+-----------+--------------------+---------------------+---------------+-------------+----------+---------+---------+-----------------+------------------+---------+------------+--------+---------+-------+-------+---------+---------+
|        VTS| 2009-01-04 02:52:00|  2009-01-04 03:02:00|              1|         2.63|-73.991957|40.721567|     NULL|             NULL|        -73.993803|40.695922|        CASH|     8.9|      0.5|   NULL|    0.0|      0.0|      9.4|
|        VTS| 2009-01-04 03:31:00|  2009-01-04 03:38:00|            

In [5]:
df.printSchema()

root
 |-- vendor_name: string (nullable = true)
 |-- Trip_Pickup_DateTime: string (nullable = true)
 |-- Trip_Dropoff_DateTime: string (nullable = true)
 |-- Passenger_Count: long (nullable = true)
 |-- Trip_Distance: double (nullable = true)
 |-- Start_Lon: double (nullable = true)
 |-- Start_Lat: double (nullable = true)
 |-- Rate_Code: double (nullable = true)
 |-- store_and_forward: double (nullable = true)
 |-- End_Lon: double (nullable = true)
 |-- End_Lat: double (nullable = true)
 |-- Payment_Type: string (nullable = true)
 |-- Fare_Amt: double (nullable = true)
 |-- surcharge: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- Tip_Amt: double (nullable = true)
 |-- Tolls_Amt: double (nullable = true)
 |-- Total_Amt: double (nullable = true)



In [6]:
df_subset = df.limit(10000)

In [7]:
df_subset.count()

10000

In [9]:
df_subset.show()

+-----------+--------------------+---------------------+---------------+------------------+------------------+---------+---------+-----------------+------------------+---------+------------+-----------------+---------+-------+-------+---------+---------+
|vendor_name|Trip_Pickup_DateTime|Trip_Dropoff_DateTime|Passenger_Count|     Trip_Distance|         Start_Lon|Start_Lat|Rate_Code|store_and_forward|           End_Lon|  End_Lat|Payment_Type|         Fare_Amt|surcharge|mta_tax|Tip_Amt|Tolls_Amt|Total_Amt|
+-----------+--------------------+---------------------+---------------+------------------+------------------+---------+---------+-----------------+------------------+---------+------------+-----------------+---------+-------+-------+---------+---------+
|        VTS| 2009-01-04 02:52:00|  2009-01-04 03:02:00|              1|              2.63|        -73.991957|40.721567|     NULL|             NULL|        -73.993803|40.695922|        CASH|              8.9|      0.5|   NULL|    0.0| 

In [10]:
# Find the missing values in the dataset

null_counts = [(column, df_subset.where(df_subset[column].isNull()).count()) for column in df_subset.columns]


In [11]:
null_counts

[('vendor_name', 0),
 ('Trip_Pickup_DateTime', 0),
 ('Trip_Dropoff_DateTime', 0),
 ('Passenger_Count', 0),
 ('Trip_Distance', 0),
 ('Start_Lon', 0),
 ('Start_Lat', 0),
 ('Rate_Code', 10000),
 ('store_and_forward', 10000),
 ('End_Lon', 0),
 ('End_Lat', 0),
 ('Payment_Type', 0),
 ('Fare_Amt', 0),
 ('surcharge', 0),
 ('mta_tax', 10000),
 ('Tip_Amt', 0),
 ('Tolls_Amt', 0),
 ('Total_Amt', 0)]

In [12]:
columns_to_drops = [column for column, count in null_counts if count > 0.1 * df_subset.count()]


In [13]:
columns_to_drops

['Rate_Code', 'store_and_forward', 'mta_tax']

In [14]:
# Drop columns with more than 10% missing values
df_subset = df_subset.drop(*columns_to_drops)


In [15]:
df_subset.columns

['vendor_name',
 'Trip_Pickup_DateTime',
 'Trip_Dropoff_DateTime',
 'Passenger_Count',
 'Trip_Distance',
 'Start_Lon',
 'Start_Lat',
 'End_Lon',
 'End_Lat',
 'Payment_Type',
 'Fare_Amt',
 'surcharge',
 'Tip_Amt',
 'Tolls_Amt',
 'Total_Amt']

In [16]:
null_counts = [(column, df_subset.where(df_subset[column].isNull()).count()) for column in df_subset.columns]


In [17]:
null_counts

[('vendor_name', 0),
 ('Trip_Pickup_DateTime', 0),
 ('Trip_Dropoff_DateTime', 0),
 ('Passenger_Count', 0),
 ('Trip_Distance', 0),
 ('Start_Lon', 0),
 ('Start_Lat', 0),
 ('End_Lon', 0),
 ('End_Lat', 0),
 ('Payment_Type', 0),
 ('Fare_Amt', 0),
 ('surcharge', 0),
 ('Tip_Amt', 0),
 ('Tolls_Amt', 0),
 ('Total_Amt', 0)]

In [18]:
df_subset.show()

+-----------+--------------------+---------------------+---------------+------------------+------------------+---------+------------------+---------+------------+-----------------+---------+-------+---------+---------+
|vendor_name|Trip_Pickup_DateTime|Trip_Dropoff_DateTime|Passenger_Count|     Trip_Distance|         Start_Lon|Start_Lat|           End_Lon|  End_Lat|Payment_Type|         Fare_Amt|surcharge|Tip_Amt|Tolls_Amt|Total_Amt|
+-----------+--------------------+---------------------+---------------+------------------+------------------+---------+------------------+---------+------------+-----------------+---------+-------+---------+---------+
|        VTS| 2009-01-04 02:52:00|  2009-01-04 03:02:00|              1|              2.63|        -73.991957|40.721567|        -73.993803|40.695922|        CASH|              8.9|      0.5|    0.0|      0.0|      9.4|
|        VTS| 2009-01-04 03:31:00|  2009-01-04 03:38:00|              3|              4.55|        -73.982102| 40.73629|    

In [19]:
# Data transformation 
df_subset = df_subset.filter(
    (df_subset["Passenger_Count"] > 0.0) &
    (df_subset["Trip_Distance"] > 0.0) &
    (df_subset["Fare_Amt"] > 0.0) &
    (df_subset["Total_Amt"] > 0.0) & 
    (df_subset["Tip_Amt"] >= 0.0) &
    (df_subset["Tolls_Amt"] >= 0.0) &
    (df_subset["surcharge"] >= 0.0)
)

In [21]:
df_subset.count()

9938

In [22]:
df_subset.dtypes

[('vendor_name', 'string'),
 ('Trip_Pickup_DateTime', 'string'),
 ('Trip_Dropoff_DateTime', 'string'),
 ('Passenger_Count', 'bigint'),
 ('Trip_Distance', 'double'),
 ('Start_Lon', 'double'),
 ('Start_Lat', 'double'),
 ('End_Lon', 'double'),
 ('End_Lat', 'double'),
 ('Payment_Type', 'string'),
 ('Fare_Amt', 'double'),
 ('surcharge', 'double'),
 ('Tip_Amt', 'double'),
 ('Tolls_Amt', 'double'),
 ('Total_Amt', 'double')]

In [23]:
# Convert the columns to the correct data types
columns_to_cast = {
    "Trip_Pickup_DateTime": "timestamp",
    "Trip_Dropoff_DateTime": "timestamp",
    "Passenger_Count": "integer",
}

In [24]:
for col_name, col_type in columns_to_cast.items():
    df_subset = df_subset.withColumn(col_name, df_subset[col_name].cast(col_type))


In [25]:
df_subset.dtypes

[('vendor_name', 'string'),
 ('Trip_Pickup_DateTime', 'timestamp'),
 ('Trip_Dropoff_DateTime', 'timestamp'),
 ('Passenger_Count', 'int'),
 ('Trip_Distance', 'double'),
 ('Start_Lon', 'double'),
 ('Start_Lat', 'double'),
 ('End_Lon', 'double'),
 ('End_Lat', 'double'),
 ('Payment_Type', 'string'),
 ('Fare_Amt', 'double'),
 ('surcharge', 'double'),
 ('Tip_Amt', 'double'),
 ('Tolls_Amt', 'double'),
 ('Total_Amt', 'double')]

In [30]:
categorical_columns = [item[0] for item in df_subset.dtypes if item[1].startswith('string')]

In [31]:
categorical_columns

['vendor_name', 'Payment_Type']

In [32]:
df_subset.select("vendor_name").distinct().show()

+-----------+
|vendor_name|
+-----------+
|        VTS|
|        DDS|
|        CMT|
+-----------+



In [33]:
df_subset.select("Payment_Type").distinct().show()

+------------+
|Payment_Type|
+------------+
|        CASH|
|      Credit|
|      CREDIT|
|        Cash|
|   No Charge|
|     Dispute|
+------------+



In [34]:
df_subset = df_subset.withColumn("Payment_Type", 
                                when(col("Payment_Type") == "CASH", "Cash")
                                .when(col("Payment_Type") == "CREDIT", "Credit")
                                .otherwise(col("Payment_Type")))



In [35]:
df_subset.select("Payment_Type").distinct().show()

+------------+
|Payment_Type|
+------------+
|        Cash|
|      Credit|
|   No Charge|
|     Dispute|
+------------+



In [36]:
#NORMALIZATION#
#There are four entities in the dataset 
#Vendor
#Payment
#Location
#Trips

vendors = df_subset.select("vendor_name").distinct() \
                    .withColumn("vendor_id", monotonically_increasing_id()) \
                    .select("vendor_id", "vendor_name")


In [38]:
locations = df_subset.select("Start_Lon", "Start_Lat", "End_Lon", "End_Lat") \
                    .withColumn("location_id", monotonically_increasing_id()) \
                    .select("location_id", "Start_Lon", "Start_Lat", "End_Lon", "End_Lat")

In [39]:
payments = df_subset.select("Trip_Pickup_DateTime", "Payment_Type", "Fare_Amt", "surcharge", "Tip_Amt", "Tolls_Amt", "Total_Amt" ) \
                    .withColumn("payment_id", monotonically_increasing_id()) \
                    .select("payment_id", "Payment_Type", "Fare_Amt", "surcharge", "Tip_Amt", "Tolls_Amt", "Total_Amt")


In [40]:
trips = df_subset.select("Trip_Pickup_DateTime", "Trip_Dropoff_DateTime", "Trip_Distance", "Passenger_Count",
                         "vendor_name", "Payment_Type", "Start_Lon") \
                         .withColumn("trip_id", monotonically_increasing_id()) \
                         .join(vendors, "vendor_name", "right") \
                         .select("trip_id", "vendor_id", "Trip_Pickup_DateTime", "Trip_Dropoff_DateTime", 
                                 "Trip_Distance", "Passenger_Count", "Payment_Type", "Start_Lon")


In [41]:
trips.show()

+-------+---------+--------------------+---------------------+-------------+---------------+------------+------------------+
|trip_id|vendor_id|Trip_Pickup_DateTime|Trip_Dropoff_DateTime|Trip_Distance|Passenger_Count|Payment_Type|         Start_Lon|
+-------+---------+--------------------+---------------------+-------------+---------------+------------+------------------+
|   9937|        0| 2009-01-03 21:19:00|  2009-01-03 21:29:00|         2.29|              2|        Cash|-73.98261499999998|
|   9936|        0| 2009-01-07 19:56:00|  2009-01-07 20:04:00|         3.02|              1|      Credit|        -73.984955|
|   9935|        0| 2009-01-03 09:54:00|  2009-01-03 10:17:00|         9.06|              2|        Cash|        -73.873022|
|   9934|        0| 2009-01-03 19:04:00|  2009-01-03 19:19:00|         2.64|              5|        Cash|-73.96221199999998|
|   9933|        0| 2009-01-03 21:33:00|  2009-01-03 21:39:00|          1.1|              1|        Cash|        -73.997167|


In [42]:
#the window function is to et a unique id for our trips table
windowSpec = Window.orderBy("trip_id")
trips = trips.withColumn("row_num", row_number().over(windowSpec))


In [43]:
trips_payments = trips.join(payments, trips.row_num == payments.payment_id, "inner") 


In [44]:
trips_payments_locations = trips_payments.join(locations, trips_payments.row_num == locations.location_id, "inner") 


In [45]:
trips_payments_locations.show()

+-------+---------+--------------------+---------------------+------------------+---------------+------------+------------------+-------+----------+------------+-----------------+---------+-------+---------+---------+-----------+------------------+---------+------------------+---------+
|trip_id|vendor_id|Trip_Pickup_DateTime|Trip_Dropoff_DateTime|     Trip_Distance|Passenger_Count|Payment_Type|         Start_Lon|row_num|payment_id|Payment_Type|         Fare_Amt|surcharge|Tip_Amt|Tolls_Amt|Total_Amt|location_id|         Start_Lon|Start_Lat|           End_Lon|  End_Lat|
+-------+---------+--------------------+---------------------+------------------+---------------+------------+------------------+-------+----------+------------+-----------------+---------+-------+---------+---------+-----------+------------------+---------+------------------+---------+
|      0|        0| 2009-01-04 02:52:00|  2009-01-04 03:02:00|              2.63|              1|        Cash|        -73.991957|      1

In [46]:
trips_finals = trips_payments_locations.drop("row_num", "vendor_name", "Start_Lon", 
                                             "End_Lon", "Start_Lat", "End_Lat", "Payment_Type",
                                             "Fare_Amt", "surcharge", "Tip_Amt", "Tolls_Amt", "Total_Amt")


In [50]:
trips_finals.show()

+-------+---------+--------------------+---------------------+------------------+---------------+----------+-----------+
|trip_id|vendor_id|Trip_Pickup_DateTime|Trip_Dropoff_DateTime|     Trip_Distance|Passenger_Count|payment_id|location_id|
+-------+---------+--------------------+---------------------+------------------+---------------+----------+-----------+
|      0|        0| 2009-01-04 02:52:00|  2009-01-04 03:02:00|              2.63|              1|         1|          1|
|      1|        0| 2009-01-04 03:31:00|  2009-01-04 03:38:00|              4.55|              3|         2|          2|
|      2|        0| 2009-01-03 15:43:00|  2009-01-03 15:57:00|             10.35|              5|         3|          3|
|      3|        1| 2009-01-01 20:52:58|  2009-01-01 21:14:00|               5.0|              1|         4|          4|
|      4|        1| 2009-01-24 16:18:23|  2009-01-24 16:24:56|               0.4|              1|         5|          5|
|      5|        1| 2009-01-16 2

In [None]:
#we have all the tables we need
#we need to save the data into a database

In [77]:
conn = psycopg2.connect(
    host="localhost",
    database="taxi_data",
    user="postgres",
    password="eyramSQL"
)

cur = conn.cursor()


In [78]:
cur.execute(
    """
    CREATE TABLE vendors (
        vendor_id INT PRIMARY KEY,
        vendor_name VARCHAR(50)
    );

    CREATE TABLE locations (
        location_id INT PRIMARY KEY,
        start_lon FLOAT,
        start_lat FLOAT,
        end_lon FLOAT,
        end_lat FLOAT
    );

    CREATE TABLE payments (
        payment_id INT PRIMARY KEY,
        payment_type VARCHAR(50),
        fare_amt FLOAT,
        surcharge FLOAT,
        tip_amt FLOAT,
        tolls_amt FLOAT,
        total_amt FLOAT
    );

     CREATE TABLE trips (
        trip_id INT PRIMARY KEY,
        trip_pickup_datetime TIMESTAMP,
        trip_dropoff_datetime TIMESTAMP,
        passenger_count INT,
        trip_distance FLOAT,
        vendor_id INT,
        location_id INT,
        payment_id INT,

        FOREIGN KEY (vendor_id) REFERENCES vendors(vendor_id),
        FOREIGN KEY (location_id) REFERENCES locations(location_id),
        FOREIGN KEY (payment_id) REFERENCES payments(payment_id)
    )
"""
)

conn.commit()
conn.close()


In [89]:
# Save the data into the database
jdbc_url = "jdbc:postgresql://localhost/taxi_data"
jdbc_properties = {"user": "postgres", "postgres": "eyramSQL", "driver": "org.postgresql.Driver"}


In [90]:
dataframes_to_save = [
    (vendors, "vendors"),
    (locations, "locations"),
    (payments, "payments"),
    (trips_finals, "trips")
    ]

In [85]:
!pip install pandas



In [91]:
vendors_df = vendors.toPandas()
locations_df =locations.toPandas()
payments_df = payments.toPandas()
trips_df = trips_finals.toPandas()


In [94]:
from sqlalchemy import create_engine
connection_string = f'postgresql://postgres:postgres@localhost/taxi_data'
engine = create_engine(connection_string)


In [95]:
trips_df.to_sql('trips', engine, if_exists='replace', index=False)
vendors_df.to_sql('vendors', engine, if_exists='replace', index=False)
payments_df.to_sql('payments', engine, if_exists='replace', index=False)
locations_df.to_sql('locations', engine, if_exists='replace', index=False)



938

In [1]:
!jupyter nbconvert --to script extract_data.ipynb

[NbConvertApp] Converting notebook extract_data.ipynb to script
[NbConvertApp] Writing 7517 bytes to extract_data.py
