In [0]:
%fs
ls "/mnt/yellowcab"

path,name,size,modificationTime
dbfs:/mnt/yellowcab/audit/,audit/,0,1714172809000
dbfs:/mnt/yellowcab/bronze/,bronze/,0,1714172328000
dbfs:/mnt/yellowcab/gold/,gold/,0,1714172819000
dbfs:/mnt/yellowcab/silver/,silver/,0,1714172801000


Checking for missing values

In [0]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, isnan, when, count

# Create a SparkSession
spark = SparkSession.builder \
    .appName("Missing Data Check") \
    .getOrCreate()

# Path to the CSV file in the bronze area
csv_path = "/mnt/yellowcab/bronze/yellowcabdataset.csv"

# Read the CSV file into a DataFrame
df = spark.read.format("csv").option("header","true").option("inferSchema","true").csv(csv_path) 

# Check for missing values in each column
missing_data_counts = df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns])

# Convert the missing_data_counts DataFrame to Pandas DataFrame
missing_data_counts_pd = missing_data_counts.toPandas()

# Display the missing data counts in table format
missing_data_counts_pd



Unnamed: 0,_c0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


Print the data frame

In [0]:
df_pd = df.toPandas()
df_pd

Unnamed: 0,_c0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
0,24870114,2,03/25/2017 8:55:43 AM,03/25/2017 9:09:47 AM,6,3.34,1,N,100,231,1,13.0,0.0,0.5,2.76,0.00,0.3,16.56
1,35634249,1,04/11/2017 2:53:28 PM,04/11/2017 3:19:58 PM,1,1.80,1,N,186,43,1,16.0,0.0,0.5,4.00,0.00,0.3,20.80
2,106203690,1,12/15/2017 7:26:56 AM,12/15/2017 7:34:08 AM,1,1.00,1,N,262,236,1,6.5,0.0,0.5,1.45,0.00,0.3,8.75
3,38942136,2,05/07/2017 1:17:59 PM,05/07/2017 1:48:14 PM,1,3.70,1,N,188,97,1,20.5,0.0,0.5,6.39,0.00,0.3,27.69
4,30841670,2,04/15/2017 11:32:20 PM,04/15/2017 11:49:03 PM,1,4.37,1,N,4,112,2,16.5,0.5,0.5,0.00,0.00,0.3,17.80
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
22694,14873857,2,02/24/2017 5:37:23 PM,02/24/2017 5:40:39 PM,3,0.61,1,N,48,186,2,4.0,1.0,0.5,0.00,0.00,0.3,5.80
22695,66632549,2,08/06/2017 4:43:59 PM,08/06/2017 5:24:47 PM,1,16.71,2,N,132,164,1,52.0,0.0,0.5,14.64,5.76,0.3,73.20
22696,74239933,2,09/04/2017 2:54:14 PM,09/04/2017 2:58:22 PM,1,0.42,1,N,107,234,2,4.5,0.0,0.5,0.00,0.00,0.3,5.30
22697,60217333,2,07/15/2017 12:56:30 PM,07/15/2017 1:08:26 PM,1,2.36,1,N,68,144,1,10.5,0.0,0.5,1.70,0.00,0.3,13.00


Checking for duplicate values and deduping

In [0]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create a SparkSession
spark = SparkSession.builder \
    .appName("Duplicate Data Check") \
    .getOrCreate()

# Path to the CSV file in the bronze area
csv_path = "/mnt/yellowcab/bronze/yellowcabdataset.csv"

# Read the CSV file into a DataFrame
df = spark.read.format("csv").option("header","true").option("inferSchema","true").csv(csv_path)

# Count occurrences of each distinct row (i.e., find duplicates)
duplicate_counts = df.groupBy(df.columns).count().where(col('count') > 1)

# Display the duplicate rows
duplicate_counts.show(truncate=False)




+---+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+-----+
|_c0|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|count|
+---+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+-----+
+---+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+-----+



