In [1]:
# Importing necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, when, date_format, monotonically_increasing_id, to_timestamp

import pandas as pd

In [2]:
# Build (or reuse) the Spark session for this notebook; the driver and the
# executors are each configured with 4 GB of memory.
spark = (
    SparkSession.builder
    .appName("uber-data-analysis-4months")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .getOrCreate()
)
# NOTE: an earlier variant of this cell passed a local MySQL Connector/J path
# through the "spark.jars" option; that configuration is not used here.

In [3]:
# Folder that holds the monthly yellow-taxi Parquet extracts
data_directory = "../Data/"
# The glob pattern pulls every matching monthly file into one DataFrame
parquet_glob = data_directory + "yellow_tripdata_202*.parquet"
df = spark.read.parquet(parquet_glob)

# Preview the first rows of the combined data
df.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2023-11-01 00:03:03|  2023-11-01 01:04:08|              2|         13.6|         1|                 N|         132|          26|           2|       61.8| 2.75|    0.5|       0.

In [4]:
# Size of the raw data set: rows (a full count job) and columns
raw_row_count = df.count()
raw_column_count = len(df.columns)
print('No. of rows: ', raw_row_count)
print('No. of columns: ', raw_column_count)

No. of rows:  12688432
No. of columns:  19


In [5]:
# Print the schema of the loaded data: column names, data types and nullability
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



# Transformation Phase

### Data Cleaning

In [6]:
# Count the missing values in every column. isNull() gives a boolean per row,
# the cast turns it into 1 (null) or 0 (not null), and sum() adds them up.
# `sum` here is pyspark.sql.functions.sum from the import cell, not the builtin.
null_count_exprs = [
    sum(col(column_name).isNull().cast("int")).alias(column_name)
    for column_name in df.columns
]
missing_values = df.select(null_count_exprs)
missing_values.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       0|                   0|                    0|         638450|            0|    638450|            638450|           0|           0|           0|          0|    0|      0|         

**Observation**

It can be seen that the columns passenger_count, RatecodeID, store_and_fwd_flag, congestion_surcharge and Airport_fee contain missing values

In [7]:
# Express each column's null count as a percentage of the total row count
total_rows = df.count()
percent_exprs = [
    (col(column_name) / total_rows * 100).alias(column_name)
    for column_name in df.columns
]
missing_values_percent = missing_values.select(percent_exprs)
missing_values_percent.show()

+--------+--------------------+---------------------+-----------------+-------------+-----------------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|  passenger_count|trip_distance|       RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|      Airport_fee|
+--------+--------------------+---------------------+-----------------+-------------+-----------------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------------+
|     0.0|                 0.0|                  0.0|5.031748603767589|          0.0|5.031748603767589| 5.031748603767589|         0.0|       

In [8]:
# Build one predicate that is true when any of the five columns known to
# contain nulls is null, by OR-ing the per-column checks together.
has_missing_value = col('Passenger_count').isNull()
for column_name in ('RateCodeID', 'store_and_fwd_flag', 'congestion_surcharge', 'Airport_fee'):
    has_missing_value = has_missing_value | col(column_name).isNull()

# Keep only the rows with at least one missing value and preview them
missing_value_columns = df.filter(has_missing_value)
missing_value_columns.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2023-12-01 00:04:36|  2023-12-01 00:32:12|           NULL|         4.98|      NULL|              NULL|         144|         143|           0|       28.2|  0.0|    0.5|       0.

In [9]:
# Distinct values of each column that contains missing values (built lazily;
# nothing is computed until show() is called below).
unique_passenger_count = df.select('passenger_count').distinct()
unique_RatecodeID = df.select('RatecodeID').distinct()
unique_store_and_fwd_flag = df.select('store_and_fwd_flag').distinct()
unique_congestion_surcharge = df.select('congestion_surcharge').distinct()
unique_Airport_fee = df.select('Airport_fee').distinct()

