In [0]:
# Bronze layer: ingest the raw trip CSV exactly as delivered.
# The first row is used as the header and Spark infers the column types.
df_raw = (
    spark.read
    .options(header="true", inferSchema="true")
    .csv("/Volumes/uber_project/bronze/raw_volume/uber_data.csv")
)

# Render the ingested rows for a quick visual check.
display(df_raw)


VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RatecodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
1,2016-03-01T00:00:00.000Z,2016-03-01T00:07:55.000Z,1,2.5,-73.97674560546875,40.765151977539055,1,N,-74.00426483154298,40.74612808227539,1,9.0,0.5,0.5,2.05,0.0,0.3,12.35
1,2016-03-01T00:00:00.000Z,2016-03-01T00:11:06.000Z,1,2.9,-73.98348236083984,40.767925262451165,1,N,-74.00594329833984,40.7331657409668,1,11.0,0.5,0.5,3.05,0.0,0.3,15.35
2,2016-03-01T00:00:00.000Z,2016-03-01T00:31:06.000Z,2,19.98,-73.78202056884764,40.64480972290039,1,N,-73.97454071044923,40.6757698059082,1,54.5,0.5,0.5,8.0,0.0,0.3,63.8
2,2016-03-01T00:00:00.000Z,2016-03-01T00:00:00.000Z,3,10.78,-73.86341857910156,40.769813537597656,1,N,-73.96965026855469,40.757766723632805,1,31.5,0.0,0.5,3.78,5.54,0.3,41.62
2,2016-03-01T00:00:00.000Z,2016-03-01T00:00:00.000Z,5,30.43,-73.97174072265625,40.79218292236328,3,N,-74.17716979980467,40.69505310058594,1,98.0,0.0,0.0,0.0,15.5,0.3,113.8
2,2016-03-01T00:00:00.000Z,2016-03-01T00:00:00.000Z,5,5.92,-74.01719665527344,40.70538330078125,1,N,-73.97807312011719,40.75578689575195,1,23.5,1.0,0.5,5.06,0.0,0.3,30.36
2,2016-03-01T00:00:00.000Z,2016-03-01T00:00:00.000Z,6,5.72,-73.99458312988281,40.72784805297852,1,N,0.0,0.0,2,23.0,0.5,0.5,0.0,0.0,0.3,24.3
1,2016-03-01T00:00:01.000Z,2016-03-01T00:16:04.000Z,1,6.2,-73.78877258300781,40.64775848388672,1,N,-73.82920837402342,40.712345123291016,3,20.5,0.5,0.5,0.0,0.0,0.3,21.8
1,2016-03-01T00:00:01.000Z,2016-03-01T00:05:00.000Z,1,0.7,-73.95822143554686,40.76464080810546,1,N,-73.9678955078125,40.76290130615234,1,5.5,0.5,0.5,2.0,0.0,0.3,8.8
2,2016-03-01T00:00:01.000Z,2016-03-01T00:24:06.000Z,3,7.18,-73.98577880859375,40.74119186401367,1,N,-73.94635009765625,40.79787826538086,1,23.5,0.5,0.5,3.2,0.0,0.3,28.0


In [0]:
# Persist the raw frame as the bronze Delta table, replacing any previous contents.
df_raw.write.saveAsTable(
    "uber_project.bronze.uber_raw",
    format="delta",
    mode="overwrite",
)


In [0]:
# Silver layer: clean and standardise the bronze trips.

from pyspark.sql.functions import col, to_timestamp

df_bronze = spark.table("uber_project.bronze.uber_raw")

# 1. Remove exact duplicate rows.
# 2. Keep only trips that have a fare recorded.
# 3. Add parsed pickup/dropoff timestamps alongside the original tpep_* columns.
# 4. Store the fare as a double.
df_silver = (
    df_bronze
    .dropDuplicates()
    .where(col("fare_amount").isNotNull())
    .withColumn("pickup_datetime", to_timestamp(col("tpep_pickup_datetime")))
    .withColumn("dropoff_datetime", to_timestamp(col("tpep_dropoff_datetime")))
    .withColumn("fare_amount", col("fare_amount").cast("double"))
)




In [0]:
# Persist the cleaned trips as the silver Delta table, replacing any previous contents.

df_silver.write.saveAsTable(
    "uber_project.silver.uber_clean",
    format="delta",
    mode="overwrite",
)



In [0]:
# Gold layer: reload the cleaned trips from the stored silver table; the
# dimension and fact frames in the following cells are derived from it.

df_silver = spark.read.table("uber_project.silver.uber_clean")


