In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pyspark
from delta import *
import os

In [2]:

# Build Spark session with Delta configurations and Hive support
builder = SparkSession.builder.appName("silvertogold") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.hadoop.io.native.lib.available", "true") \
    .config("spark.jars.packages", "org.apache.hive:hive-exec:2.3.9")  \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
# Enable Hive support explicitly and get Spark session
spark = configure_spark_with_delta_pip(builder.enableHiveSupport()).getOrCreate()

# Optionally, you can test by running a Hive query
spark.sql("CREATE TABLE IF NOT EXISTS test_hive_table (name STRING, age INT) USING hive")
spark.sql("SELECT * FROM test_hive_table")
spark.sql("select count(*) from silver.location").show()


AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient

In [5]:
# List of external silver tables
silver_tables = [
    "silver.location",
    "silver.payment",
    "silver.payment_method",
    "silver.payment_status",
    "silver.request",
    "silver.trip",
    "silver.user",
    "silver.vehicle",
    "silver.vehicle_make"
] 
print(silver_tables)

['silver.location', 'silver.payment', 'silver.payment_method', 'silver.payment_status', 'silver.request', 'silver.trip', 'silver.user', 'silver.vehicle', 'silver.vehicle_make']


In [6]:
landing_location = os.path.join(os.getcwd())
bronze_location = os.path.join(landing_location, "bronze")
silver_location = fr"{os.getcwd()}/silver"
gold_location = fr"{os.getcwd()}/gold"
print(silver_tables)
print(os.getcwd())
print(bronze_location)
print(silver_location)
print(gold_location)

['silver.location', 'silver.payment', 'silver.payment_method', 'silver.payment_status', 'silver.request', 'silver.trip', 'silver.user', 'silver.vehicle', 'silver.vehicle_make']
e:\DEPIfINALpROJECT\DATABASEcSV
e:\DEPIfINALpROJECT\DATABASEcSV\bronze
e:\DEPIfINALpROJECT\DATABASEcSV/silver
e:\DEPIfINALpROJECT\DATABASEcSV/gold


In [7]:
# Verify the current schema
print("Current Schema:")
spark.sql("use silver")
spark.sql("SELECT current_database()")

# List all tables in the bronze schema
print("Tables in silver schema:")
spark.sql("SHOW TABLES IN silver").show()
spark.sql("SELECT * FROM silver.location LIMIT 1").show()
spark.sql("SELECT * FROM silver.payment LIMIT 1").show()


# Describe the history of the Delta table
print("Describing history for silver.location:")



Current Schema:
Tables in silver schema:
+---------+--------------+-----------+
|namespace|     tableName|isTemporary|
+---------+--------------+-----------+
|   silver|      location|      false|
|   silver|       payment|      false|
|   silver|payment_method|      false|
|   silver|payment_status|      false|
|   silver|       request|      false|
|   silver|          trip|      false|
|   silver|          user|      false|
|   silver|       vehicle|      false|
|   silver|  vehicle_make|      false|
+---------+--------------+-----------+

+-----------+------------+----------+-------------------+--------------------+--------------------+
|location_id|   longitude|  latitude|    processing_date|record_modified_date|         record_hash|
+-----------+------------+----------+-------------------+--------------------+--------------------+
|    1467592|-73.95970917|40.7159729|2024-10-13 11:00:08|2024-10-08 09:46:...|af7a4d88abc208dbd...|
+-----------+------------+----------+--------------

In [8]:

deltaTable = DeltaTable.forPath(spark, "silver\\location")
fullHistoryDF = deltaTable.history()
fullHistoryDF.show(truncate=False)

+-------+-----------------------+------+--------+------------+--------------------------------------------------------------------+----+--------+---------+-----------+-----------------+-------------+----------------------------------------------------------------------+------------+-----------------------------------+
|version|timestamp              |userId|userName|operation   |operationParameters                                                 |job |notebook|clusterId|readVersion|isolationLevel   |isBlindAppend|operationMetrics                                                      |userMetadata|engineInfo                         |
+-------+-----------------------+------+--------+------------+--------------------------------------------------------------------+----+--------+---------+-----------+-----------------+-------------+----------------------------------------------------------------------+------------+-----------------------------------+
|3      |2024-10-13 11:00:12.904|NULL  |

