In [0]:
from pyspark.sql import SparkSession
import uuid
from pyspark.sql.functions import udf, lit, monotonically_increasing_id
from pyspark.sql.types import StringType
from pyspark.sql.functions import year, month, dayofmonth, hour, minute, second
from pyspark.sql.functions import format_string, lpad
from pyspark.sql import functions as F

Importing the files and creating the first DataFrame

In [0]:
df_main = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/gouravpratap.202cd006@nitk.edu.in/cab_data.csv")
df_main.show(5)
df_main.count()

+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|  pickup_longitude|   pickup_latitude|RatecodeID|store_and_fwd_flag| dropoff_longitude|  dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|       1| 2016-03-01 00:00:00|  2016-03-01 00:07:55|              1|          2.5|-73.97674560546875|40.765151977539055|         1|    

Dropping the Duplicates

In [0]:
df_unique = df_main.drop_duplicates()

df_unique = df_unique.withColumn("trip_id", monotonically_increasing_id())
# Reorder columns to make '_id' the first column
df_unique = df_unique.select('trip_id', *[col for col in df_unique.columns if col != 'trip_id'])

df_unique.show(5)
df_unique.count()

+-------+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+-----------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|trip_id|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|  pickup_longitude|   pickup_latitude|RatecodeID|store_and_fwd_flag| dropoff_longitude| dropoff_latitude|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+-------+--------+--------------------+---------------------+---------------+-------------+------------------+------------------+----------+------------------+------------------+-----------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|      0|       2| 2016-03-10 07:12:57|  2016-03-10 07:37:33|              4|         5.17|-73.99210357666014| 40.6

Creating the Passanger Data Frame(RDD)

In [0]:
df_Passenger = df_unique.select('trip_id')
df_Passenger = df_Passenger.withColumnRenamed("trip_id", "passenger_id")
df_Passenger = df_Passenger.join(df_unique, df_unique["trip_id"] == df_Passenger["passenger_id"], "inner")

df_Passenger = df_Passenger.select('passenger_id', 'passenger_count')
df_Passenger.show(5)
df_Passenger.count()

+------------+---------------+
|passenger_id|passenger_count|
+------------+---------------+
|           0|              4|
|           1|              2|
|           2|              3|
|           3|              1|
|           4|              1|
+------------+---------------+
only showing top 5 rows

Out[4]: 100000

Creating the tripDistance Dataframe (RDD)

In [0]:
df_tripDistance = df_unique.select('trip_id')
df_tripDistance = df_tripDistance.withColumnRenamed("trip_id", "trip_distance_id")
df_tripDistance = df_tripDistance.join(df_unique, df_unique["trip_id"] == df_tripDistance["trip_distance_id"], "inner")

df_tripDistance = df_tripDistance.select('trip_distance_id', 'trip_distance')

df_tripDistance.show(5)
df_tripDistance.count()

+----------------+-------------+
|trip_distance_id|trip_distance|
+----------------+-------------+
|               0|         5.17|
|               1|         5.24|
|               2|         1.67|
|               3|          1.3|
|               4|         3.41|
+----------------+-------------+
only showing top 5 rows

Out[5]: 100000

Creating the rateCode dimension table.

In [0]:
# making a hash map for O(1) search and insert
rate_code_type = {
    1: "Standard Rate",
    2: "JFK",
    3: "Newark",
    4: "Nassau or Westchester",
    5: "Negotiated fare",
    6: "Group ride"
}

# making a user defined functions for getting the values from the above hashmap
def map_ratecode(ratecode):
    return rate_code_type.get(int(ratecode))

# defining the user defined function
map_ratecode_udf = udf(map_ratecode, StringType())

# creating the dataframe
dfRateCode = df_unique.select('trip_id')
dfRateCode = dfRateCode.withColumnRenamed("trip_id", "rateCode_id")
dfRateCode = dfRateCode.join(df_unique, df_unique["trip_id"] == dfRateCode["rateCode_id"], "inner")

# Create new column 'rateCodeName'
dfRateCode = dfRateCode.withColumn("rateCodeName", map_ratecode_udf(dfRateCode["RatecodeID"]))
# Reorder columns to make '_id' the first column
dfRateCode = dfRateCode.select('rateCode_id', *[col for col in dfRateCode.columns if col != 'rateCode_id'])

dfRateCode = dfRateCode.select('rateCode_id', 'RatecodeID', 'rateCodeName')

dfRateCode.show(5)
dfRateCode.count()

+-----------+----------+-------------+
|rateCode_id|RatecodeID| rateCodeName|
+-----------+----------+-------------+
|          0|         1|Standard Rate|
|          1|         1|Standard Rate|
|          2|         1|Standard Rate|
|          3|         1|Standard Rate|
|          4|         1|Standard Rate|
+-----------+----------+-------------+
only showing top 5 rows