# Print a heading followed by the distinct values, one column at a time
for column_name, distinct_values in (
    ('passenger_count', unique_passenger_count),
    ('RatecodeID', unique_RatecodeID),
    ('store_and_fwd_flag', unique_store_and_fwd_flag),
    ('congestion_surcharge', unique_congestion_surcharge),
    ('Airport_fee', unique_Airport_fee),
):
    print(f'Unique values in {column_name}: ')
    distinct_values.show()

Unique values in passenger_count: 
+---------------+
|passenger_count|
+---------------+
|              0|
|              7|
|              6|
|              5|
|              1|
|              3|
|              8|
|              2|
|              4|
|              9|
|           NULL|
+---------------+

Unique values in RatecodeID: 
+----------+
|RatecodeID|
+----------+
|         6|
|         5|
|         1|
|         3|
|         2|
|         4|
|        99|
|      NULL|
+----------+

Unique values in store_and_fwd_flag: 
+------------------+
|store_and_fwd_flag|
+------------------+
|                 Y|
|                 N|
|              NULL|
+------------------+

Unique values in congestion_surcharge: 
+--------------------+
|congestion_surcharge|
+--------------------+
|                 0.0|
|                 2.5|
|                -2.5|
|                 0.5|
|                0.75|
|                 1.0|
|                NULL|
|               -0.75|
+--------------------+

Uniq

For passenger_count, we can drop the records where the value is null.

For RatecodeID, we keep only the valid rate code IDs, i.e. 1 to 6.

For a null store_and_fwd_flag, we will assume the value to be N.

For a null congestion_surcharge, we will assume the value to be 0.

For a null Airport_fee, we will assume the value to be 0.

In [10]:
# Apply the cleaning rules in one chain:
#   1. drop trips whose passenger_count is null
#   2. keep only rate codes 1 through 6 (between() is inclusive on both ends;
#      rows with a null rate code fail the comparison and are dropped too)
#   3. treat a null store_and_fwd_flag as 'N'
#   4. treat a null congestion_surcharge / Airport_fee as 0
df = (
    df
    .filter(col('passenger_count').isNotNull())
    .filter(col('RateCodeID').between(1, 6))
    .withColumn(
        'store_and_fwd_flag',
        when(col('store_and_fwd_flag').isNull(), 'N').otherwise(col('store_and_fwd_flag')),
    )
    .fillna({'congestion_surcharge': 0, 'Airport_fee': 0})
)

# Preview the cleaned data
df.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2023-11-01 00:03:03|  2023-11-01 01:04:08|              2|         13.6|         1|                 N|         132|          26|           2|       61.8| 2.75|    0.5|       0.

In [11]:
# Re-check the distinct values of the same five columns on the cleaned data
unique_passenger_count = df.select('passenger_count').distinct()
unique_RatecodeID = df.select('RatecodeID').distinct()
unique_store_and_fwd_flag = df.select('store_and_fwd_flag').distinct()
unique_congestion_surcharge = df.select('congestion_surcharge').distinct()
unique_Airport_fee = df.select('Airport_fee').distinct()

checks = [
    ('passenger_count', unique_passenger_count),
    ('RatecodeID', unique_RatecodeID),
    ('store_and_fwd_flag', unique_store_and_fwd_flag),
    ('congestion_surcharge', unique_congestion_surcharge),
    ('Airport_fee', unique_Airport_fee),
]
for checked_column, remaining_values in checks:
    print(f'Unique values in {checked_column}: ')
    remaining_values.show()

Unique values in passenger_count: 
+---------------+
|passenger_count|
+---------------+
|              0|
|              7|
|              6|
|              5|
|              1|
|              3|
|              8|
|              2|
|              4|
|              9|
+---------------+

Unique values in RatecodeID: 
+----------+
|RatecodeID|
+----------+
|         6|
|         5|
|         1|
|         3|
|         2|
|         4|
+----------+

Unique values in store_and_fwd_flag: 
+------------------+
|store_and_fwd_flag|
+------------------+
|                 Y|
|                 N|
+------------------+