In [9]:
dim_date = "gold.DIM_DATES"
dim_user = "gold.DIM_USERS"
dim_vehicle = "gold.DIM_VEHICLES"
dim_location = "gold.DIM_LOCATIONS"
fact_request = "gold.FCT_REQUESTS"

In [10]:
from pyspark.sql.functions import to_date, current_date

# Read data from each table and assign to individual variables
user_df = spark.read.table("silver.user").filter(to_date("processing_date") == current_date())
location_df = spark.read.table("silver.location").filter(to_date("processing_date") == current_date())
payment_df = spark.read.table("silver.payment").filter(to_date("processing_date") == current_date())
paymentmethod_df = spark.read.table("silver.payment_method").filter(to_date("processing_date") == current_date())
paymentstatus_df = spark.read.table("silver.payment_status").filter(to_date("processing_date") == current_date())
request_df = spark.read.table("silver.request").filter(to_date("processing_date") == current_date())
trip_df = spark.read.table("silver.trip").filter(to_date("processing_date") == current_date())
vehicle_df = spark.read.table("silver.vehicle").filter(to_date("processing_date") == current_date())
vehiclemake_df = spark.read.table("silver.vehicle_make").filter(to_date("processing_date") == current_date())

# Show each DataFrame separately
user_df.show()
location_df.show()
payment_df.show()
paymentmethod_df.show()
paymentstatus_df.show()
request_df.show()
trip_df.show()
vehicle_df.show()
vehiclemake_df.show()


+-------+-----------------+--------------------+--------------+-------------------+--------------------+--------------------+
|user_id|        full_name|               email|  phone_number|    processing_date|record_modified_date|         record_hash|
+-------+-----------------+--------------------+--------------+-------------------+--------------------+--------------------+
|  71351|       David King|kennethharris@exa...|(517) 691-9921|2024-10-13 10:59:51|2024-10-12 10:10:...|0000082bfd944061f...|
| 832921|Christine Andrews| tommy40@example.net|(495) 579-4467|2024-10-13 10:59:51|2024-10-12 10:10:...|0001055e80917d058...|
|1032184|  Sheila Bradshaw|zavalaandrea@exam...|(900) 865-9210|2024-10-13 10:59:51|2024-10-12 10:10:...|0001c0706f6abcf85...|
| 818214|    David Shields|nguyenashley@exam...|(586) 402-4084|2024-10-13 10:59:51|2024-10-12 10:10:...|0005301971817ad96...|
| 995969|   Angela Hancock|desireemiranda@ex...|(201) 251-6347|2024-10-13 10:59:51|2024-10-12 10:10:...|0007791563fb9c

In [11]:
user_df.orderBy("full_name").show()

+-------+------------+--------------------+--------------+-------------------+--------------------+--------------------+
|user_id|   full_name|               email|  phone_number|    processing_date|record_modified_date|         record_hash|
+-------+------------+--------------------+--------------+-------------------+--------------------+--------------------+
|   NULL|    --------|               -----|   -----------|2024-10-13 10:59:51|2024-10-12 10:10:...|77a8b166795c77f2e...|
| 776168|Aaron Abbott|angela67@example.com|(453) 832-3608|2024-10-13 10:59:51|2024-10-12 10:10:...|2d9db93d2a4a78acd...|
| 762535|Aaron Abbott|angela67@example.com|(453) 832-3608|2024-10-13 10:59:51|2024-10-12 10:10:...|86109191df74c0d49...|
| 768128|Aaron Abbott|angela67@example.com|(453) 832-3608|2024-10-13 10:59:51|2024-10-12 10:10:...|91414308c8c8ff694...|
| 767104|Aaron Abbott|angela67@example.com|(453) 832-3608|2024-10-13 10:59:51|2024-10-12 10:10:...|edc4190cdc0442cc0...|
| 850461|Aaron Acosta|vanessa20@

In [12]:
# Step 1: Join user_df with vehicle_df on user_id (assuming driver_id in vehicle_df relates to user_id)
user_vehicle_df = user_df.join(vehicle_df, user_df["user_id"] == vehicle_df["driver_id"], "left")

