#### Loading the Parquet Data

In [None]:
import os, shutil
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql import functions as f
from pyspark.sql import Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("nyc-taxi-notebook").getOrCreate()

In [2]:
spark

In [3]:
df = spark.read.parquet("data.parquet")
df.show(10)

23/12/07 22:48:08 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+--------------+----------+-----------------+-----------------+--------+----------+------------------+------------------+-------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|       PU_zone|PU_borough|           PU_lon|           PU_lat| DO_zone|DO_borough|            DO_lon|            DO_lat|trip_id|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+

#### Creating our Fact & Dimension Tables

In [4]:
## Date Time Dimension
datetime_dim = df.select("tpep_pickup_datetime", "tpep_dropoff_datetime")
datetime_dim = (datetime_dim
                .withColumn("pick_up_hour",f.hour("tpep_pickup_datetime"))
                .withColumn("pick_up_day",f.dayofmonth("tpep_pickup_datetime"))
                .withColumn("pick_up_month",f.month("tpep_pickup_datetime"))
                .withColumn("pick_up_year",f.year("tpep_pickup_datetime"))
                .withColumn("pick_up_dayoftheweek",f.dayofweek("tpep_pickup_datetime"))
                .withColumn("drop_off_hour",f.hour("tpep_dropoff_datetime"))
                .withColumn("drop_off_day",f.dayofmonth("tpep_dropoff_datetime"))
                .withColumn("drop_off_month",f.month("tpep_dropoff_datetime"))
                .withColumn("drop_off_year",f.year("tpep_dropoff_datetime"))
                .withColumn("drop_off_dayoftheweek",f.dayofweek("tpep_dropoff_datetime")))
datetime_dim = datetime_dim.sort("pick_up_day", "pick_up_month", "pick_up_year")


### Creating Index
datetime_dim = datetime_dim.select("*").withColumn("datetime_id", monotonically_increasing_id())

### Reordering Columns
datetime_dim = datetime_dim.select("datetime_id", "tpep_pickup_datetime", "pick_up_hour", "pick_up_day",
                                   "pick_up_month", "pick_up_year", "pick_up_dayoftheweek",
                                   "tpep_dropoff_datetime", "drop_off_hour", "drop_off_day",
                                   "drop_off_month", "drop_off_year", "drop_off_dayoftheweek")

datetime_dim.show(10)
#datetime_dim.coalesce(1).write.format("parquet").saveAsTable("tables/datetime_dim.parquet")

+-----------+--------------------+------------+-----------+-------------+------------+--------------------+---------------------+-------------+------------+--------------+-------------+---------------------+
|datetime_id|tpep_pickup_datetime|pick_up_hour|pick_up_day|pick_up_month|pick_up_year|pick_up_dayoftheweek|tpep_dropoff_datetime|drop_off_hour|drop_off_day|drop_off_month|drop_off_year|drop_off_dayoftheweek|
+-----------+--------------------+------------+-----------+-------------+------------+--------------------+---------------------+-------------+------------+--------------+-------------+---------------------+
|          0| 2009-01-01 04:36:53|           4|          1|            1|        2009|                   5|  2009-01-02 00:00:00|            0|           2|             1|         2009|                    6|
|          1| 2009-01-01 05:56:45|           5|          1|            1|        2009|                   5|  2009-01-01 06:09:59|            6|           1|            

In [5]:
## Passenger Count Dimension
passenger_count_dim = df.select("passenger_count")
passenger_count_dim = passenger_count_dim.dropDuplicates(subset = ['passenger_count']).sort("passenger_count")

# ### Creating Index                                     )
passenger_count_dim = passenger_count_dim.select("*").withColumn("passenger_count_id", monotonically_increasing_id())

# ### Reordering Columns
passenger_count_dim = passenger_count_dim.select("passenger_count_id", "passenger_count")
passenger_count_dim.show()