Out[6]: 100000

Creating the paymentMode dimension table

In [0]:
payment_type_name = {
    1: "Credit card",
    2: "Cash",
    3: "No charge",
    4: "Dispute",
    5: "Unknown",
    6: "Voided trip"
}

def map_payment(paymentCode):
    return payment_type_name.get(int(paymentCode))

map_payment_udf = udf(map_payment, StringType())

dfPayment = df_unique.select('trip_id')
dfPayment = dfPayment.withColumnRenamed("trip_id", "payment_id")
dfPayment = dfPayment.join(df_unique, df_unique["trip_id"] == dfPayment["payment_id"], "inner")
dfPayment = dfPayment.withColumn("paymentCodeName", map_payment_udf(dfPayment['payment_type']))

dfPayment = dfPayment.withColumn("paymentCodeName", map_ratecode_udf(dfPayment["payment_type"]))
# Reorder columns to make '_id' the first column
dfPayment = dfPayment.select('payment_id', *[col for col in dfPayment.columns if col != 'payment_id'])

dfPayment = dfPayment.select('payment_id', 'payment_type', 'paymentCodeName')

dfPayment.show(5)
dfPayment.count()

+----------+------------+---------------+
|payment_id|payment_type|paymentCodeName|
+----------+------------+---------------+
|         0|           1|  Standard Rate|
|         1|           1|  Standard Rate|
|         2|           1|  Standard Rate|
|         3|           2|            JFK|
|         4|           1|  Standard Rate|
+----------+------------+---------------+
only showing top 5 rows

Out[7]: 100000

Creating the pickUp Dimension Table

In [0]:
dfPickupDim = df_unique.select('trip_id')
dfPickupDim = dfPickupDim.withColumnRenamed("trip_id", "pickup_id")
dfPickupDim = dfPickupDim.join(df_unique, df_unique["trip_id"] == dfPickupDim["pickup_id"], "inner")

dfPickupDim = dfPickupDim.select('pickup_id', 'pickup_longitude', 'pickup_latitude')

dfPickupDim.show(5)
dfPickupDim.count()

+---------+------------------+------------------+
|pickup_id|  pickup_longitude|   pickup_latitude|
+---------+------------------+------------------+
|        0|-73.99210357666014| 40.68429565429688|
|        1|-73.91310119628906|40.746089935302734|
|        2|-73.96404266357422| 40.76549530029297|
|        3|-73.97695922851561|40.775028228759766|
|        4|-73.96170806884764| 40.75973892211913|
+---------+------------------+------------------+
only showing top 5 rows

Out[8]: 100000

Creating DropOff Dimesion Table

In [0]:
dfDropOffDim = df_unique.select('trip_id')
dfDropOffDim = dfDropOffDim.withColumnRenamed("trip_id", "dropoff_id")
dfDropOffDim = dfDropOffDim.join(df_unique, df_unique["trip_id"] == dfDropOffDim["dropoff_id"], "inner")

dfDropOffDim = dfDropOffDim.select('dropoff_id','dropoff_longitude','dropoff_latitude')

dfDropOffDim.show(5)
dfDropOffDim.count()

+----------+------------------+-----------------+
|dropoff_id| dropoff_longitude| dropoff_latitude|
+----------+------------------+-----------------+
|         0| -74.0073013305664|40.74828720092773|
|         1|-73.98751068115233|    40.7529296875|
|         2|-73.95700073242188|40.76502990722656|
|         3| -73.9722900390625|40.76259994506836|
|         4|-73.98794555664062|40.74559783935546|
+----------+------------------+-----------------+
only showing top 5 rows

Out[9]: 100000

Creating DateTime Dimension Table

In [0]:
dfDateTimeDimension = df_unique.select('trip_id')
dfDateTimeDimension = dfDateTimeDimension.withColumnRenamed("trip_id", "dateTime_id")
dfDateTimeDimension = dfDateTimeDimension.join(df_unique, df_unique["trip_id"] == dfDateTimeDimension["dateTime_id"], "inner")
dfDateTimeDimension = dfDateTimeDimension.select('dateTime_id', 'tpep_pickup_datetime', 'tpep_dropoff_datetime')

dfDateTimeDimension = dfDateTimeDimension.withColumn("pickup_year", year("tpep_pickup_datetime"))
dfDateTimeDimension = dfDateTimeDimension.withColumn("pickup_month", month("tpep_pickup_datetime"))
dfDateTimeDimension = dfDateTimeDimension.withColumn("pickup_day", dayofmonth("tpep_pickup_datetime"))
dfDateTimeDimension = dfDateTimeDimension.withColumn("pickup_hour",format_string("%02d", hour("tpep_pickup_datetime")))
dfDateTimeDimension = dfDateTimeDimension.withColumn("pickup_minute", format_string("%02d", minute("tpep_pickup_datetime")))
dfDateTimeDimension = dfDateTimeDimension.withColumn("pickup_second", format_string("%02d", second("tpep_pickup_datetime")))