Unique values in congestion_surcharge: 
+--------------------+
|congestion_surcharge|
+--------------------+
|                 0.0|
|                 2.5|
|                -2.5|
|                 0.5|
|                0.75|
|                 1.0|
|               -0.75|
+--------------------+

Unique values in Airport_fee: 
+-----------+
|Airport_fee|
+-----------+
|        0.0|
|    

In [12]:
# Size of the data set after cleaning
cleaned_row_count = df.count()
cleaned_column_count = len(df.columns)
print('No. of rows: ', cleaned_row_count)
print('No. of columns: ', cleaned_column_count)

No. of rows:  11946619
No. of columns:  19


In [13]:
# Count how often each complete row occurs; a count above 1 marks a duplicate
row_occurrences = df.groupBy(df.columns).count()
duplicate_counts = row_occurrences.filter(col("count") > 1)
duplicate_counts.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-----+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|count|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-----+
|       2| 2023-11-13 13:26:12|  2023-11-13 14:48:41|              1|        18.87|         2|                 N|         230|         132|           1|       70.0|  0.0

In [14]:
# Keep a single copy of every row (rows are compared on all columns)
df = df.distinct()

# Preview the deduplicated DataFrame
df.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2023-11-01 00:10:43|  2023-11-01 00:36:39|              1|         5.45|         1|                 N|         137|          41|           2|       28.9|  1.0|    0.5|       0.

In [15]:
# Repeat the duplicate check after deduplication; an empty result is expected
occurrences_after_dedup = df.groupBy(df.columns).count()
duplicate_counts = occurrences_after_dedup.filter(col("count") > 1)
duplicate_counts.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-----+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|count|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-----+
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----

**Observation**

No duplicate values present

### Data Transformation to store in dimension and fact tables

In [16]:
# dim_vendor: one row per distinct vendor id, labelled with the vendor's name.
# Ids other than 1 and 2 get a null name because the chain has no otherwise().
vendor_name = (
    when(col("vendor_id") == 1, "Creative Mobile Technologies, LLC")
    .when(col("vendor_id") == 2, "VeriFone Inc.")
)
vendor_dim = (
    df.select("VendorID")
    .distinct()
    .withColumnRenamed("VendorID", "vendor_id")
    .withColumn("vendor_name", vendor_name)
)

In [17]:
# Preview the vendor dimension table
vendor_dim.show()

+---------+--------------------+
|vendor_id|         vendor_name|
+---------+--------------------+
|        1|Creative Mobile T...|
|        2|       VeriFone Inc.|
+---------+--------------------+



In [18]:
# dim_rate_code: one row per distinct rate code with its description.
# The cleaning step keeps only codes 1-6, so every row gets a description.
rate_code_col = col("rate_code_id")
rate_code_description = (
    when(rate_code_col == 1, "Standard rate")
    .when(rate_code_col == 2, "JFK")
    .when(rate_code_col == 3, "Newark")
    .when(rate_code_col == 4, "Nassau or Westchester")
    .when(rate_code_col == 5, "Negotiated fare")
    .when(rate_code_col == 6, "Group ride")
)
rate_code_dim = (
    df.select("RateCodeID")
    .distinct()
    .withColumnRenamed("RateCodeID", "rate_code_id")
    .withColumn("rate_code_description", rate_code_description)
)

In [19]:
# Preview the rate-code dimension table
rate_code_dim.show()

+------------+---------------------+
|rate_code_id|rate_code_description|
+------------+---------------------+
|           6|           Group ride|
|           5|      Negotiated fare|
|           1|        Standard rate|
|           3|               Newark|
|           2|                  JFK|
|           4| Nassau or Westche...|
+------------+---------------------+



In [20]:
# dim_payment_type: one row per distinct payment type with its description.
# Codes without a matching branch get a null description (no otherwise()).
payment_type_dim = df.select("payment_type").distinct().withColumnRenamed("payment_type", "payment_type_id")\
    .withColumn("payment_type_description", 
                when(col("payment_type_id") == 1, "Credit card")
                .when(col("payment_type_id") == 2, "Cash")
                .when(col("payment_type_id") == 3, "No charge")
                .when(col("payment_type_id") == 4, "Dispute")
                .when(col("payment_type_id") == 5, "Unknown")
                # Label was misspelled ("Voieded Trip"); corrected here
                .when(col("payment_type_id") == 6, "Voided trip"))