+------------------+---------------+
|passenger_count_id|passenger_count|
+------------------+---------------+
|                 0|           null|
|                 1|            0.0|
|                 2|            1.0|
|                 3|            2.0|
|                 4|            3.0|
|                 5|            4.0|
|                 6|            5.0|
|                 7|            6.0|
|                 8|            7.0|
|                 9|            8.0|
|                10|            9.0|
+------------------+---------------+



In [6]:
## Trip Distance Dimension
trip_distance_dim = df.select("trip_distance")
trip_distance_dim = trip_distance_dim.dropDuplicates(subset = ['trip_distance']).sort("trip_distance")


# ### Creating Index
trip_distance_dim = trip_distance_dim.select("*").withColumn("trip_distance_id", monotonically_increasing_id())


### Reordering Columns
trip_distance_dim = trip_distance_dim.select("trip_distance_id", "trip_distance")
trip_distance_dim.show(10)

+----------------+-------------+
|trip_distance_id|trip_distance|
+----------------+-------------+
|               0|          0.0|
|               1|         0.01|
|               2|         0.02|
|               3|         0.03|
|               4|         0.04|
|               5|         0.05|
|               6|         0.06|
|               7|         0.07|
|               8|         0.08|
|               9|         0.09|
+----------------+-------------+
only showing top 10 rows



In [7]:
## Rate Code Dimension

### Assigning rate code mapping data as per the data dictionary
rate_code_data = [
    (1,"Standard rate"),
    (2,"JFK"),
    (3,"Newark"),
    (4,"Nassau or Westchester"),
    (5,"Negotiated fare"),
    (6,"Group ride")
]
### Creating new dataframe from mapping data
rate_code_type = spark.createDataFrame(data=rate_code_data, schema = ["RatecodeID","rate_code_name"])
### Investigating the schema of the columns 
# rate_code_type.printSchema() 
### Changing column schema to match expected datatype for the join
rate_code_type = rate_code_type.select(f.expr("CAST(RatecodeID AS double) AS RatecodeID"), "rate_code_name") 

### Creating Rate Code Dimension
rate_code_dim = df.select("RatecodeID")
rate_code_dim = rate_code_dim.dropDuplicates(subset = ['RatecodeID']).sort("RatecodeID")


### Creating Index
rate_code_dim = rate_code_dim.select("*").withColumn("ratecode_id", monotonically_increasing_id()-1) # The index started at 1 in this case, so a -1 was inserted

### Joining dataframes on RatecodeID
rate_code_dim = rate_code_dim.join(rate_code_type,
                   ["RatecodeID"])

### Reordering Columns
rate_code_dim = rate_code_dim.select("ratecode_id", "RatecodeID", "rate_code_name")
rate_code_dim.show()

+-----------+----------+--------------------+
|ratecode_id|RatecodeID|      rate_code_name|
+-----------+----------+--------------------+
|          0|       1.0|       Standard rate|
|          1|       2.0|                 JFK|
|          2|       3.0|              Newark|
|          3|       4.0|Nassau or Westche...|
|          4|       5.0|     Negotiated fare|
+-----------+----------+--------------------+



In [8]:
## Pickup Location Dimension
pickup_location_dim = df.select("PU_lon", "PU_lat", "PU_borough","PU_zone")
pickup_location_dim = pickup_location_dim.dropDuplicates(subset = ['PU_lat']).sort("PU_lat")

### Creating Index
pickup_location_dim = pickup_location_dim.select("*").withColumn("pickup_location_id", monotonically_increasing_id())

### Reordering Columns
pickup_location_dim = pickup_location_dim.select("pickup_location_id", "PU_lon", "PU_lat", "PU_borough","PU_zone")

pickup_location_dim.show(10)

                                                                                