dfDateTimeDimension = dfDateTimeDimension.withColumn("dropoff_year", year("tpep_dropoff_datetime"))
dfDateTimeDimension = dfDateTimeDimension.withColumn("dropoff_month", month("tpep_dropoff_datetime"))
dfDateTimeDimension = dfDateTimeDimension.withColumn("dropoff_day", dayofmonth("tpep_dropoff_datetime"))
dfDateTimeDimension = dfDateTimeDimension.withColumn("dropoff_hour", format_string("%02d", hour("tpep_dropoff_datetime")))
dfDateTimeDimension = dfDateTimeDimension.withColumn("dropoff_minute", format_string("%02d", minute("tpep_dropoff_datetime")))
dfDateTimeDimension = dfDateTimeDimension.withColumn("dropoff_second", format_string("%02d", second("tpep_dropoff_datetime")))

dfDateTimeDimension.show(5)
dfDateTimeDimension.count()

+-----------+--------------------+---------------------+-----------+------------+----------+-----------+-------------+-------------+------------+-------------+-----------+------------+--------------+--------------+
|dateTime_id|tpep_pickup_datetime|tpep_dropoff_datetime|pickup_year|pickup_month|pickup_day|pickup_hour|pickup_minute|pickup_second|dropoff_year|dropoff_month|dropoff_day|dropoff_hour|dropoff_minute|dropoff_second|
+-----------+--------------------+---------------------+-----------+------------+----------+-----------+-------------+-------------+------------+-------------+-----------+------------+--------------+--------------+
|          0| 2016-03-10 07:12:57|  2016-03-10 07:37:33|       2016|           3|        10|         07|           12|           57|        2016|            3|         10|          07|            37|            33|
|          1| 2016-03-10 07:17:02|  2016-03-10 07:41:10|       2016|           3|        10|         07|           17|           02|        

Creating the fact table and joining the above created Dimension tables

In [0]:
# Joining and selecting columns
fact_table = df_unique.join(df_Passenger, df_unique['trip_id'] == df_Passenger['passenger_id'], "inner") \
               .join(df_tripDistance, df_unique['trip_id'] == df_tripDistance['trip_distance_id'], "inner") \
               .join(dfRateCode, df_unique['trip_id'] == dfRateCode['rateCode_id'], "inner") \
               .join(dfPickupDim, df_unique['trip_id'] == dfPickupDim['pickup_id'], "inner") \
               .join(dfDropOffDim, df_unique['trip_id'] == dfDropOffDim['dropoff_id'], "inner") \
               .join(dfDateTimeDimension, df_unique['trip_id'] == dfDateTimeDimension['datetime_id'], "inner") \
               .join(dfPayment, df_unique['trip_id'] == dfPayment['payment_id'], "inner") \
               .select(df_unique['trip_id'], 'VendorID', 'datetime_id', 'passenger_id', 'trip_distance_id',
                       'rateCode_id', 'store_and_fwd_flag', 'pickup_id', 'dropoff_id',
                       'payment_id', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount',
                       'improvement_surcharge', 'total_amount')

# Show the resulting DataFrame
fact_table.show(5)
fact_table.count()

+-------+--------+-----------+------------+----------------+-----------+------------------+---------+----------+----------+-----------+-----+-------+----------+------------+---------------------+------------+
|trip_id|VendorID|datetime_id|passenger_id|trip_distance_id|rateCode_id|store_and_fwd_flag|pickup_id|dropoff_id|payment_id|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+-------+--------+-----------+------------+----------------+-----------+------------------+---------+----------+----------+-----------+-----+-------+----------+------------+---------------------+------------+
|      0|       2|          0|           0|               0|          0|                 N|        0|         0|         0|       20.5|  0.0|    0.5|      4.26|         0.0|                  0.3|       25.56|
|      1|       2|          1|           1|               1|          1|                 N|        1|         1|         1|       21.0|  0.0|    0.5|       3.0|    

Saving the dataframe as a csv file

In [0]:
# Saving the Passenger Dimension Table
df_Passenger.write.format("csv") \
  .option("sep", ",") \
  .option("header", "true") \
  .option("mode", "overwrite") \
  .save("dbfs:/FileStore/shared_uploads/gouravpratap.202cd006@nitk.edu.in/PassengerTable.csv");

# Saving the Trip Distance Dimension Table
df_tripDistance.write.format("csv") \
  .option("sep", ",") \
  .option("header", "true") \
  .option("mode", "overwrite") \
  .save("dbfs:/FileStore/shared_uploads/gouravpratap.202cd006@nitk.edu.in/TripDistanceTable.csv");

 # Saving the rate Code Dimension Table