In [0]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)



Converting Date-Time format to Date and Time separately

In [0]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, date_format

# Create a SparkSession
spark = SparkSession.builder \
    .appName("Separate Date and Time") \
    .getOrCreate()

# Path to the CSV file in the bronze area
csv_path = "/mnt/yellowcab/bronze/yellowcabdataset.csv"

# Read the CSV file into a DataFrame
df = spark.read.option("header", "true").csv(csv_path)

# Convert tpep_pickup_datetime to timestamp type
df = df.withColumn("tpep_pickup_datetime", to_timestamp(col("tpep_pickup_datetime"), "MM/dd/yyyy h:mm:ss a"))

# Separate date and time components
df = df.withColumn("tpep_pickup_date", date_format(col("tpep_pickup_datetime"), "MM-dd-yyyy")) \
       .withColumn("tpep_pickup_time", date_format(col("tpep_pickup_datetime"), "HH:mm:ss"))

# Show the updated DataFrame


dfnew_pd = df.toPandas()
dfnew_pd




Unnamed: 0,_c0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,tpep_pickup_date,tpep_pickup_time
0,24870114,2,2017-03-25 08:55:43,03/25/2017 9:09:47 AM,6,3.34,1,N,100,231,1,13,0,0.5,2.76,0,0.3,16.56,03-25-2017,08:55:43
1,35634249,1,2017-04-11 14:53:28,04/11/2017 3:19:58 PM,1,1.8,1,N,186,43,1,16,0,0.5,4,0,0.3,20.8,04-11-2017,14:53:28
2,106203690,1,2017-12-15 07:26:56,12/15/2017 7:34:08 AM,1,1,1,N,262,236,1,6.5,0,0.5,1.45,0,0.3,8.75,12-15-2017,07:26:56
3,38942136,2,2017-05-07 13:17:59,05/07/2017 1:48:14 PM,1,3.7,1,N,188,97,1,20.5,0,0.5,6.39,0,0.3,27.69,05-07-2017,13:17:59
4,30841670,2,2017-04-15 23:32:20,04/15/2017 11:49:03 PM,1,4.37,1,N,4,112,2,16.5,0.5,0.5,0,0,0.3,17.8,04-15-2017,23:32:20
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
22694,14873857,2,2017-02-24 17:37:23,02/24/2017 5:40:39 PM,3,0.61,1,N,48,186,2,4,1,0.5,0,0,0.3,5.8,02-24-2017,17:37:23
22695,66632549,2,2017-08-06 16:43:59,08/06/2017 5:24:47 PM,1,16.71,2,N,132,164,1,52,0,0.5,14.64,5.76,0.3,73.2,08-06-2017,16:43:59
22696,74239933,2,2017-09-04 14:54:14,09/04/2017 2:58:22 PM,1,0.42,1,N,107,234,2,4.5,0,0.5,0,0,0.3,5.3,09-04-2017,14:54:14
22697,60217333,2,2017-07-15 12:56:30,07/15/2017 1:08:26 PM,1,2.36,1,N,68,144,1,10.5,0,0.5,1.7,0,0.3,13,07-15-2017,12:56:30


In [0]:
# Separate date and time components for tpep_dropoff_datetime
df = df.withColumn("tpep_dropoff_datetime", to_timestamp(col("tpep_dropoff_datetime"), "MM/dd/yyyy h:mm:ss a")) \
       .withColumn("tpep_dropoff_date", date_format(col("tpep_dropoff_datetime"), "MM-dd-yyyy")) \
       .withColumn("tpep_dropoff_time", date_format(col("tpep_dropoff_datetime"), "HH:mm:ss"))

# Show the updated DataFrame
dfnew_pd = df.toPandas()
dfnew_pd