# Step 2: Join vehicle_df with vehicle_make_df on make_id to bring in make details
user_vehicle_make_df = user_vehicle_df.join(vehiclemake_df, vehicle_df["make_id"] == vehiclemake_df["make_id"], "left")

In [13]:
dim_user = user_vehicle_make_df.select(
    "user_id",
    "full_name",
    "email",
    "phone_number",
    # Replace NULLs in vehicle columns with 'Passenger' or 'N/A'
    coalesce(user_vehicle_make_df["vehicle_id"], lit("Passenger")).alias("vehicle_id"),
    coalesce(user_vehicle_make_df["make_name"], lit("Passenger")).alias("vehicle_make"),
    coalesce(user_vehicle_make_df["model"], lit("Passenger")).alias("vehicle_model"),
    coalesce(user_vehicle_make_df["year"], lit("Passenger")).alias("vehicle_year"),
    coalesce(user_vehicle_make_df["color"], lit("Passenger")).alias("vehicle_color"),
    coalesce(user_vehicle_make_df["license_plate"], lit("Passenger")).alias("vehicle_license_plate")
)




In [14]:
dim_user.orderBy("full_name").show()

+-------+------------+--------------------+--------------+----------+-------------+-------------+------------+-------------+---------------------+
|user_id|   full_name|               email|  phone_number|vehicle_id| vehicle_make|vehicle_model|vehicle_year|vehicle_color|vehicle_license_plate|
+-------+------------+--------------------+--------------+----------+-------------+-------------+------------+-------------+---------------------+
|   NULL|    --------|               -----|   -----------| Passenger|    Passenger|    Passenger|   Passenger|    Passenger|            Passenger|
| 762535|Aaron Abbott|angela67@example.com|(453) 832-3608|    168434|      Hyundai|     Santa Fe|        2006|       Silver|              3BW 116|
| 776168|Aaron Abbott|angela67@example.com|(453) 832-3608|    168434|      Hyundai|     Santa Fe|        2006|       Silver|              3BW 116|
| 768128|Aaron Abbott|angela67@example.com|(453) 832-3608|    168434|      Hyundai|     Santa Fe|        2006|       S

In [15]:

# Define the start and end date
start_date = "2015-01-01"
end_date = "2015-01-31"

# Create a Spark DataFrame with a date range using the sequence function
calendar_df = spark.sql(f"""
    SELECT explode(sequence(to_date('{start_date}'), to_date('{end_date}'), interval 1 day)) as full_date
""")

# Transform the DataFrame to create the calendar dimension
dim_calendar = calendar_df.select(
    date_format("full_date", "yyyyMMdd").alias("date_key"),  # Convert to string format YYYYMMDD
    col("full_date"),
    dayofmonth("full_date").alias("day"),  # Day of the month
    date_format("full_date", "EEEE").alias("day_name"),  # Full name of the day
    dayofweek("full_date").alias("day_of_week"),  # Day of the week (1 = Sunday, 7 = Saturday)
    weekofyear("full_date").alias("week_of_year"),  # Week of the year
    month("full_date").alias("month"),  # Month (1-12)
    date_format("full_date", "MMMM").alias("month_name"),  # Full name of the month
    quarter("full_date").alias("quarter"),  # Quarter (1-4)
    year("full_date").alias("year"),  # Year
    when(dayofweek("full_date").isin([1, 7]), 1).otherwise(0).alias("is_weekend"),  # Weekend flag (1=weekend)
    lit(0).alias("is_holiday"),  # Dummy holiday flag (replace with actual holiday logic)
    month("full_date").alias("fiscal_month"),  # Fiscal month (you can customize this logic)
    quarter("full_date").alias("fiscal_quarter"),  # Fiscal quarter (you can customize this logic)
    year("full_date").alias("fiscal_year")  # Fiscal year
)

dim_calendar.printSchema()
# Show the resulting calendar dimension DataFrame