In [21]:
# Preview the payment-type dimension table
payment_type_dim.show()

+---------------+------------------------+
|payment_type_id|payment_type_description|
+---------------+------------------------+
|              1|             Credit card|
|              3|               No charge|
|              2|                    Cash|
|              4|                 Dispute|
+---------------+------------------------+



In [22]:
# dim_datetime: one row per distinct (pickup, dropoff) timestamp pair, keyed by
# a surrogate datetime_id and expanded into hour / day / month / year / weekday
# parts for both the pickup and the dropoff timestamp.
datetime_pairs = df.select(
    col("tpep_pickup_datetime").alias("pickup_datetime"),
    col("tpep_dropoff_datetime").alias("dropoff_datetime")
).distinct()

# monotonically_increasing_id() derives its values from the partition id and the
# row's position inside the partition, so a lazy DataFrame can be handed
# different ids each time its plan is re-executed. datetime_dim is evaluated
# several times later in the notebook (the join that builds the fact table, the
# CSV export, the analytical join); persist() keeps the first computed
# assignment so every consumer sees the same datetime_id for the same pair.
datetime_dim = datetime_pairs.withColumn("datetime_id", monotonically_increasing_id())\
    .withColumn("pick_hour", date_format(col("pickup_datetime"), "HH"))\
    .withColumn("pick_day", date_format(col("pickup_datetime"), "dd"))\
    .withColumn("pick_month", date_format(col("pickup_datetime"), "MM"))\
    .withColumn("pick_year", date_format(col("pickup_datetime"), "yyyy"))\
    .withColumn("pick_weekday", date_format(col("pickup_datetime"), "E"))\
    .withColumn("drop_hour", date_format(col("dropoff_datetime"), "HH"))\
    .withColumn("drop_day", date_format(col("dropoff_datetime"), "dd"))\
    .withColumn("drop_month", date_format(col("dropoff_datetime"), "MM"))\
    .withColumn("drop_year", date_format(col("dropoff_datetime"), "yyyy"))\
    .withColumn("drop_weekday", date_format(col("dropoff_datetime"), "E"))\
    .persist()

In [23]:
# Number of distinct (pickup, dropoff) pairs, then a preview of the table
datetime_dim_rows = datetime_dim.count()
print(datetime_dim_rows)
datetime_dim.show()

11793193
+-------------------+-------------------+-----------+---------+--------+----------+---------+------------+---------+--------+----------+---------+------------+
|    pickup_datetime|   dropoff_datetime|datetime_id|pick_hour|pick_day|pick_month|pick_year|pick_weekday|drop_hour|drop_day|drop_month|drop_year|drop_weekday|
+-------------------+-------------------+-----------+---------+--------+----------+---------+------------+---------+--------+----------+---------+------------+
|2023-11-01 00:56:12|2023-11-01 01:21:38|          0|       00|      01|        11|     2023|         Wed|       01|      01|        11|     2023|         Wed|
|2023-11-01 00:35:59|2023-11-01 00:57:10|          1|       00|      01|        11|     2023|         Wed|       00|      01|        11|     2023|         Wed|
|2023-11-01 00:10:48|2023-11-01 00:14:44|          2|       00|      01|        11|     2023|         Wed|       00|      01|        11|     2023|         Wed|
|2023-11-01 00:02:44|2023-11-01

For the pickup location and dropoff location dimension tables, we have to load the taxi zone lookup CSV to get the zone data.

In [24]:
# Taxi zone lookup: read with a header row and let Spark infer column types
taxi_zone_csv_path = "../Data/taxi_zone_lookup.csv"
taxi_zone_reader = spark.read.option("header", True).option("inferSchema", True)
taxi_zone_df = taxi_zone_reader.csv(taxi_zone_csv_path)

