# Bronze Stage ETL Process Using PySpark and Databricks

Step 1: Load Data into DataFrames
Initialize a Spark session.
Define a schema for the CSV data.
Read data into a DataFrame with the specified schema.

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

In [0]:
# Initialize SparkSession with the necessary option to handle S3
spark = SparkSession.builder.appName("AWS S3 Integration and Data Processing").getOrCreate()

In [0]:
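# Set AWS S3 access keys without hardcoding them in the notebook.
# Assumes the standard AWS environment variables are set on the cluster;
# on Databricks, dbutils.secrets.get(scope=..., key=...) is an alternative.
import os

hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", os.environ["AWS_ACCESS_KEY_ID"])
hadoop_conf.set("fs.s3a.secret.key", os.environ["AWS_SECRET_ACCESS_KEY"])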

In [0]:
# Define S3 bucket and file paths
bucket_name = "iiht-ntt-24"
customer_file_path = f"s3a://{bucket_name}/data.csv/Customer.csv"

Define the schema for the Customer CSV based on the initial definition provided

In [0]:
customer_schema = StructType([
    StructField("CustomerID", IntegerType(), True),
    StructField("CustomerName", StringType(), True),
    StructField("BillToCustomerID", StringType(), True),
    StructField("CustomerCategoryID", StringType(), True),
    StructField("BuyingGroupID", StringType(), True),
    StructField("PrimaryContactPersonID", StringType(), True),
    StructField("PostalCityID", StringType(), True),
    StructField("ValidFrom", DateType(), True),
    StructField("ValidTo", DateType(), True),
    StructField("LineageKey", IntegerType(), True)
])

Step 4: Data Preparation
Perform any necessary data cleaning or transformations required before joining the tables. Map raw data columns to the format required for downstream processing.


In [0]:
# Read the Customer CSV file into a DataFrame using the defined schema
customer_df = spark.read.format("csv").schema(customer_schema).load("/FileStore/tables/customer_data.csv")
customer_df.show()

+----------+------------+----------------+------------------+-------------+----------------------+------------+----------+----------+----------+
|CustomerID|CustomerName|BillToCustomerID|CustomerCategoryID|BuyingGroupID|PrimaryContactPersonID|PostalCityID| ValidFrom|   ValidTo|LineageKey|
+----------+------------+----------------+------------------+-------------+----------------------+------------+----------+----------+----------+
|      null|CustomerName|BillToCustomerID|CustomerCategoryID|BuyingGroupID|  PrimaryContactPer...|PostalCityID|      null|      null|      null|
|         1|  Alice Corp|              10|               100|         1000|                   101|        9001|2023-01-01|2024-01-01|         9|
|         2|     Bob LLC|              20|               200|         2000|                   202|        9002|2023-02-01|2024-02-01|         9|
|         3| Charlie Inc|              30|               300|         3000|                   303|        9003|2023-03-01|2024-03-
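
The first row of the output above repeats the column names, with nulls in the integer and date columns. That pattern is what Spark's CSV reader produces when the header line is parsed as data under an explicit schema. Below is a minimal sketch of a read that skips the header, assuming the file's first line is a header. `csv_read_options` and `read_csv_with_header` are hypothetical helper names, not part of the original notebook.

```python
def csv_read_options(header=True, sep=","):
    """Build CSV reader options as strings, the form Spark stores them in."""
    return {"header": str(header).lower(), "sep": sep}


def read_csv_with_header(spark, path, schema):
    """Read a CSV with an explicit schema, treating the first line as a header.

    `spark` is an existing SparkSession and `schema` a StructType such as
    customer_schema; both are supplied by the caller.
    """
    return (
        spark.read.format("csv")
        .options(**csv_read_options(header=True))
        .schema(schema)
        .load(path)
    )


# Example call (assumes the path and schema used earlier in this notebook):
# customer_df = read_csv_with_header(spark, "/FileStore/tables/customer_data.csv", customer_schema)
```

With the header row skipped, the later filter on non-null CustomerKey would no longer be needed just to drop it.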

In the data preparation step, transform the DataFrame to match the target schema, renaming columns where necessary.
The LineageKey column is set to a static value to help with data lineage and tracking.


In [0]:

# Transform DataFrame
customer_df_transformed = customer_df.selectExpr(
    "CustomerID as CustomerKey",
    "CustomerID as WWICustomerID",
    "CustomerName as Customer",
    "BillToCustomerID as BillToCustomer",
    "CustomerCategoryID as Category",
    "BuyingGroupID as BuyingGroup",
    "PrimaryContactPersonID as PrimaryContact",
    "PostalCityID as PostalCode",
    "ValidFrom", "ValidTo",
    "LineageKey"
).withColumn("LineageKey", lit(9))

customer_df.show(10)

+----------+------------+----------------+------------------+-------------+----------------------+------------+----------+----------+----------+
|CustomerID|CustomerName|BillToCustomerID|CustomerCategoryID|BuyingGroupID|PrimaryContactPersonID|PostalCityID| ValidFrom|   ValidTo|LineageKey|
+----------+------------+----------------+------------------+-------------+----------------------+------------+----------+----------+----------+
|      null|CustomerName|BillToCustomerID|CustomerCategoryID|BuyingGroupID|  PrimaryContactPer...|PostalCityID|      null|      null|      null|
|         1|  Alice Corp|              10|               100|         1000|                   101|        9001|2023-01-01|2024-01-01|         9|
|         2|     Bob LLC|              20|               200|         2000|                   202|        9002|2023-02-01|2024-02-01|         9|
|         3| Charlie Inc|              30|               300|         3000|                   303|        9003|2023-03-01|2024-03-

Step 5: Save or Export Results
Persist the prepared data into a table for use in subsequent ETL stages.
Save the prepared DataFrame to a table in the Spark SQL catalog named SilverCustomer1, the name the Silver stage reads from. The mode("overwrite") option ensures any existing data is replaced with the latest DataFrame.

In [0]:
table_name = "SilverCustomer1"

try:
    # Check if the table exists in the catalog. tableExists is used because
    # listTables() typically returns lowercase names, which "SilverCustomer1" would not match.
    if spark.catalog.tableExists(table_name):
        # Drop the table and delete its directory
        spark.sql(f"DROP TABLE IF EXISTS {table_name}")
        spark._jvm.org.apache.hadoop.fs.FileSystem.get(
            spark._jsc.hadoopConfiguration()
        ).delete(spark._jvm.org.apache.hadoop.fs.Path("dbfs:/user/hive/warehouse/silvercustomer1"), True)

    # Write the transformed DataFrame to the table
    customer_df_transformed.write.format("parquet").mode("overwrite").saveAsTable(table_name)
    print(f"Table {table_name} created/overwritten successfully.")

except Exception as e:
    print(f"Failed to create or replace table {table_name} due to: {e}")


Table SilverCustomer1 created/overwritten successfully.


In [0]:
customer_df_transformed.show()

+-----------+-------------+------------+----------------+------------------+-------------+--------------------+------------+----------+----------+----------+
|CustomerKey|WWICustomerID|    Customer|  BillToCustomer|          Category|  BuyingGroup|      PrimaryContact|  PostalCode| ValidFrom|   ValidTo|LineageKey|
+-----------+-------------+------------+----------------+------------------+-------------+--------------------+------------+----------+----------+----------+
|       null|         null|CustomerName|BillToCustomerID|CustomerCategoryID|BuyingGroupID|PrimaryContactPer...|PostalCityID|      null|      null|         9|
|          1|            1|  Alice Corp|              10|               100|         1000|                 101|        9001|2023-01-01|2024-01-01|         9|
|          2|            2|     Bob LLC|              20|               200|         2000|                 202|        9002|2023-02-01|2024-02-01|         9|
|          3|            3| Charlie Inc|            

# Silver Stage ETL Process Using PySpark and Databricks



The Silver stage in an ETL process represents the refinement of extracted data. Data from the Bronze layer is cleansed, transformed, and enriched to create a reliable dataset for analysis and reporting.


In [0]:
#Step 1: Load Data into DataFrames
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# Initialize SparkSession with necessary options to handle S3
spark = SparkSession.builder.appName("AWS S3 Integration and Data Processing").getOrCreate()


In [0]:
# Set AWS S3 access keys from environment variables rather than hardcoding them
# (Databricks secrets via dbutils.secrets.get are an alternative)
import os

spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", os.environ["AWS_ACCESS_KEY_ID"])
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", os.environ["AWS_SECRET_ACCESS_KEY"])

In [0]:
# Define S3 bucket and file paths
bucket_name = "iiht-ntt-24"
sale_order_file_path = f"s3a://{bucket_name}/data.csv/SaleOrder.csv"

In [0]:
# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","


Start by setting up the SparkSession, which is essential for any data processing in Spark. We then define the location and format of our source data (the sales data CSV file), load it into a DataFrame with schema inference enabled, and display it for a quick check.

In [0]:
# CSV options
infer_schema = True  # Changed to boolean
first_row_is_header = True  # Changed to boolean
delimiter = ","  # This remains a string

# Try block for processing and handling the Sales data
try:
    # Load the sales orders CSV file from the Databricks FileStore into a DataFrame
    dfSales = spark.read.format("csv") \
        .option("inferSchema", infer_schema) \
        .option("header", first_row_is_header) \
        .option("sep", delimiter) \
        .load("/FileStore/tables/sales_orders.csv")

    # Display the DataFrame to verify contents
    dfSales.show()

except Exception as e:
    print(f"Failed to load data from CSV file due to: {e}")


+-------+----------+---------+----------+--------+---------+--------+-----------+----------------+----------+
|OrderID|CustomerID|ProductID| OrderDate|Quantity|UnitPrice|Discount|TotalAmount| ShippingAddress|    Status|
+-------+----------+---------+----------+--------+---------+--------+-----------+----------------+----------+
|   1001|    2001.0|    101.0|2024-01-01|     5.0|     20.0|     0.1|       90.0|123 Street, City|   Shipped|
|   1002|    2002.0|    102.0|2024-01-05|     3.0|     15.0|    null|       45.0|456 Avenue, City|Processing|
|   1003|    2003.0|    103.0|2024-01-10|    null|     30.0|    0.05|       85.5|  789 Blvd, City|  Canceled|
|   1004|    2004.0|    104.0|2024-01-12|     7.0|     25.0|    null|      175.0|123 Street, City|   Shipped|
|   1005|    2005.0|     null|2024-01-15|     2.0|     40.0|     0.1|       null|456 Avenue, City|Processing|
|   1006|      null|    106.0|2024-01-20|     4.0|     22.5|     0.2|       72.0|  789 Blvd, City|   Shipped|
|   1007| 
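
Schema inference requires Spark to scan the file before loading it. Where that cost matters, an explicit schema can be passed instead, since `DataFrameReader.schema` also accepts a DDL-formatted string. The sketch below builds such a string for the columns visible in the output above. The column types are assumptions read off the displayed values (for example, `CustomerID` appears as `2001.0`, so it is declared `DOUBLE`), and `SALES_COLUMNS` and `to_ddl` are hypothetical names.

```python
# Column names come from the displayed output; the types are assumptions.
SALES_COLUMNS = [
    ("OrderID", "INT"),
    ("CustomerID", "DOUBLE"),
    ("ProductID", "DOUBLE"),
    ("OrderDate", "DATE"),
    ("Quantity", "DOUBLE"),
    ("UnitPrice", "DOUBLE"),
    ("Discount", "DOUBLE"),
    ("TotalAmount", "DOUBLE"),
    ("ShippingAddress", "STRING"),
    ("Status", "STRING"),
]


def to_ddl(columns):
    """Join (name, type) pairs into a DDL schema string for spark.read.schema()."""
    return ", ".join(f"{name} {dtype}" for name, dtype in columns)


sales_schema_ddl = to_ddl(SALES_COLUMNS)

# Example call (assumes an existing SparkSession named spark):
# dfSales = (spark.read.format("csv")
#            .option("header", "true")
#            .schema(sales_schema_ddl)
#            .load("/FileStore/tables/sales_orders.csv"))
```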

In [0]:
from pyspark.sql.functions import col
from pyspark.sql import SparkSession

# CSV options
infer_schema = "true"  # Set as string for options
first_row_is_header = "true"  # Set as string for options
delimiter = ","  # This remains a string

# Step 1: Load CSV file into a DataFrame
try:
    # Load the SaleOrder.csv file from Databricks FileStore into a DataFrame
    dfSales = spark.read.format("csv") \
        .option("inferSchema", infer_schema) \
        .option("header", first_row_is_header) \
        .option("sep", delimiter) \
        .load("/FileStore/tables/sales_orders.csv")

    # Display the DataFrame to verify contents
    dfSales.show()

except Exception as e:
    print(f"Failed to load data from CSV file due to: {e}")

# Step 2: Create or Replace SalesTable in the Database
try:
    # Handle the SalesTable creation or replacement
    table_name = "SalesTable"
    warehouse_location = spark.conf.get("spark.sql.warehouse.dir", "dbfs:/FileStore/tables")  
    table_path = f"{warehouse_location}/{table_name.lower()}"

    # Check if the table path exists and remove it if so
    try:
        if any(dbutils.fs.ls(table_path)):  # Checks if path contains files
            dbutils.fs.rm(table_path, recurse=True)
    except Exception:
        print(f"Path {table_path} does not exist or couldn't be listed. Proceeding to create table.")

    # Drop the table if it exists
    if spark.catalog.tableExists(table_name):
        spark.sql(f"DROP TABLE IF EXISTS {table_name}")

    # Write the DataFrame to a parquet table
    dfSales.write.format("parquet").mode("overwrite").option("path", table_path).saveAsTable(table_name)
    print(f"Table {table_name} created successfully.")

except Exception as e:
    print(f"An error occurred while processing Sales data: {e}")

# Step 3: Delta Table Operations
try:
    # Load the permanent table created previously
    permanent_table_df = spark.table("SilverCustomer1")

    # Filter rows where CustomerKey is not null
    silver_customers_df = permanent_table_df.filter(col("CustomerKey").isNotNull())

    # Display the filtered DataFrame
    silver_customers_df.show()

except Exception as e:
    print(f"An error occurred during Delta table operations: {e}")


+-------+----------+---------+----------+--------+---------+--------+-----------+----------------+----------+
|OrderID|CustomerID|ProductID| OrderDate|Quantity|UnitPrice|Discount|TotalAmount| ShippingAddress|    Status|
+-------+----------+---------+----------+--------+---------+--------+-----------+----------------+----------+
|   1001|    2001.0|    101.0|2024-01-01|     5.0|     20.0|     0.1|       90.0|123 Street, City|   Shipped|
|   1002|    2002.0|    102.0|2024-01-05|     3.0|     15.0|    null|       45.0|456 Avenue, City|Processing|
|   1003|    2003.0|    103.0|2024-01-10|    null|     30.0|    0.05|       85.5|  789 Blvd, City|  Canceled|
|   1004|    2004.0|    104.0|2024-01-12|     7.0|     25.0|    null|      175.0|123 Street, City|   Shipped|
|   1005|    2005.0|     null|2024-01-15|     2.0|     40.0|     0.1|       null|456 Avenue, City|Processing|
|   1006|      null|    106.0|2024-01-20|     4.0|     22.5|     0.2|       72.0|  789 Blvd, City|   Shipped|
|   1007| 

Retrieve and utilize existing permanent tables in the data processing.

We load the existing "SilverCustomer1" table from the data warehouse and filter it to exclude any records with a null CustomerKey. The temporary view for subsequent SQL operations is created in the following step.

In [0]:
# Step 2: Join Tables
try:
    # Handle the SalesTable creation or replacement
    table_name = "SalesTable"
    warehouse_location = spark.conf.get("spark.sql.warehouse.dir", "dbfs:/FileStore/tables")  
    table_path = f"{warehouse_location}/{table_name.lower()}"

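    # Check if the directory exists and clear it. dbutils.fs.ls raises if the
    # path is missing, so guard it to let the write proceed on a first run.
    try:
        if dbutils.fs.ls(table_path):
            dbutils.fs.rm(table_path, recurse=True)
    except Exception:
        print(f"Path {table_path} does not exist or couldn't be listed. Proceeding to create table.")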
    
    if spark.catalog.tableExists(table_name):
        spark.sql(f"DROP TABLE IF EXISTS {table_name}")

    dfSales.write.format("parquet").mode("overwrite").option("path", table_path).saveAsTable(table_name)
    print(f"Table {table_name} created successfully.")

except Exception as e:
    print(f"An error occurred while processing Sales data: {e}")

# Try block for handling Delta table operations
try:
    # Load the permanent table created previously
    permanent_table_df = spark.table("SilverCustomer1")

    # Filter rows where CustomerKey is not null
    silver_customers_df = permanent_table_df.filter(col("CustomerKey").isNotNull())

    # Display the filtered DataFrame
    silver_customers_df.show()

except Exception as e:
    print(f"An error occurred during Delta table operations: {e}")


Table SalesTable created successfully.
+-----------+-------------+-----------+--------------+--------+-----------+--------------+----------+----------+----------+----------+
|CustomerKey|WWICustomerID|   Customer|BillToCustomer|Category|BuyingGroup|PrimaryContact|PostalCode| ValidFrom|   ValidTo|LineageKey|
+-----------+-------------+-----------+--------------+--------+-----------+--------------+----------+----------+----------+----------+
|          1|            1| Alice Corp|            10|     100|       1000|           101|      9001|2023-01-01|2024-01-01|         9|
|          2|            2|    Bob LLC|            20|     200|       2000|           202|      9002|2023-02-01|2024-02-01|         9|
|          3|            3|Charlie Inc|            30|     300|       3000|           303|      9003|2023-03-01|2024-03-01|         9|
|          4|            4|  Delta Ltd|            40|     400|       4000|           404|      9004|2023-04-01|2024-04-01|         9|
+-----------+---

Here we take the filtered customer data and store it as a Delta table for fast, reliable, and versioned access to the dataset. Additionally, the raw sales data is stored in a permanent table named "SalesTable" for future analysis and reporting.


In [0]:
# Step 3: Save or Export Results
try:
    # Create a temporary view or register DataFrame as a temporary table
    silver_customers_df.createOrReplaceTempView("temp_silver_customers_table")

    # Define the Delta table path
    delta_table_location = "/FileStore/tables/temp_silver_customers_table_delta"
    dbutils.fs.rm(delta_table_location, True)  # Ensure the directory is empty

    # Save the DataFrame as a Delta table
    silver_customers_df.write.format("delta").mode("overwrite").save(delta_table_location)
    print(f"Delta table saved at {delta_table_location}")

    # Access and display the saved Delta table
    delta_table_df = spark.read.format("delta").load(delta_table_location)
    delta_table_df.show()

except Exception as e:
    print(f"An error occurred while saving the Delta table: {e}")


Delta table saved at /FileStore/tables/temp_silver_customers_table_delta
+-----------+-------------+-----------+--------------+--------+-----------+--------------+----------+----------+----------+----------+
|CustomerKey|WWICustomerID|   Customer|BillToCustomer|Category|BuyingGroup|PrimaryContact|PostalCode| ValidFrom|   ValidTo|LineageKey|
+-----------+-------------+-----------+--------------+--------+-----------+--------------+----------+----------+----------+----------+
|          1|            1| Alice Corp|            10|     100|       1000|           101|      9001|2023-01-01|2024-01-01|         9|
|          2|            2|    Bob LLC|            20|     200|       2000|           202|      9002|2023-02-01|2024-02-01|         9|
|          3|            3|Charlie Inc|            30|     300|       3000|           303|      9003|2023-03-01|2024-03-01|         9|
|          4|            4|  Delta Ltd|            40|     400|       4000|           404|      9004|2023-04-01|2024-

# Gold Stage ETL Process Using PySpark and Databricks

Initialize a Spark session.
Define a complex schema for sales data.
Create an empty DataFrame with a predefined schema to store sales orders

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DecimalType, DateType
from pyspark.sql.functions import lit, udf
import uuid

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Create Gold Table in Databricks PySpark") \
    .getOrCreate()

# Define the schema without spaces in field names
schema = StructType([
    StructField("OrderKey", StringType(), nullable=False),
    StructField("CustomerKey", IntegerType(), nullable=True),
    StructField("CityKey", IntegerType(), nullable=True),
    StructField("StockItemKey", IntegerType(), nullable=True),
    StructField("OrderDateKey", DateType(), nullable=True),
    StructField("PickedDateKey", DateType(), nullable=True),
    StructField("SalespersonKey", IntegerType(), nullable=True),
    StructField("PickerKey", IntegerType(), nullable=True),
    StructField("WWIOrderID", IntegerType(), nullable=True),
    StructField("WWIBackorderID", IntegerType(), nullable=True),
    StructField("Description", StringType(), nullable=True),
    StructField("Package", StringType(), nullable=True),
    StructField("Quantity", IntegerType(), nullable=True),
    StructField("UnitPrice", DecimalType(18, 2), nullable=True),
    StructField("TaxRate", DecimalType(18, 3), nullable=True),
    StructField("TotalExcludingTax", DecimalType(18, 2), nullable=True),
    StructField("TaxAmount", DecimalType(18, 2), nullable=True),
    StructField("TotalIncludingTax", DecimalType(18, 2), nullable=True),
    StructField("LineageKey", IntegerType(), nullable=True)
])

# Example of creating a DataFrame with the defined schema
# You can create an empty DataFrame using this schema as follows:
empty_df = spark.createDataFrame([], schema)

# Show the DataFrame schema
empty_df.printSchema()


root
 |-- OrderKey: string (nullable = false)
 |-- CustomerKey: integer (nullable = true)
 |-- CityKey: integer (nullable = true)
 |-- StockItemKey: integer (nullable = true)
 |-- OrderDateKey: date (nullable = true)
 |-- PickedDateKey: date (nullable = true)
 |-- SalespersonKey: integer (nullable = true)
 |-- PickerKey: integer (nullable = true)
 |-- WWIOrderID: integer (nullable = true)
 |-- WWIBackorderID: integer (nullable = true)
 |-- Description: string (nullable = true)
 |-- Package: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- UnitPrice: decimal(18,2) (nullable = true)
 |-- TaxRate: decimal(18,3) (nullable = true)
 |-- TotalExcludingTax: decimal(18,2) (nullable = true)
 |-- TaxAmount: decimal(18,2) (nullable = true)
 |-- TotalIncludingTax: decimal(18,2) (nullable = true)
 |-- LineageKey: integer (nullable = true)



Step 2: Data Preparation
Objectives
Register the empty DataFrame as a temporary table for SQL operations.
Load and prepare data from Delta and permanent tables for integration.
Code and Actions


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
import uuid

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Join Sales and Customer Data") \
    .getOrCreate()

try:
    # Load customer_to_gold_df from Delta table
    customer_to_gold_df = spark.read.format("delta").load("/FileStore/tables/temp_silver_customers_table_delta")

    # Read data from the SalesTable
    df_from_SalesOrder = spark.table("SalesTable")

    # Register the DataFrames as temporary views
    df_from_SalesOrder.createOrReplaceTempView("SalesOrders")
    customer_to_gold_df.createOrReplaceTempView("CustomerToGold")

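    # Generate GUIDs for Order Key using a UDF. The UDF is marked
    # non-deterministic so Spark does not assume repeated calls return the same value.
    generate_uuid_udf = udf(lambda: str(uuid.uuid4()), StringType()).asNondeterministic()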
    df_from_SalesOrder = df_from_SalesOrder.withColumn("OrderKey", generate_uuid_udf())

    # Show the updated DataFrame with the new OrderKey
    df_from_SalesOrder.show()

except Exception as e:
    print(f"An error occurred: {e}")


+-------+----------+---------+----------+--------+---------+--------+-----------+----------------+----------+--------------------+
|OrderID|CustomerID|ProductID| OrderDate|Quantity|UnitPrice|Discount|TotalAmount| ShippingAddress|    Status|            OrderKey|
+-------+----------+---------+----------+--------+---------+--------+-----------+----------------+----------+--------------------+
|   1001|    2001.0|    101.0|2024-01-01|     5.0|     20.0|     0.1|       90.0|123 Street, City|   Shipped|819e90b5-ecd4-4f7...|
|   1002|    2002.0|    102.0|2024-01-05|     3.0|     15.0|    null|       45.0|456 Avenue, City|Processing|f4a9d633-e0c2-412...|
|   1003|    2003.0|    103.0|2024-01-10|    null|     30.0|    0.05|       85.5|  789 Blvd, City|  Canceled|0ff7789d-3a12-4f5...|
|   1004|    2004.0|    104.0|2024-01-12|     7.0|     25.0|    null|      175.0|123 Street, City|   Shipped|8340a63a-5cdd-40b...|
|   1005|    2005.0|     null|2024-01-15|     2.0|     40.0|     0.1|       null|45
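
Spark SQL also ships a built-in `uuid()` function, which can generate the key without a Python UDF. Below is a minimal sketch, assuming a DataFrame like `df_from_SalesOrder`. `add_order_key` is a hypothetical helper name.

```python
def add_order_key(df, key_column="OrderKey"):
    """Append a random UUID string column using Spark SQL's built-in uuid().

    `df` is any Spark DataFrame; existing columns are kept via "*".
    """
    return df.selectExpr("*", f"uuid() AS {key_column}")


# Example call, registering the view after the key exists so SQL can see it:
# df_from_SalesOrder = add_order_key(spark.table("SalesTable"))
# df_from_SalesOrder.createOrReplaceTempView("SalesOrders")
```

In the cell above, the `SalesOrders` view is registered before `OrderKey` is added, so SQL queries against that view do not see the new column.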

Step 3: Join Tables
Objectives
Use SQL to join sales and customer data.
Generate unique identifiers for each order.



In [0]:
from pyspark.sql import SparkSession

try:
    # Join the tables and select the necessary columns
    joined_df = spark.sql("""
        SELECT 
            g.OrderID AS OrderKey,  -- Assuming OrderID is used as a unique OrderKey
            c.CustomerKey, 
            NULL AS CityKey, 
            NULL AS StockItemKey, 
            NULL AS OrderDateKey, 
            NULL AS PickedDateKey, 
            NULL AS SalespersonKey, 
            NULL AS PickerKey, 
            NULL AS WWIOrderID, 
            NULL AS WWIBackorderID, 
            NULL AS Description, 
            NULL AS Package, 
            NULL AS Quantity, 
            NULL AS UnitPrice, 
            NULL AS TaxRate, 
            NULL AS TotalExcludingTax, 
            NULL AS TaxAmount, 
            NULL AS TotalIncludingTax, 
            NULL AS LineageKey
        FROM SalesOrders g
        LEFT JOIN CustomerToGold c ON g.CustomerID = c.WWICustomerID  -- Use correct fields for the join
    """)

    # Show the joined DataFrame to verify contents
    joined_df.show()

except Exception as e:
    print(f"An error occurred during the join operation: {e}")


+--------+-----------+-------+------------+------------+-------------+--------------+---------+----------+--------------+-----------+-------+--------+---------+-------+-----------------+---------+-----------------+----------+
|OrderKey|CustomerKey|CityKey|StockItemKey|OrderDateKey|PickedDateKey|SalespersonKey|PickerKey|WWIOrderID|WWIBackorderID|Description|Package|Quantity|UnitPrice|TaxRate|TotalExcludingTax|TaxAmount|TotalIncludingTax|LineageKey|
+--------+-----------+-------+------------+------------+-------------+--------------+---------+----------+--------------+-----------+-------+--------+---------+-------+-----------------+---------+-----------------+----------+
|    1001|       null|   null|        null|        null|         null|          null|     null|      null|          null|       null|   null|    null|     null|   null|             null|     null|             null|      null|
|    1002|       null|   null|        null|        null|         null|          null|     null| 
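
In the output above, `CustomerKey` is null for the displayed orders. A LEFT JOIN produces that when no right-side key matches: the displayed sales rows carry `CustomerID` values such as 2001.0, while the displayed customer rows hold IDs 1 through 4. The query also emits `NULL` for every measure column. The sketch below carries through a few sales columns and casts the join key explicitly. The column mappings (`OrderDate` to `OrderDateKey`, `OrderID` to `WWIOrderID`) are assumptions for illustration, and `GOLD_JOIN_SQL` and `run_gold_join` are hypothetical names.

```python
# Join query kept as a string so it can be inspected or reused.
# The mappings from sales columns to Gold columns are assumptions.
GOLD_JOIN_SQL = """
    SELECT
        CAST(g.OrderID AS STRING)           AS OrderKey,
        c.CustomerKey,
        CAST(g.OrderDate AS DATE)           AS OrderDateKey,
        CAST(g.OrderID AS INT)              AS WWIOrderID,
        CAST(g.Quantity AS INT)             AS Quantity,
        CAST(g.UnitPrice AS DECIMAL(18, 2)) AS UnitPrice
    FROM SalesOrders g
    LEFT JOIN CustomerToGold c
        ON CAST(g.CustomerID AS INT) = c.WWICustomerID
"""


def run_gold_join(spark):
    """Run the join against the SalesOrders and CustomerToGold temp views."""
    return spark.sql(GOLD_JOIN_SQL)
```

With the sample data shown, this join would still find no matches. The key values themselves would need to line up for `CustomerKey` to be populated.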

Step 4: Save or Export Results
Objectives
Union the joined data with the initially created empty DataFrame.
Persist the final transformed data back into the GoldFactOrder table.


In [0]:
try:
    # Check if GoldFactOrder_df already exists
    if 'GoldFactOrder_df' in locals():
        # Union the existing GoldFactOrder_df with joined_df to keep all rows
        final_df = GoldFactOrder_df.union(joined_df)
    else:
        # If GoldFactOrder_df doesn't exist, set it to joined_df
        final_df = joined_df

    # Replace the existing GoldFactOrder_df with the final DataFrame
    GoldFactOrder_df = final_df

    # Display the updated GoldFactOrder_df
    display(GoldFactOrder_df)  # Use this if in Databricks, else use GoldFactOrder_df.show()

except Exception as e:
    print(f"An error occurred during the Gold table creation process: {e}")


OrderKey,CustomerKey,CityKey,StockItemKey,OrderDateKey,PickedDateKey,SalespersonKey,PickerKey,WWIOrderID,WWIBackorderID,Description,Package,Quantity,UnitPrice,TaxRate,TotalExcludingTax,TaxAmount,TotalIncludingTax,LineageKey
1001,,,,,,,,,,,,,,,,,,
1002,,,,,,,,,,,,,,,,,,
1003,,,,,,,,,,,,,,,,,,
1004,,,,,,,,,,,,,,,,,,
1005,,,,,,,,,,,,,,,,,,
1006,,,,,,,,,,,,,,,,,,
1007,,,,,,,,,,,,,,,,,,
1008,,,,,,,,,,,,,,,,,,
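
Because the join selects untyped `NULL` literals, its columns do not carry the types declared in the Gold schema. One way to perform the union with the empty DataFrame described in the objectives is to cast each column to its target type first. Below is a minimal sketch, assuming `joined_df`, `empty_df`, and `schema` from the earlier cells. `conform_to_schema` is a hypothetical helper name.

```python
def conform_to_schema(df, schema):
    """Select the schema's columns in order, casting each to its declared type.

    `df` is a Spark DataFrame that already contains every column named in
    `schema` (a StructType).
    """
    return df.select(
        *[df[field.name].cast(field.dataType).alias(field.name) for field in schema.fields]
    )


# Example call (names come from the earlier cells in this notebook):
# GoldFactOrder_df = empty_df.unionByName(conform_to_schema(joined_df, schema))
```

`unionByName` matches columns by name rather than by position, which guards against a column-order mismatch between the two DataFrames.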


Highest Valued Customer 


In [0]:
from typing import Optional

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import coalesce, col, desc, lit
from pyspark.sql.functions import sum as spark_sum  # avoid shadowing Python's built-in sum
from pyspark.sql.utils import AnalysisException

# Create a SparkSession
spark = SparkSession.builder \
    .appName("High Value Customers Calculation") \
    .getOrCreate()

def calculate_high_value_customers(df: DataFrame) -> Optional[DataFrame]:
    try:
        # Replace null 'TotalIncludingTax' values with 0 before aggregating
        df = df.withColumn("TotalIncludingTax", coalesce(col("TotalIncludingTax"), lit(0)))

        # Sum the total sales per customer and order by the highest total
        high_value_customers_df = df.groupBy("CustomerKey") \
            .agg(spark_sum("TotalIncludingTax").alias("TotalSales")) \
            .orderBy(desc("TotalSales"))

        # Show the resulting DataFrame
        print("High_Value_Customers:")
        high_value_customers_df.show()
        return high_value_customers_df

    except AnalysisException as e:
        print(f"An error occurred: {e}")
        return None


Track Frequently Visiting Customers


In [0]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, countDistinct, desc
from pyspark.sql.utils import AnalysisException

# Initialize SparkSession (if not already initialized)
spark = SparkSession.builder \
    .appName("Frequent Visitors Calculation") \
    .getOrCreate()

def calculate_frequent_visitors(df: DataFrame) -> DataFrame:
    try:
        # Validate if necessary columns exist in the DataFrame to avoid runtime errors
        required_columns = {"CustomerKey", "OrderKey"}
        if not required_columns.issubset(df.columns):
            raise ValueError(f"DataFrame does not contain all required columns: {required_columns}")

        # Counting the distinct number of orders per customer and ordering the result
        frequent_visitors_df = df.groupBy("CustomerKey") \
            .agg(countDistinct("OrderKey").alias("NumberOfOrders")) \
            .orderBy(desc("NumberOfOrders"))

        return frequent_visitors_df

    except AnalysisException as e:
        print(f"An error occurred during DataFrame operations: {e}")
        return None
    except ValueError as e:
        print(e)
        return None

# Example Usage:
# Load data into GoldFactOrder_df (replace with your actual data source)
# GoldFactOrder_df = spark.read.format("parquet").load("/path/to/gold_fact_order_data")

# Calculate frequent visitors using the defined DataFrame
frequent_visitors_df = calculate_frequent_visitors(GoldFactOrder_df)

if frequent_visitors_df is not None:
    frequent_visitors_df.show()
else:
    print("Failed to calculate frequent visitors due to an error.")


+-----------+--------------+
|CustomerKey|NumberOfOrders|
+-----------+--------------+
|       null|             8|
+-----------+--------------+