root
 |-- date_key: string (nullable = false)
 |-- full_date: date (nullable = false)
 |-- day: integer (nullable = false)
 |-- day_name: string (nullable = false)
 |-- day_of_week: integer (nullable = false)
 |-- week_of_year: integer (nullable = false)
 |-- month: integer (nullable = false)
 |-- month_name: string (nullable = false)
 |-- quarter: integer (nullable = false)
 |-- year: integer (nullable = false)
 |-- is_weekend: integer (nullable = false)
 |-- is_holiday: integer (nullable = false)
 |-- fiscal_month: integer (nullable = false)
 |-- fiscal_quarter: integer (nullable = false)
 |-- fiscal_year: integer (nullable = false)



In [16]:
dim_calendar.show(8)

+--------+----------+---+---------+-----------+------------+-----+----------+-------+----+----------+----------+------------+--------------+-----------+
|date_key| full_date|day| day_name|day_of_week|week_of_year|month|month_name|quarter|year|is_weekend|is_holiday|fiscal_month|fiscal_quarter|fiscal_year|
+--------+----------+---+---------+-----------+------------+-----+----------+-------+----+----------+----------+------------+--------------+-----------+
|20150101|2015-01-01|  1| Thursday|          5|           1|    1|   January|      1|2015|         0|         0|           1|             1|       2015|
|20150102|2015-01-02|  2|   Friday|          6|           1|    1|   January|      1|2015|         0|         0|           1|             1|       2015|
|20150103|2015-01-03|  3| Saturday|          7|           1|    1|   January|      1|2015|         1|         0|           1|             1|       2015|
|20150104|2015-01-04|  4|   Sunday|          1|           1|    1|   January|     

In [19]:
paymentmethod_df.show()
paymentstatus_df.show()

+-----------------+-----------+-------------------+--------------------+--------------------+
|payment_method_id|method_name|    processing_date|record_modified_date|         record_hash|
+-----------------+-----------+-------------------+--------------------+--------------------+
|                4| Debit Card|2024-10-13 11:00:16|2024-10-08 09:47:...|accdddad95307c5e2...|
|                3|Credit Card|2024-10-13 11:00:16|2024-10-08 09:47:...|401fadf0a8be9e836...|
|             NULL| ----------|2024-10-13 11:00:16|2024-10-08 09:47:...|590ccb73c8a4de29c...|
|                2|       Cash|2024-10-13 11:00:16|2024-10-08 09:47:...|9ca1ad9df92223c3a...|
|                5| Google Pay|2024-10-13 11:00:16|2024-10-08 09:47:...|ae41fb53886da4122...|
|                1|  Apple Pay|2024-10-13 11:00:16|2024-10-08 09:47:...|6eee5d2acc6136126...|
|                6|     PayPal|2024-10-13 11:00:16|2024-10-08 09:47:...|f9dba2d625d63fdc5...|
+-----------------+-----------+-------------------+---------

In [32]:
# Perform a cross join to get all combinations of PaymentMethod and PaymentStatus
combined_df = paymentmethod_df.crossJoin(paymentstatus_df)

# Add an incremental surrogate key
final_df = combined_df.withColumn("s_payment", monotonically_increasing_id())

# Select only the relevant columns (SurrogateKey, PaymentMethodID, PaymentStatusID)
final_df = final_df.select("s_payment", "payment_method_id", "method_name", "payment_status_id", "status_name")

# Show the resulting DataFrame
final_df.show(30)

+---------+-----------------+-----------+-----------------+-----------+
|s_payment|payment_method_id|method_name|payment_status_id|status_name|
+---------+-----------------+-----------+-----------------+-----------+
|        0|                4| Debit Card|             NULL| ----------|
|        1|                4| Debit Card|                1|  Completed|
|        2|                4| Debit Card|                4|   Refunded|
|        3|                4| Debit Card|                2|     Failed|
|        4|                4| Debit Card|                3|    Pending|
|        5|                3|Credit Card|             NULL| ----------|
|        6|                3|Credit Card|                1|  Completed|
|        7|                3|Credit Card|                4|   Refunded|
|        8|                3|Credit Card|                2|     Failed|
|        9|                3|Credit Card|                3|    Pending|
|       10|             NULL| ----------|             NULL| ----

In [28]:
payment_df.show()