In [25]:
# Preview the taxi zone lookup data
taxi_zone_df.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [26]:
# dim_pickup_location: every distinct pickup location id seen in the trips,
# enriched with borough / zone / service zone from the taxi zone lookup.
pickup_ids = df.select(col("PULocationID").alias("pickup_location_id")).distinct()

# Left join so an id missing from the lookup is kept (with null attributes)
pickup_zone_match = pickup_ids.pickup_location_id == taxi_zone_df.LocationID
pickup_location_dim = (
    pickup_ids.join(taxi_zone_df, pickup_zone_match, "left")
    .select(
        col("pickup_location_id"),
        col("Borough").alias("pickup_borough"),
        col("Zone").alias("pickup_zone"),
        col("service_zone").alias("pickup_service_zone"),
    )
    .distinct()
)

In [27]:
# Number of pickup locations, then a preview of the dimension table
pickup_location_rows = pickup_location_dim.count()
print(pickup_location_rows)
pickup_location_dim.show()

262
+------------------+--------------+--------------------+-------------------+
|pickup_location_id|pickup_borough|         pickup_zone|pickup_service_zone|
+------------------+--------------+--------------------+-------------------+
|               148|     Manhattan|     Lower East Side|        Yellow Zone|
|               243|     Manhattan|Washington Height...|          Boro Zone|
|                31|         Bronx|          Bronx Park|          Boro Zone|
|               137|     Manhattan|            Kips Bay|        Yellow Zone|
|                85|      Brooklyn|             Erasmus|          Boro Zone|
|               251| Staten Island|         Westerleigh|          Boro Zone|
|                65|      Brooklyn|Downtown Brooklyn...|          Boro Zone|
|               255|      Brooklyn|Williamsburg (Nor...|          Boro Zone|
|                53|        Queens|       College Point|          Boro Zone|
|               133|      Brooklyn|          Kensington|          Boro Z

In [28]:
# dim_dropoff_location, step 1: the distinct dropoff location ids in the trips
dropoff_id = col("DOLocationID").alias("dropoff_location_id")
dropoff_location_dim = df.select(dropoff_id).distinct()

In [29]:
# dim_dropoff_location, step 2: attach borough / zone / service zone from the
# taxi zone lookup. Left join keeps ids that have no lookup entry.
dropoff_zone_match = dropoff_location_dim.dropoff_location_id == taxi_zone_df.LocationID
dropoff_location_dim = (
    dropoff_location_dim.join(taxi_zone_df, dropoff_zone_match, "left")
    .select(
        col("dropoff_location_id"),
        col("Borough").alias("dropoff_borough"),
        col("Zone").alias("dropoff_zone"),
        col("service_zone").alias("dropoff_service_zone"),
    )
    .distinct()
)

In [30]:
# Number of dropoff locations, then a preview of the dimension table
dropoff_location_rows = dropoff_location_dim.count()
print(dropoff_location_rows)
dropoff_location_dim.show()

261
+-------------------+---------------+--------------------+--------------------+
|dropoff_location_id|dropoff_borough|        dropoff_zone|dropoff_service_zone|
+-------------------+---------------+--------------------+--------------------+
|                148|      Manhattan|     Lower East Side|         Yellow Zone|
|                243|      Manhattan|Washington Height...|           Boro Zone|
|                 31|          Bronx|          Bronx Park|           Boro Zone|
|                137|      Manhattan|            Kips Bay|         Yellow Zone|
|                 85|       Brooklyn|             Erasmus|           Boro Zone|
|                251|  Staten Island|         Westerleigh|           Boro Zone|
|                 65|       Brooklyn|Downtown Brooklyn...|           Boro Zone|
|                255|       Brooklyn|Williamsburg (Nor...|           Boro Zone|
|                 53|         Queens|       College Point|           Boro Zone|
|                133|       Brooklyn

In [31]:
# Attach the location and datetime dimensions to every trip so the surrogate
# keys can be picked up for the fact table. All joins are left joins, so a
# trip is kept even when a dimension has no matching row.
same_pickup_location = df.PULocationID == pickup_location_dim.pickup_location_id
same_dropoff_location = df.DOLocationID == dropoff_location_dim.dropoff_location_id
same_datetime_pair = (
    (df.tpep_pickup_datetime == datetime_dim.pickup_datetime) &
    (df.tpep_dropoff_datetime == datetime_dim.dropoff_datetime)
)

combined_data_df = (
    df.join(pickup_location_dim, same_pickup_location, "left")
    .join(dropoff_location_dim, same_dropoff_location, "left")
    .join(datetime_dim, same_datetime_pair, "left")
)

In [32]:
# Preview the trips joined with the location and datetime dimensions
combined_data_df.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+--------------+-------------+-------------------+-------------------+---------------+--------------------+--------------------+-------------------+-------------------+-----------+---------+--------+----------+---------+------------+---------+--------+----------+---------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|pickup_location_id|pickup_borough|  pickup_zone|pickup_service_zone|dropoff_location_id|dropoff_borough|        dropoff_zone|dropoff_service_zone|    picku

In [33]:
# Create fact_trips: the dimension keys plus the trip measures, with a
# surrogate trip_id per row.
#
# monotonically_increasing_id() depends on the partition id and the row's
# position inside the partition, so re-executing this lazy plan can hand the
# same trip a different trip_id. fact_trips is evaluated more than once later
# (count/preview, CSV export, analytical join); persist() keeps the first
# computed assignment so all of those outputs agree on the ids.
fact_trips = combined_data_df.select(
    col("VendorID").alias("vendor_id"),
    col("RateCodeID").alias("rate_code_id"),
    col("payment_type").alias("payment_type_id"),
    col("pickup_location_id"),
    col("dropoff_location_id"),
    col("datetime_id"),
    col("Passenger_count").alias("passenger_count"),
    col("Trip_distance").alias("trip_distance"),
    col("Fare_amount").alias("fare_amount"),
    col("Extra").alias("extra"),
    col("MTA_tax").alias("mta_tax"),
    col("Tip_amount").alias("tip_amount"),
    col("Tolls_amount").alias("tolls_amount"),
    col("Total_amount").alias("total_amount"),
    col("congestion_surcharge").alias("congestion_surcharge"),
    col("Airport_fee").alias("airport_fee")
).withColumn("trip_id", monotonically_increasing_id()).persist()

In [34]:
# Number of fact rows, then a preview of the fact table
fact_trip_rows = fact_trips.count()
print(fact_trip_rows)
fact_trips.show()

11946616
+---------+------------+---------------+------------------+-------------------+-----------+---------------+-------------+-----------+-----+-------+----------+------------+------------+--------------------+-----------+-------+
|vendor_id|rate_code_id|payment_type_id|pickup_location_id|dropoff_location_id|datetime_id|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|total_amount|congestion_surcharge|airport_fee|trip_id|
+---------+------------+---------------+------------------+-------------------+-----------+---------------+-------------+-----------+-----+-------+----------+------------+------------+--------------------+-----------+-------+
|        2|           2|              1|               132|                 13|     683418|              1|        28.19|       70.0|  0.0|    0.5|     16.54|        6.94|       99.23|                 2.5|       1.75|      0|
|        2|           1|              1|                90|                231|      31

# Load data in datawarehouse

In [35]:
# Specify the output directory
output_directory = "../Processed_Files/4months/"

# Tables to export, keyed by the base name used for both the output path and
# the progress message. Dict order is the export order.
tables_to_save = {
    "vendor_dim_coalesced": vendor_dim,
    "rate_code_dim_coalesced": rate_code_dim,
    "payment_type_dim_coalesced": payment_type_dim,
    "datetime_dim_coalesced": datetime_dim,
    "pickup_location_dim_coalesced": pickup_location_dim,
    "dropoff_location_dim_coalesced": dropoff_location_dim,
    "fact_trips_coalesced": fact_trips,
}

for table_name, table_df in tables_to_save.items():
    # coalesce(1) makes Spark write a single part file. The path ending in
    # ".csv" is still created as a directory that contains that part file.
    # mode("overwrite") replaces output left by an earlier run; the default
    # save mode raises an error when the path exists, which made this cell
    # fail on every run after the first.
    (
        table_df.coalesce(1)
        .write.mode("overwrite")
        .csv(output_directory + table_name + ".csv", header=True)
    )
    print(f"Saved {table_name} csv file.")

Saved vendor_dim_coalesced csv file.
Saved rate_code_dim_coalesced csv file.
Saved payment_type_dim_coalesced csv file.
Saved datetime_dim_coalesced csv file.
Saved pickup_location_dim_coalesced csv file.
Saved dropoff_location_dim_coalesced csv file.
Saved fact_trips_coalesced csv file.


In [36]:
# Create the analytical table: the fact table denormalised with every
# dimension, joined on the shared key column names (left joins keep all trips).
analytical_columns = [
    "trip_id",
    # vendor
    "vendor_id", "vendor_name",
    # rate code
    "rate_code_id", "rate_code_description",
    # payment type
    "payment_type_id", "payment_type_description",
    # pickup location
    "pickup_location_id", "pickup_borough", "pickup_zone", "pickup_service_zone",
    # dropoff location
    "dropoff_location_id", "dropoff_borough", "dropoff_zone", "dropoff_service_zone",
    # datetime
    "datetime_id", "pickup_datetime", "dropoff_datetime",
    "pick_hour", "pick_day", "pick_month", "pick_year", "pick_weekday",
    "drop_hour", "drop_day", "drop_month", "drop_year", "drop_weekday",
    # measures
    "passenger_count", "trip_distance", "fare_amount", "extra", "mta_tax",
    "tip_amount", "tolls_amount", "total_amount", "congestion_surcharge",
    "airport_fee",
]

analytical_table = (
    fact_trips
    .join(vendor_dim, "vendor_id", "left")
    .join(rate_code_dim, "rate_code_id", "left")
    .join(payment_type_dim, "payment_type_id", "left")
    .join(pickup_location_dim, "pickup_location_id", "left")
    .join(dropoff_location_dim, "dropoff_location_id", "left")
    .join(datetime_dim, "datetime_id", "left")
    .select(*analytical_columns)
)

In [37]:
# Preview the denormalised analytical table
analytical_table.show()

+-----------+---------+--------------------+------------+---------------------+---------------+------------------------+------------------+--------------+-------------+-------------------+-------------------+---------------+--------------------+--------------------+-----------+-------------------+-------------------+---------+--------+----------+---------+------------+---------+--------+----------+---------+------------+---------------+-------------+-----------+-----+-------+----------+------------+------------+--------------------+-----------+
|    trip_id|vendor_id|         vendor_name|rate_code_id|rate_code_description|payment_type_id|payment_type_description|pickup_location_id|pickup_borough|  pickup_zone|pickup_service_zone|dropoff_location_id|dropoff_borough|        dropoff_zone|dropoff_service_zone|datetime_id|    pickup_datetime|   dropoff_datetime|pick_hour|pick_day|pick_month|pick_year|pick_weekday|drop_hour|drop_day|drop_month|drop_year|drop_weekday|passenger_count|trip_dist

In [38]:
# Coalesce to a single partition so Spark writes one CSV part file
analytical_table_coalesced = analytical_table.coalesce(1)
# Save as CSV. mode("overwrite") replaces output left by an earlier run; the
# default save mode raises an error when the path already exists, which made
# this cell fail on every run after the first. The ".csv" path is created as
# a directory that contains the part file.
analytical_table_coalesced.write.mode("overwrite").csv(output_directory + "analytical_table_coalesced.csv", header=True)
print("Saved analytical_table_coalesced csv file.")

Saved analytical_table_coalesced csv file.


In [39]:
# Stop the Spark session at the end of the notebook
spark.stop()