Unnamed: 0,_c0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,tpep_pickup_date,tpep_pickup_time,tpep_dropoff_date,tpep_dropoff_time
0,24870114,2,2017-03-25 08:55:43,2017-03-25 09:09:47,6,3.34,1,N,100,231,1,13,0,0.5,2.76,0,0.3,16.56,03-25-2017,08:55:43,03-25-2017,09:09:47
1,35634249,1,2017-04-11 14:53:28,2017-04-11 15:19:58,1,1.8,1,N,186,43,1,16,0,0.5,4,0,0.3,20.8,04-11-2017,14:53:28,04-11-2017,15:19:58
2,106203690,1,2017-12-15 07:26:56,2017-12-15 07:34:08,1,1,1,N,262,236,1,6.5,0,0.5,1.45,0,0.3,8.75,12-15-2017,07:26:56,12-15-2017,07:34:08
3,38942136,2,2017-05-07 13:17:59,2017-05-07 13:48:14,1,3.7,1,N,188,97,1,20.5,0,0.5,6.39,0,0.3,27.69,05-07-2017,13:17:59,05-07-2017,13:48:14
4,30841670,2,2017-04-15 23:32:20,2017-04-15 23:49:03,1,4.37,1,N,4,112,2,16.5,0.5,0.5,0,0,0.3,17.8,04-15-2017,23:32:20,04-15-2017,23:49:03
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
22694,14873857,2,2017-02-24 17:37:23,2017-02-24 17:40:39,3,0.61,1,N,48,186,2,4,1,0.5,0,0,0.3,5.8,02-24-2017,17:37:23,02-24-2017,17:40:39
22695,66632549,2,2017-08-06 16:43:59,2017-08-06 17:24:47,1,16.71,2,N,132,164,1,52,0,0.5,14.64,5.76,0.3,73.2,08-06-2017,16:43:59,08-06-2017,17:24:47
22696,74239933,2,2017-09-04 14:54:14,2017-09-04 14:58:22,1,0.42,1,N,107,234,2,4.5,0,0.5,0,0,0.3,5.3,09-04-2017,14:54:14,09-04-2017,14:58:22
22697,60217333,2,2017-07-15 12:56:30,2017-07-15 13:08:26,1,2.36,1,N,68,144,1,10.5,0,0.5,1.7,0,0.3,13,07-15-2017,12:56:30,07-15-2017,13:08:26


In [0]:
df.printSchema()