+----------+-----------------+-----------------+-------------------+--------------------+--------------------+
|payment_id|payment_method_id|payment_status_id|    processing_date|record_modified_date|         record_hash|
+----------+-----------------+-----------------+-------------------+--------------------+--------------------+
|         3|                3|                1|2024-10-13 11:00:14|2024-10-08 09:47:...|1a95dbbced60215bb...|
|         4|                4|                3|2024-10-13 11:00:14|2024-10-08 09:47:...|99d4eeb3dd24aad8d...|
|         5|                5|                2|2024-10-13 11:00:14|2024-10-08 09:47:...|f9329e0213a26e028...|
|         2|                2|                2|2024-10-13 11:00:14|2024-10-08 09:47:...|dafb1a04ed4cadc83...|
|         3|                3|                3|2024-10-13 11:00:14|2024-10-08 09:47:...|28c9404ec4f0649b5...|
|      NULL|             NULL|             NULL|2024-10-13 11:00:14|2024-10-08 09:47:...|e3b0c44298fc1c149...|
|

In [34]:
# Perform the join operation
dim_payment = payment_df.alias("p").join(
    combined_df.alias("c"),
    (col("p.payment_method_id") == col("c.payment_method_id")) &
    (col("p.payment_status_id") == col("c.payment_status_id")),
    "inner"  # Use "inner" for only matching records
).select(
    col("p.payment_id").alias("s_payment"),  # Select from payment_df
    col("c.method_name").alias("method"),  # Select from combined_df
    col("c.status_name").alias("status")  # Select from combined_df
)

# Show the resulting DataFrame
dim_payment.show(50)

+---------+-----------+---------+
|s_payment|     method|   status|
+---------+-----------+---------+
|        4| Debit Card|Completed|
|        4| Debit Card| Refunded|
|        4| Debit Card|   Failed|
|        4| Debit Card|  Pending|
|        3|Credit Card|Completed|
|        3|Credit Card| Refunded|
|        3|Credit Card|   Failed|
|        3|Credit Card|  Pending|
|        2|       Cash|Completed|
|        2|       Cash| Refunded|
|        2|       Cash|   Failed|
|        2|       Cash|  Pending|
|        5| Google Pay|Completed|
|        5| Google Pay| Refunded|
|        5| Google Pay|   Failed|
|        5| Google Pay|  Pending|
|        1|  Apple Pay|Completed|
|        1|  Apple Pay| Refunded|
|        1|  Apple Pay|   Failed|
|        1|  Apple Pay|  Pending|
|        6|     PayPal|Completed|
|        6|     PayPal| Refunded|
|        6|     PayPal|   Failed|
|        6|     PayPal|  Pending|
+---------+-----------+---------+



In [38]:
dim_location = location_df.select("location_id", "latitude", "longitude")
dim_location.show()

+-----------+-----------+------------+
|location_id|   latitude|   longitude|
+-----------+-----------+------------+
|    1454452|40.75437164|-73.97747803|
|    1709866|40.76332855|-73.98121643|
|    1763732|40.80134964|-73.95018005|
|    1029047|40.64883804|-73.78248596|
|    1174888|40.74873352| -73.9761734|
|     988677|40.72956085|-74.00376892|
|    1253988|40.71712875|-73.99546814|
|    1723771|40.77020264|-73.99108887|
|    1234159|40.75824356|-73.97527313|
|     831287|40.73840332|-74.03143311|
|    1123221|40.75576019|-73.97451019|
|     916300|40.72474289|-73.99191284|
|     806681|40.71231461| -73.9659729|
|    1151847|40.73068619|-73.99085999|
|    1734518| 40.7698822|-73.95088959|
|    1226143|40.75824738|-74.00082397|
|    1754744|40.77654266|-73.98943329|
|    1733555|40.78443146|-73.97750092|
|    1398635|40.76130676| -73.9654007|
|    1514648|  40.771595|-73.95709229|
+-----------+-----------+------------+
only showing top 20 rows



In [41]:
request_df.show(5)
trip_df.show(5)