+------------------+------------------+------------------+-------------+--------------------+
|pickup_location_id|            PU_lon|            PU_lat|   PU_borough|             PU_zone|
+------------------+------------------+------------------+-------------+--------------------+
|                 0|-74.23353546082052| 40.52549110546785|Staten Island|Charleston/Totten...|
|                 1|-74.15089028926955| 40.55186202510041|Staten Island|         Great Kills|
|                 2|-74.18848459794721| 40.55265878064343|Staten Island|       Arden Heights|
|                 3|-73.90691199328067|40.559134755628214|       Queens|Breezy Point/Fort...|
|                 4|-74.12258304710812| 40.56199406259296|Staten Island|             Oakwood|
|                 5|-74.10501884907058| 40.57176876885639|Staten Island|New Dorp/Midland ...|
|                 6|-74.18642081572338| 40.57677255023222|Staten Island|     Freshkills Park|
|                 7|-73.98794360684474|40.576961310705336|  

In [9]:
## Drop Off Location Dimension
dropoff_location_dim = df.select("DO_lon", "DO_lat", "DO_borough","DO_zone")
dropoff_location_dim = dropoff_location_dim.dropDuplicates(subset = ['DO_lat']).sort("DO_lat")

### Creating Index
dropoff_location_dim = dropoff_location_dim.select("*").withColumn("dropoff_location_id", monotonically_increasing_id())

### Reordering Columns
dropoff_location_dim = dropoff_location_dim.select("dropoff_location_id", "DO_lon", "DO_lat", "DO_borough","DO_zone")

dropoff_location_dim.show(10)

+-------------------+------------------+------------------+-------------+--------------------+
|dropoff_location_id|            DO_lon|            DO_lat|   DO_borough|             DO_zone|
+-------------------+------------------+------------------+-------------+--------------------+
|                  0|-74.23353546082052| 40.52549110546785|Staten Island|Charleston/Totten...|
|                  1|-74.18767927741463|40.528685582160755|Staten Island|Eltingville/Annad...|
|                  2|-74.20782577848769|40.540333107919224|Staten Island|   Rossville/Woodrow|
|                  3|-74.15089028926955| 40.55186202510041|Staten Island|         Great Kills|
|                  4|-74.18848459794721| 40.55265878064343|Staten Island|       Arden Heights|
|                  5|-73.90691199328067|40.559134755628214|       Queens|Breezy Point/Fort...|
|                  6|-74.12258304710812| 40.56199406259296|Staten Island|             Oakwood|
|                  7|-74.10501884907058| 40.571768

In [10]:
## Payment Type Dimension
### Assigning rate code mapping data as per the data dictionary
payment_type_data = [
    (1,"Credit card"),
    (2,"Cash"),
    (3,"No charge"),
    (4,"Dispute"),
    (5,"Unknown"),
    (6,"Voided trip")
]
### Creating new dataframe from mapping data
payment_type_name = spark.createDataFrame(data=payment_type_data, schema = ["payment_type","payment_type_name"])
### Investigating the schema of the columns
# payment_type_name.printSchema()

### Creating Rate Code Dimension
payment_type_dim = df.select("payment_type")
payment_type_dim = payment_type_dim.dropDuplicates(subset = ['payment_type']).sort("payment_type")

### Creating Index
payment_type_dim = payment_type_dim.select("*").withColumn("payment_type_id", monotonically_increasing_id()-1)

### Joining dataframes on RatecodeID
payment_type_dim = payment_type_dim.join(payment_type_name,
                   ["payment_type"])

### Reordering Columns
payment_type_dim = payment_type_dim.select("payment_type_id", "payment_type", "payment_type_name")
payment_type_dim.show()

+---------------+------------+-----------------+
|payment_type_id|payment_type|payment_type_name|
+---------------+------------+-----------------+
|              0|           1|      Credit card|
|              1|           2|             Cash|
|              2|           3|        No charge|
|              3|           4|          Dispute|
+---------------+------------+-----------------+



In [11]:
## Fact Table