root
 |-- _c0: string (nullable = true)
 |-- VendorID: string (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- tpep_pickup_date: string (nullable = true)
 |-- tpep_pickup_time: string (nullable = true)
 |-- tpep_dropoff_date: string (nullable = true)
 |-- tpep_dropoff_time: string (nullable = tru

Sending the cleaned data into the SILVER LAYER

In [0]:
# Path to the Silver area
silver_path = "/mnt/yellowcab/silver/"

# Write the DataFrame to the Silver area as a CSV file with the specified file name
df.write.mode("overwrite").option("header", "true").csv(silver_path)








Reading the data in the silver level before silverlevel transformation

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType

# Create a SparkSession
spark = SparkSession.builder \
    .appName("Read_CSV_into_DataFrame") \
    .getOrCreate()

# Define the schema for the CSV file
schema = StructType([
    StructField("_c0", StringType(), True),
    StructField("VendorID", StringType(), True),
    StructField("tpep_pickup_datetime", TimestampType(), True),
    StructField("tpep_dropoff_datetime", TimestampType(), True),
    StructField("passenger_count", StringType(), True),
    StructField("trip_distance", StringType(), True),
    StructField("RatecodeID", StringType(), True),
    StructField("store_and_fwd_flag", StringType(), True),
    StructField("PULocationID", StringType(), True),
    StructField("DOLocationID", StringType(), True),
    StructField("payment_type", StringType(), True),
    StructField("fare_amount", StringType(), True),
    StructField("extra", StringType(), True),
    StructField("mta_tax", StringType(), True),
    StructField("tip_amount", StringType(), True),
    StructField("tolls_amount", StringType(), True),
    StructField("improvement_surcharge", StringType(), True),
    StructField("total_amount", StringType(), True),
    StructField("tpep_pickup_date", StringType(), True),
    StructField("tpep_pickup_time", StringType(), True),
    StructField("tpep_dropoff_date", StringType(), True),
    StructField("tpep_dropoff_time", StringType(), True)
])

# Read the CSV file from the specified path into a DataFrame with the specified schema
csv_path = "dbfs:/mnt/yellowcab/silver/part-00000-tid-2348824206068138771-d8093b30-989f-431d-8dd7-239bd2cdc9ad-77-1-c000.csv"
silver_df = spark.read.option("header", "true").schema(schema).csv(csv_path)

# Display the contents of the DataFrame
silver_df.show()




+---------+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+----------------+----------------+-----------------+-----------------+
|      _c0|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|tpep_pickup_date|tpep_pickup_time|tpep_dropoff_date|tpep_dropoff_time|
+---------+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+----------------+----------------+-----------------+-----------------+
| 24870114|       2| 2017-03-25 08:55:43|  201

In [0]:
# Read the CSV file from the Silver area into a DataFrame with the specified schema
silver_df = spark.read.option("header", "true").schema(schema).csv(silver_path)

# Display both column names and data
# Display the contents of the DataFrame
silver_df.show()

+---------+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+----------------+----------------+-----------------+-----------------+
|      _c0|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|tpep_pickup_date|tpep_pickup_time|tpep_dropoff_date|tpep_dropoff_time|
+---------+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+----------------+----------------+-----------------+-----------------+
| 24870114|       2| 2017-03-25 08:55:43|  201

In [0]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, FloatType, DateType, TimestampType

# Assuming you have a DataFrame named df
df = df.withColumn("_c0", col("_c0").cast(IntegerType()))
df = df.withColumn("VendorID", col("VendorID").cast(IntegerType()))
df = df.withColumn("tpep_pickup_datetime", col("tpep_pickup_datetime").cast(TimestampType()))
df = df.withColumn("tpep_dropoff_datetime", col("tpep_dropoff_datetime").cast(TimestampType()))
df = df.withColumn("passenger_count", col("passenger_count").cast(IntegerType()))
df = df.withColumn("trip_distance", col("trip_distance").cast(FloatType()))
df = df.withColumn("RatecodeID", col("RatecodeID").cast(IntegerType()))
# store_and_fwd_flag remains string
df = df.withColumn("PULocationID", col("PULocationID").cast(IntegerType()))
df = df.withColumn("DOLocationID", col("DOLocationID").cast(IntegerType()))
df = df.withColumn("payment_type", col("payment_type").cast(FloatType()))
df = df.withColumn("fare_amount", col("fare_amount").cast(FloatType()))
df = df.withColumn("extra", col("extra").cast(FloatType()))
df = df.withColumn("mta_tax", col("mta_tax").cast(FloatType()))
df = df.withColumn("tip_amount", col("tip_amount").cast(FloatType()))
df = df.withColumn("tolls_amount", col("tolls_amount").cast(FloatType()))
df = df.withColumn("improvement_surcharge", col("improvement_surcharge").cast(FloatType()))
df = df.withColumn("total_amount", col("total_amount").cast(FloatType()))
df = df.withColumn("tpep_pickup_date", col("tpep_pickup_date").cast(DateType()))
df = df.withColumn("tpep_pickup_time", col("tpep_pickup_time").cast(TimestampType()))
df = df.withColumn("tpep_dropoff_date", col("tpep_dropoff_date").cast(DateType()))
df = df.withColumn("tpep_dropoff_time", col("tpep_dropoff_time").cast(TimestampType()))


Creating Fact and Dimension Tables and storing them in the GOLD LAYER

In [0]:
# Load data from Silver layer into DataFrame
silver_df = spark.read.option("header", "true").csv(silver_path)


[0;31m---------------------------------------------------------------------------[0m
[0;31mNameError[0m                                 Traceback (most recent call last)
File [0;32m<command-4435777153636194>, line 2[0m
[1;32m      1[0m [38;5;66;03m# Load data from Silver layer into DataFrame[39;00m
[0;32m----> 2[0m silver_df [38;5;241m=[39m spark[38;5;241m.[39mread[38;5;241m.[39moption([38;5;124m"[39m[38;5;124mheader[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mtrue[39m[38;5;124m"[39m)[38;5;241m.[39mcsv([43msilver_path[49m)

[0;31mNameError[0m: name 'silver_path' is not defined

In [0]:
silverdf_pd = silver_df.toPandas()
silverdf_pd

Unnamed: 0,_c0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,tpep_pickup_date,tpep_pickup_time,tpep_dropoff_date,tpep_dropoff_time
0,24870114,2,2017-03-25T08:55:43.000Z,2017-03-25T09:09:47.000Z,6,3.34,1,N,100,231,1,13,0,0.5,2.76,0,0.3,16.56,03-25-2017,08:55:43,03-25-2017,09:09:47
1,35634249,1,2017-04-11T14:53:28.000Z,2017-04-11T15:19:58.000Z,1,1.8,1,N,186,43,1,16,0,0.5,4,0,0.3,20.8,04-11-2017,14:53:28,04-11-2017,15:19:58
2,106203690,1,2017-12-15T07:26:56.000Z,2017-12-15T07:34:08.000Z,1,1,1,N,262,236,1,6.5,0,0.5,1.45,0,0.3,8.75,12-15-2017,07:26:56,12-15-2017,07:34:08
3,38942136,2,2017-05-07T13:17:59.000Z,2017-05-07T13:48:14.000Z,1,3.7,1,N,188,97,1,20.5,0,0.5,6.39,0,0.3,27.69,05-07-2017,13:17:59,05-07-2017,13:48:14
4,30841670,2,2017-04-15T23:32:20.000Z,2017-04-15T23:49:03.000Z,1,4.37,1,N,4,112,2,16.5,0.5,0.5,0,0,0.3,17.8,04-15-2017,23:32:20,04-15-2017,23:49:03
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
22694,14873857,2,2017-02-24T17:37:23.000Z,2017-02-24T17:40:39.000Z,3,0.61,1,N,48,186,2,4,1,0.5,0,0,0.3,5.8,02-24-2017,17:37:23,02-24-2017,17:40:39
22695,66632549,2,2017-08-06T16:43:59.000Z,2017-08-06T17:24:47.000Z,1,16.71,2,N,132,164,1,52,0,0.5,14.64,5.76,0.3,73.2,08-06-2017,16:43:59,08-06-2017,17:24:47
22696,74239933,2,2017-09-04T14:54:14.000Z,2017-09-04T14:58:22.000Z,1,0.42,1,N,107,234,2,4.5,0,0.5,0,0,0.3,5.3,09-04-2017,14:54:14,09-04-2017,14:58:22
22697,60217333,2,2017-07-15T12:56:30.000Z,2017-07-15T13:08:26.000Z,1,2.36,1,N,68,144,1,10.5,0,0.5,1.7,0,0.3,13,07-15-2017,12:56:30,07-15-2017,13:08:26


Creating Fact Table

In [0]:
# Fact Table: Trips_Fact
trips_fact_df = silver_df.select(
    col("_c0").cast("integer").alias("_c0"),
    col("tpep_pickup_datetime").cast("timestamp").alias("tpep_pickup_datetime"),
    col("tpep_dropoff_datetime").cast("timestamp").alias("tpep_dropoff_datetime"),
    col("passenger_count").cast("integer").alias("passenger_count"),
    col("trip_distance").cast("float").alias("trip_distance"),
    col("RatecodeID").cast("integer").alias("RatecodeID"),
    col("PULocationID").cast("integer").alias("PULocationID"),
    col("DOLocationID").cast("integer").alias("DOLocationID")
)

# Write Fact Table to Gold layer
trips_fact_df.repartition(1).write.mode("overwrite").option("header", "true").csv("/mnt/yellowcab/gold/Trips_Fact")


Time Dimension

In [0]:
# Time Dimension: Time_Dimension
time_dimension_df = silver_df.select(
    col("tpep_pickup_datetime").alias("tpep_pickup_datetime"),
    date_format(col("tpep_pickup_datetime"), "yyyy-MM-dd").cast("date").alias("tpep_pickup_date"),
    col("tpep_pickup_datetime").alias("tpep_pickup_time"),
    col("tpep_dropoff_datetime").alias("tpep_dropoff_datetime"),
    date_format(col("tpep_dropoff_datetime"), "yyyy-MM-dd").cast("date").alias("tpep_dropoff_date"),
    col("tpep_dropoff_datetime").alias("tpep_dropoff_time")
)

# Write Time Dimension to Gold layer
time_dimension_df.repartition(1).write.mode("overwrite").option("header", "true").csv("/mnt/yellowcab/gold/Time_Dimension")


Location Dimension

In [0]:
# Location Dimension: Location_Dimension
location_dimension_df = silver_df.select(
    col("PULocationID").cast("integer").alias("PULocationID"),
    col("DOLocationID").cast("integer").alias("DOLocationID")
)

# Write Location Dimension to Gold layer
location_dimension_df.repartition(1).write.mode("overwrite").option("header", "true").csv("/mnt/yellowcab/gold/Location_Dimension")

Vendor Dimension

In [0]:
# Vendor Dimension: Vendor_Dimension
vendor_dimension_df = silver_df.select(
    col("VendorID").cast("integer").alias("VendorID")
)

# Write Vendor Dimension to Gold layer
vendor_dimension_df.repartition(1).write.mode("overwrite").option("header", "true").csv("/mnt/yellowcab/gold/Vendor_Dimension")

Ratecode Dimension

In [0]:
# Ratecode Dimension: Ratecode_Dimension
ratecode_dimension_df = silver_df.select(
    col("RatecodeID").cast("integer").alias("RatecodeID")
)



Price Dimension

In [0]:




# Price Dimension: Price_Dimension
price_dimension_df = silver_df.select(
    col("_c0").cast("integer").alias("_c0"),
    col("payment_type").cast("float").alias("payment_type"),
    col("fare_amount").cast("float").alias("fare_amount"),
    col("extra").cast("float").alias("extra"),
    col("mta_tax").cast("float").alias("mta_tax"),
    col("tip_amount").cast("float").alias("tip_amount"),
    col("tolls_amount").cast("float").alias("tolls_amount"),
    col("improvement_surcharge").cast("float").alias("improvement_surcharge"),
    col("total_amount").cast("float").alias("total_amount")
)

# Write Price Dimension to Gold layer
price_dimension_df.repartition(1).write.mode("overwrite").option("header", "true").csv("/mnt/yellowcab/gold/Price_Dimension")

Data is stored in the GOLD LAYER


In [0]:
# Path to the Silver area
silver_path = "/mnt/yellowcab/gold/"

# Write the DataFrame to the Silver area as a CSV file with the specified file name
df.write.mode("overwrite").option("header", "true").csv(silver_path)

Storing a copy of all the data in the GOLD LAYER to AUDIT LAYER

In [0]:
# List all files in the gold directory
gold_files = dbutils.fs.ls("/mnt/yellowcab/gold/")

# Move each file from the gold directory to the audit directory
for file in gold_files:
    file_path = file.path
    file_name = file.name
    dbutils.fs.mv(file_path, f"/mnt/yellowcab/audit/{file_name}", recurse=True)