In [0]:
# dim_date: one row per distinct pickup calendar date, with its year, month
# number, full month name and day of month.

from pyspark.sql.functions import year, month, dayofmonth, date_format

df_date = (
    df_silver
    .select(
        col("pickup_datetime").cast("date").alias("date"),
        year("pickup_datetime").alias("year"),
        month("pickup_datetime").alias("month"),
        date_format("pickup_datetime", "MMMM").alias("month_name"),
        dayofmonth("pickup_datetime").alias("day"),
    )
    .distinct()
)


In [0]:
# Persist dim_date as a gold Delta table, replacing any previous contents.

df_date.write.saveAsTable(
    "uber_project.gold.dim_date",
    format="delta",
    mode="overwrite",
)


In [0]:
# dim_hour: the distinct pickup hours present in the silver trips.

from pyspark.sql.functions import hour

df_hour = (
    df_silver
    .select(hour("pickup_datetime").alias("hour"))
    .distinct()
)



In [0]:
# Persist dim_hour as a gold Delta table, replacing any previous contents.

df_hour.write.saveAsTable(
    "uber_project.gold.dim_hour",
    format="delta",
    mode="overwrite",
)



In [0]:
# dim_passenger: the distinct passenger_count values present in the silver trips.

df_passenger = (
    df_silver
    .select("passenger_count")
    .distinct()
)


In [0]:
# Persist dim_passenger as a gold Delta table, replacing any previous contents.

df_passenger.write.saveAsTable(
    "uber_project.gold.dim_passenger",
    format="delta",
    mode="overwrite",
)


In [0]:
# fact_trips: one row per silver trip, carrying the columns that match the
# dimension tables (date, hour, passenger_count) plus the trip measures.
from pyspark.sql.functions import col, hour

df_fact = df_silver.select(
    col("pickup_datetime").cast("date").alias("date"),  # matches dim_date.date
    hour("pickup_datetime").alias("hour"),              # matches dim_hour.hour
    "passenger_count",                                  # matches dim_passenger
    "trip_distance",
    "fare_amount",
)


In [0]:
# Persist fact_trips as a gold Delta table (unpartitioned), replacing any
# previous contents.
df_fact.write.saveAsTable(
    "uber_project.gold.fact_trips",
    format="delta",
    mode="overwrite",
)


In [0]:
# Partition the fact table by trip date.
#
# The preceding cell has already created uber_project.gold.fact_trips as an
# unpartitioned Delta table. Delta does not let a plain overwrite change the
# partitioning of an existing table, so "overwriteSchema" must be enabled for
# this rewrite to replace the table layout instead of failing with a
# partition/schema mismatch error.

(
    df_fact.write.format("delta")
    .partitionBy("date")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("uber_project.gold.fact_trips")
)



In [0]:
from pyspark.sql.functions import col, to_date, hour, year, month, dayofmonth

# Start from the silver table as it is currently stored.
df_silver = spark.table("uber_project.silver.uber_clean")

# Derive calendar fields from the pickup timestamp.
pickup_ts = col("pickup_datetime")
df_silver = (
    df_silver
    .withColumn("trip_date", to_date(pickup_ts))
    .withColumn("trip_hour", hour(pickup_ts))
    .withColumn("year", year(pickup_ts))
    .withColumn("month", month(pickup_ts))
    .withColumn("day", dayofmonth(pickup_ts))
)

# Replace the silver table with the enriched frame; overwriteSchema is set so
# the table schema is replaced along with the data.
df_silver.write.saveAsTable(
    "uber_project.silver.uber_clean",
    format="delta",
    mode="overwrite",
    overwriteSchema="true",
)


In [0]:
from pyspark.sql.functions import date_format

df_silver = spark.table("uber_project.silver.uber_clean")

# Rebuild dim_date from the silver trip_date column: take the distinct dates,
# then attach year, month number, full month name and day of month.
distinct_dates = df_silver.select(col("trip_date").alias("date")).distinct()

df_dim_date = (
    distinct_dates
    .withColumn("year", year("date"))
    .withColumn("month", month("date"))
    .withColumn("month_name", date_format("date", "MMMM"))
    .withColumn("day", dayofmonth("date"))
)

# Replace the gold dim_date table with the rebuilt dimension.
df_dim_date.write.saveAsTable(
    "uber_project.gold.dim_date",
    format="delta",
    mode="overwrite",
)


In [0]:
# Show the stored dim_date table.
spark.table("uber_project.gold.dim_date").display()


date,year,month,month_name,day
2016-03-10,2016,3,March,10
2016-03-01,2016,3,March,1