dfRateCode.write.format("csv") \
  .option("sep", ",") \
  .option("header", "true") \
  .option("mode", "overwrite") \
  .save("dbfs:/FileStore/shared_uploads/gouravpratap.202cd006@nitk.edu.in/RateCodeTable.csv");

# Saving the Pickup dimesion Table
dfPickupDim.write.format("csv") \
  .option("sep", ",") \
  .option("header", "true") \
  .option("mode", "overwrite") \
  .save("dbfs:/FileStore/shared_uploads/gouravpratap.202cd006@nitk.edu.in/PickUpTable.csv");

# Saving the Dropoff Dimension Table
dfDropOffDim.write.format("csv") \
  .option("sep", ",") \
  .option("header", "true") \
  .option("mode", "overwrite") \
  .save("dbfs:/FileStore/shared_uploads/gouravpratap.202cd006@nitk.edu.in/DropOffTable.csv");

# Saving the payment Dimension table
dfPayment.write.format("csv") \
  .option("sep", ",") \
  .option("header", "true") \
  .option("mode", "overwrite") \
  .save("dbfs:/FileStore/shared_uploads/gouravpratap.202cd006@nitk.edu.in/PaymentTable.csv");

# Saving the Date Time Dimension Table
dfDateTimeDimension.write.format("csv") \
  .option("sep", ",") \
  .option("header", "true") \
  .option("mode", "overwrite") \
  .save("dbfs:/FileStore/shared_uploads/gouravpratap.202cd006@nitk.edu.in/DateTimeTable.csv");

# Saving the Fact Table
fact_table.write.format("csv") \
  .option("sep", ",") \
  .option("header", "true") \
  .option("mode", "overwrite") \
  .save("dbfs:/FileStore/shared_uploads/gouravpratap.202cd006@nitk.edu.in/FactTable.csv");

Checking the if the Tables got saved or not

In [0]:
%fs ls dbfs:/FileStore/shared_uploads/gouravpratap.202cd006@nitk.edu.in

path,name,size,modificationTime
dbfs:/FileStore/shared_uploads/gouravpratap.202cd006@nitk.edu.in/DateTimeTable.csv/,DateTimeTable.csv/,0,0
dbfs:/FileStore/shared_uploads/gouravpratap.202cd006@nitk.edu.in/DropOffTable.csv/,DropOffTable.csv/,0,0
dbfs:/FileStore/shared_uploads/gouravpratap.202cd006@nitk.edu.in/FactTable.csv/,FactTable.csv/,0,0
dbfs:/FileStore/shared_uploads/gouravpratap.202cd006@nitk.edu.in/PassengerTable.csv/,PassengerTable.csv/,0,0
dbfs:/FileStore/shared_uploads/gouravpratap.202cd006@nitk.edu.in/PaymentTable.csv/,PaymentTable.csv/,0,0
dbfs:/FileStore/shared_uploads/gouravpratap.202cd006@nitk.edu.in/PickUpTable.csv/,PickUpTable.csv/,0,0
dbfs:/FileStore/shared_uploads/gouravpratap.202cd006@nitk.edu.in/RateCodeTable.csv/,RateCodeTable.csv/,0,0
dbfs:/FileStore/shared_uploads/gouravpratap.202cd006@nitk.edu.in/TripDistanceTable.csv/,TripDistanceTable.csv/,0,0
dbfs:/FileStore/shared_uploads/gouravpratap.202cd006@nitk.edu.in/cab_data.csv,cab_data.csv,15824375,1712315110000
dbfs:/FileStore/shared_uploads/gouravpratap.202cd006@nitk.edu.in/uber_data-1.csv,uber_data-1.csv,15824375,1711556397000


Loading one of the tables to verify

In [0]:
df_Dummy = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/gouravpratap.202cd006@nitk.edu.in/DateTimeTable.csv")
df_Dummy.show(5)

+-----------+--------------------+---------------------+-----------+------------+----------+-----------+-------------+-------------+------------+-------------+-----------+------------+--------------+--------------+
|dateTime_id|tpep_pickup_datetime|tpep_dropoff_datetime|pickup_year|pickup_month|pickup_day|pickup_hour|pickup_minute|pickup_second|dropoff_year|dropoff_month|dropoff_day|dropoff_hour|dropoff_minute|dropoff_second|
+-----------+--------------------+---------------------+-----------+------------+----------+-----------+-------------+-------------+------------+-------------+-----------+------------+--------------+--------------+
|60129542144| 2016-03-01 00:00:31|  2016-03-01 00:13:00|       2016|           3|         1|         00|           00|           31|        2016|            3|          1|          00|            13|            00|
|60129542145| 2016-03-10 07:07:54|  2016-03-10 07:17:08|       2016|           3|        10|         07|           07|           54|        