+----------+------------+------------------+-------------------+-------------------+-------------------+-------------------+--------------------+--------------------+
|request_id|passenger_id|pickup_location_id|dropoff_location_id|       request_time|        accept_time|    processing_date|record_modified_date|         record_hash|
+----------+------------+------------------+-------------------+-------------------+-------------------+-------------------+--------------------+--------------------+
|    447450|      224189|            376530|             338646|2015-01-04 15:28:00|2015-01-04 15:30:00|2024-10-13 11:00:19|2024-10-08 09:47:...|02018d1c2a52d91dd...|
|    433893|      217445|            562731|             567647|2015-01-29 12:34:00|2015-01-29 12:44:00|2024-10-13 11:00:19|2024-10-08 09:47:...|eef0dbb103d3fbc24...|
|    178990|       88035|            544948|             513195|2015-01-30 17:50:00|2015-01-30 17:51:00|2024-10-13 11:00:19|2024-10-08 09:47:...|536eeb49f7dca596d...

In [52]:
# Join the DataFrames
fact_request = request_df.join(trip_df, request_df["request_id"] == trip_df["request_id"], "left") \
    .join(dim_payment, trip_df["payment_id"] == dim_payment["s_payment"], "left") \
    .select(
        trip_df["driver_id"],
        request_df["passenger_id"],
        request_df["pickup_location_id"],
        request_df["dropoff_location_id"],
        date_format(request_df["request_time"], 'yyyyMMdd').alias("request_datekey"),  # Renamed and formatted
        date_format(request_df["accept_time"], 'yyyyMMdd').alias("accept_datekey"),    # Renamed and formatted
        date_format(trip_df["trip_start_time"], 'yyyyMMdd').alias("trip_start_datekey"),   # Renamed
        date_format(trip_df["trip_end_time"], 'yyyyMMdd').alias("trip_end_datekey"),       # Renamed
        dim_payment["s_payment"],
        trip_df["trip_distance"],
        trip_df["base_fare"],
        trip_df["extra_fare"],
        trip_df["mta_tax"],
        trip_df["tip_amount"],
        trip_df["tolls_amount"],
        trip_df["improvement_surcharge"]
    )


# Show the resulting DataFrame
fact_request.show(10)

+---------+------------+------------------+-------------------+---------------+--------------+------------------+----------------+---------+-------------+---------+----------+-------+----------+------------+---------------------+
|driver_id|passenger_id|pickup_location_id|dropoff_location_id|request_datekey|accept_datekey|trip_start_datekey|trip_end_datekey|s_payment|trip_distance|base_fare|extra_fare|mta_tax|tip_amount|tolls_amount|improvement_surcharge|
+---------+------------+------------------+-------------------+---------------+--------------+------------------+----------------+---------+-------------+---------+----------+-------+----------+------------+---------------------+
|   951731|         801|            384446|             313498|       20150102|      20150102|          20150102|        20150102|        6|          0.5|      4.0|       1.0|    0.5|       0.0|         0.0|                  0.0|
|   951731|         801|            384446|             313498|       20150102| 

In [1]:
dataframes = [dim_calendar, dim_location, dim_payment, dim_user, fact_request]
# Names of the original DataFrames
dataframe_names = ["dim_calendar", "dim_location", "dim_payment", "dim_user", "fact_request"]

NameError: name 'dim_calendar' is not defined

In [None]:
processing_date = date_trunc('second', current_timestamp())

for i, df in enumerate(dataframes):
    # Apply column name conversion
    df = df.withColumn("processing_date", processing_date)

    # Reassign the modified DataFrame back to the original global variable
    globals()[dataframe_names[i]] = df
    
    # Show the first 5 rows of the updated DataFrame
trip_df.show()

In [None]:

# List of DataFrames, locations, and hash columns with an attribute for slowly changing dimension (SCD)
tables_info = [
    {"df": dim_user, "location": f"{gold_location}/DIM_USER", "hash_columns": ["user_id", "full_name"], "is_scd": True},
    {"df": dim_calendar, "location": f"{gold_location}/DIM_CALENDARr", "hash_columns": ["date_key"], "is_scd": False},
    {"df": dim_location, "location": f"{gold_location}/DIM_LOCATION", "hash_columns": ["location_id"], "is_scd": False},
    {"df": dim_payment, "location": f"{gold_location}/DIM_PAYMENT", "hash_columns": ["s_payment"], "is_scd": False},
    {"df": fact_request, "location": f"{gold_location}/FCT_REQUEST", "hash_columns": ["driver_id", "passenger_id", "request_datekey"], "is_scd": False}
]