### Joining data from dimentions into dact table
fact_table = df.join(f.broadcast(passenger_count_dim), df["passenger_count"] == passenger_count_dim["passenger_count"]) \
               .join(f.broadcast(trip_distance_dim), df["trip_distance"] == trip_distance_dim["trip_distance"]) \
               .join(f.broadcast(rate_code_dim), df["RatecodeID"] == rate_code_dim["RatecodeID"]) \
               .join(f.broadcast(pickup_location_dim), df["PU_lon"] == pickup_location_dim["PU_lon"]) \
               .join(f.broadcast(dropoff_location_dim), df["DO_lon"] == dropoff_location_dim["DO_lon"]) \
               .join(f.broadcast(datetime_dim), df["tpep_pickup_datetime"] == datetime_dim["tpep_pickup_datetime"]) \
               .join(f.broadcast(payment_type_dim), df["payment_type"] == payment_type_dim["payment_type"]) \
               .select("trip_id","VendorID", "datetime_id", "passenger_count_id", "trip_distance_id",
                       "ratecode_id", "store_and_fwd_flag", "pickup_location_id", "dropoff_location_id",
                       "payment_type_id", "fare_amount", "extra", "mta_tax", "tip_amount",
                       "tolls_amount", "improvement_surcharge", "total_amount")

fact_table.show(10)


+-------+--------+-----------+------------------+----------------+-----------+------------------+------------------+-------------------+---------------+-----------+-----+-------+----------+------------+---------------------+------------+
|trip_id|VendorID|datetime_id|passenger_count_id|trip_distance_id|ratecode_id|store_and_fwd_flag|pickup_location_id|dropoff_location_id|payment_type_id|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+-------+--------+-----------+------------------+----------------+-----------+------------------+------------------+-------------------+---------------+-----------+-----+-------+----------+------------+---------------------+------------+
|      0|       1|          4|                 2|             430|          0|                 N|               189|                146|              0|       21.9|  3.5|    0.5|      5.35|         0.0|                  1.0|       32.25|
|      1|       1|          5|                 2

In [12]:
# stop the session 
# spark.stop()

#### Load Data to filepath for upload
The below two cells are the builiding blocks to the function which saves each pyspark dataframe as a singular file

In [13]:
# fact_table.coalesce(1).write.parquet("temp/fact_table.parquet")

In [14]:
# filenames = os.listdir("temp/fact_table.parquet")
    
# name = ""
# for filename in filenames:
#     if filename.endswith(".snappy.parquet"):
#         name += filename

# if not os.path.exists('tables'):
#    os.makedirs('tables')

# path1 = f"temp/fact_table.parquet/{name}"
# path2 = "tables/fact_table.snappy.parquet"

# shutil.move(path1, path2)
# shutil.rmtree("temp")

Now that the logic has been sorted, proceeding with developing the function & saving each dataframe

In [16]:
def save_table(table, str):
    if os.path.exists("temp"):
        shutil.rmtree("temp")

    # Pyspark syntax to save dataframe within designated filepath
    table.coalesce(1).write.option("compression", "uncompressed").parquet("temp/table.parquet")

    # Obtaining the name of our generated parquet file
    filenames = os.listdir("temp/table.parquet")
    name = ""
    for filename in filenames:
        if filename.endswith("parquet"):
            name += filename

    # Preparing a desired filepath to move our parquet files to
    if not os.path.exists("tables"):
        os.makedirs("tables")

    path1 = f"temp/table.parquet/{name}"
    path2 = f"tables/{str}.parquet"
    
    # Moving our parquet file to a desired filepath & renaming it
    shutil.move(path1, path2)
    
    # Deleting the generated filepath from pyspark
    shutil.rmtree("temp")

In [17]:
# Saving our tables!
save_table(passenger_count_dim, "passenger_count_dim")

In [18]:
save_table(trip_distance_dim, "trip_distance_dim")

In [19]:
save_table(rate_code_dim, "rate_code_dim")

In [20]:
save_table(pickup_location_dim, "pickup_location_dim")

In [21]:
save_table(dropoff_location_dim, "dropoff_location_dim")

In [22]:
save_table(datetime_dim, "datetime_dim")

                                                                                

In [23]:
save_table(payment_type_dim, "payment_type_dim")

In [24]:
save_table(fact_table, "fact_table")

                                                                                