def deduplicate_source_df(source_df: DataFrame, hash_columns: list) -> DataFrame:
    """
    Deduplicates the source DataFrame based on the hash columns.
    
    Args:
        source_df (DataFrame): The source DataFrame to be deduplicated.
        hash_columns (list): List of columns to be used for deduplication.
        
    Returns:
        DataFrame: The deduplicated DataFrame.
    """
    return source_df.dropDuplicates(hash_columns)

# Function to generate a hash column for specified columns in a DataFrame
def generate_hash_column(df: DataFrame, columns: list, hash_column_name: str = "record_hash") -> DataFrame:
    """
    Generates a hash column for specified columns in the DataFrame using SHA-256.
    """
    return df.withColumn(hash_column_name, sha2(concat_ws("||", *columns), 256))

# Function to build merge condition using hash
def build_merge_condition_with_hash(target_alias: str, source_alias: str, hash_column: str) -> str:
    """
    Builds a merge condition string using the hash column.
    """
    return f"{target_alias}.{hash_column} = {source_alias}.{hash_column}"

# Function to merge Delta tables based on SCD flag
def merge_delta_table(source_df: DataFrame, delta_location: str, hash_columns: list, is_scd: bool):
    """
    Performs merge for slowly changing dimension (SCD) tables. If is_scd is False, data is simply overwritten.
    """
    # Generate hash column for the source DataFrame
    source_df_hashed = generate_hash_column(source_df, hash_columns)
    
    # If the table is an SCD, perform a merge operation
    if is_scd:
        try:
            # Check if the Delta table exists at the specified location
            if DeltaTable.isDeltaTable(spark, delta_location):
                print(f"Delta table found at {delta_location}. Proceeding with merge...")
                delta_table = DeltaTable.forPath(spark, delta_location)
                
                # Alias for target and source
                target_alias = "target"
                source_alias = "src"
                
                # Generate hash column for the target Delta table
                delta_table_df = spark.read.format("delta").load(delta_location)
                delta_table_hashed = generate_hash_column(delta_table_df, hash_columns)
                
                # Build the merge condition using the hash column
                merge_condition = build_merge_condition_with_hash(target_alias, source_alias, "record_hash")
                
                # Perform the merge operation
                delta_table.alias(target_alias).merge(
                    source=source_df_hashed.alias(source_alias),
                    condition=merge_condition
                ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
                
                print(f"Merge completed for {delta_location}.")
            else:
                # If Delta table doesn't exist, write the DataFrame to that location
                print(f"No Delta table found at {delta_location}. Writing new DataFrame...")
                source_df_hashed.write.mode("overwrite").format("delta").save(delta_location)
                print(f"DataFrame written to {delta_location}.")
        except Exception as e:
            print(f"Error processing table at {delta_location}: {e}")
    else:
        # For non-SCD tables, just overwrite the data
        print(f"{delta_location} is not an SCD table. Overwriting...")
        source_df_hashed.write.mode("overwrite").format("delta").save(delta_location)
        print(f"DataFrame overwritten at {delta_location}.")

In [None]:
# Loop through each table and perform the operations
for table_info in tables_info:
    # Print the schema of the source DataFrame
    print("Schema of source DataFrame:")
    table_info["df"].printSchema()  # Ensure we are printing the schema of the specific DataFrame

    # Deduplicate the source DataFrame based on hash columns
    df = deduplicate_source_df(table_info["df"], table_info["hash_columns"])
    
    # Prepare the Delta table location and SCD flag
    delta_location = table_info["location"]
    hash_columns = table_info["hash_columns"]
    is_scd = table_info["is_scd"]  # Check if it's a slowly changing dimension

    # Perform merge or overwrite based on is_scd flag
    merge_delta_table(df, delta_location, hash_columns, is_scd)

In [None]:
spark.read.format("delta").load(f"{gold_location}/user").orderBy("full_name").show()

In [None]:
# After your write_to_delta function
for table in dataframes:
    
    print(f"Creating external table: {table}")
    # Creating schema and external table in Spark SQL
    spark.sql("CREATE SCHEMA IF NOT EXISTS gold")
    # Set the current database to 'gold'
    spark.sql("USE gold")
    delta_table_path = f"{gold_location}\\{table}"
    spark.sql(f"CREATE EXTERNAL TABLE IF NOT EXISTS {table} USING DELTA LOCATION r'{delta_table_path}'")
    #spark.sql(f"SELECT * FROM {table_name}").show()

In [None]:
from delta.tables import DeltaTable

# Dictionary to store the DeltaTable objects
delta_tables = {}

for table in dataframes:
    # Define the table location
    table_location = f"{gold_location}/{table}"
    
    # Store the DeltaTable object in the dictionary using the file name as the key
    delta_tables[table] = DeltaTable.forPath(spark, table_location)  # Load table from path

    # Check if the table has been vacuumed in the last 30 days
    if delta_tables[table].history(30).filter("operation = 'VACUUM START'").count() == 0:
        # Optimize the table for better query performance
        delta_tables[table].optimize()
        # Perform vacuum operation (default is to keep data for 7 days)
        delta_tables[table].vacuum()


In [63]:
# Join the DataFrames with aliases for clarity
"""
fact_request_with_user_payment = fact_request.alias("fr") \
    .join(dim_user.alias("du"), col("fr.driver_id") == col("du.user_id"), "left") \
    .join(dim_user.alias("pu"), col("fr.passenger_id") == col("pu.user_id"), "left") \
    .join(dim_payment.alias("dp"), col("fr.s_payment") == col("dp.s_payment"), "left") \
    .select(
        col("fr.driver_id"),
        col("fr.passenger_id"),
        col("fr.pickup_location_id"),
        col("fr.dropoff_location_id"),
        col("fr.request_datekey"),
        col("fr.accept_datekey"),
        col("fr.trip_start_datekey"),
        col("fr.trip_end_datekey"),
        col("fr.s_payment"),
        col("fr.trip_distance"),
        col("fr.base_fare"),
        col("fr.extra_fare"),
        col("fr.mta_tax"),
        col("fr.tip_amount"),
        col("fr.tolls_amount"),
        col("fr.improvement_surcharge"),
        col("du.full_name").alias("driver_name"),
        col("du.email").alias("driver_email"),
        col("du.phone_number").alias("driver_phone"),
        col("du.vehicle_id").alias("driver_vehicle_id"),
        col("du.vehicle_make").alias("driver_vehicle_make"),
        col("du.vehicle_model").alias("driver_vehicle_model"),
        col("du.vehicle_year").alias("driver_vehicle_year"),
        col("du.vehicle_color").alias("driver_vehicle_color"),
        col("du.vehicle_license_plate").alias("driver_vehicle_license_plate"),
        col("pu.full_name").alias("passenger_name"),
        col("pu.email").alias("passenger_email"),
        col("pu.phone_number").alias("passenger_phone"),
        col("dp.method").alias("payment_method"),
        col("dp.status").alias("payment_status")
    )

# Show the resulting DataFrame
fact_request_with_user_payment.orderBy("driver_name").show(10)
"""


+---------+------------+------------------+-------------------+---------------+--------------+------------------+----------------+---------+-------------+---------+----------+-------+----------+------------+---------------------+------------+--------------------+--------------+-----------------+-------------------+--------------------+-------------------+--------------------+----------------------------+--------------+--------------------+---------------+--------------+--------------+
|driver_id|passenger_id|pickup_location_id|dropoff_location_id|request_datekey|accept_datekey|trip_start_datekey|trip_end_datekey|s_payment|trip_distance|base_fare|extra_fare|mta_tax|tip_amount|tolls_amount|improvement_surcharge| driver_name|        driver_email|  driver_phone|driver_vehicle_id|driver_vehicle_make|driver_vehicle_model|driver_vehicle_year|driver_vehicle_color|driver_vehicle_license_plate|passenger_name|     passenger_email|passenger_phone|payment_method|payment_status|
+---------+---------