In [0]:
dbutils.fs.ls("/mnt/ankandatalakesilverlayer")

[FileInfo(path='dbfs:/mnt/ankandatalakesilverlayer/sales/', name='sales/', size=0, modificationTime=1727043369000)]

In [0]:
dbutils.fs.ls("/mnt/ankandatalakegoldlayer")

[]

In [0]:
input_path = "/mnt/ankandatalakesilverlayer/sales/orders/"

In [0]:
df = spark.read.format("delta").load(input_path)
display(df)

order_id,customer_id,order_status,order_date,required_date,shipped_date,store_id,staff_id
1,1,1,2024-09-01,2024-09-05,2024-09-03,1,1
2,2,2,2024-09-02,2024-09-06,,1,2


In [0]:
from pyspark.sql.functions import col

# Function to modify column names based on the presence of an underscore
def modify_column_names(df):
    new_columns = []
    for column_name in df.columns:
        # Check if the column name contains an underscore
        if "_" not in column_name:
            # Split the column name based on uppercase letters to identify parts
            parts = re.findall('[A-Z][^A-Z]*', column_name)
            # If the column name has more than one part, join them with an underscore
            if len(parts) > 1:
                new_column_name = "_".join(parts).lower()
            else:
                new_column_name = column_name.lower()
        else:
            new_column_name = column_name
        new_columns.append(new_column_name)
    
    # Rename the columns in the DataFrame
    for old_name, new_name in zip(df.columns, new_columns):
        df = df.withColumnRenamed(old_name, new_name)
    
    return df

# Modify column names in the DataFrame
df_columns = modify_column_names(df)
display(df_columns)

order_id,customer_id,order_status,order_date,required_date,shipped_date,store_id,staff_id
1,1,1,2024-09-01,2024-09-05,2024-09-03,1,1
2,2,2,2024-09-02,2024-09-06,,1,2


In [0]:
from pyspark.sql.functions import coalesce

# Replace null values in 'shipped_date' with values from 'required_date'
df_columns = df_columns.withColumn("shipped_date", coalesce(df_columns["shipped_date"], df_columns["required_date"]))

# Assuming a similar logic needs to be applied to other columns, repeat the process as needed
# Example for another pair of columns: 'columnA' and 'columnB'
# df_columns = df_columns.withColumn("columnA", coalesce(df_columns["columnA"], df_columns["columnB"]))

display(df_columns)

order_id,customer_id,order_status,order_date,required_date,shipped_date,store_id,staff_id
1,1,1,2024-09-01,2024-09-05,2024-09-03,1,1
2,2,2,2024-09-02,2024-09-06,2024-09-06,1,2


Doing Transformation for all tables

In [0]:
table_name = []
for i in dbutils.fs.ls('/mnt/ankandatalakesilverlayer/sales/'):
    table_name.append(i.name.rstrip('/'))

table_name    

['customers', 'order_items', 'orders', 'stores']

In [0]:
from pyspark.sql.functions import coalesce, col, lit
import re

# Function to determine the replacement value based on column data type
def get_replacement_value(df, column_index):
    # Get the column data type
    column_type = df.dtypes[column_index][1]
    
    # Determine the replacement value based on the column data type
    if re.search("int", column_type):
        return 0  # Default integer value
    elif re.search("date|string", column_type):
        return ""  # Default string/date value
    else:
        return None  # For other data types, no replacement is performed

# Function to apply modifications to all tables
def modify_tables(table_names):
    for table_name in table_names:
        # Load the table
        df = spark.read.format("delta").load(f"/mnt/ankandatalakesilverlayer/sales/{table_name}")
        
        # Iterate through all columns to replace null values
        for i in range(len(df.columns)):
            # Skip the last column as there's no subsequent column to replace from
            if i < len(df.columns) - 1:
                replacement_value = get_replacement_value(df, i + 1)
                # Replace null values in the current column with values from the subsequent column or a default value
                if replacement_value is not None:
                    df = df.withColumn(df.columns[i], coalesce(df[df.columns[i]], df[df.columns[i + 1]], lit(replacement_value)))
                else:
                    df = df.withColumn(df.columns[i], coalesce(df[df.columns[i]], df[df.columns[i + 1]]))
        
        # Display the modified DataFrame
        display(df)

# Get the list of table names
table_names = [i.name.rstrip('/') for i in dbutils.fs.ls('/mnt/ankandatalakesilverlayer/sales/')]

# Apply modifications to all tables
modify_tables(table_names)

customer_id,first_name,last_name,phone,email,street,city,state,zip_code
1,John,Doe,123-456-7890,john.doe@example.com,123 Main St,Anytown,CA,12345
2,Jane,Smith,555-555-5555,jane.smith@example.com,456 Elm St,Othertown,NY,67890


order_id,item_id,product_id,quantity,list_price,discount
1,1,1,2.0,1200.0,0.0
1,2,3,1.0,50.0,0.1
2,1,2,1.0,1400.0,0.05


order_id,customer_id,order_status,order_date,required_date,shipped_date,store_id,staff_id
1,1,1,2024-09-01,2024-09-05,2024-09-03,1,1
2,2,2,2024-09-02,2024-09-06,1,1,2


store_id,store_name,phone,email,street,city,state,zip_code
1,Downtown Bike Store,123-456-7890,downtown@example.com,123 Main St,Anytown,CA,12345
2,Uptown Bike Store,555-555-5555,uptown@example.com,456 Elm St,Othertown,NY,67890


In [0]:
# Function to print data types of all columns for each table
def print_column_data_types(table_names):
    for table_name in table_names:
        # Load the table
        df = spark.read.format("delta").load(f"/mnt/ankandatalakesilverlayer/sales/{table_name}")
        
        # Print the table name
        print(f"Table: {table_name}")
        
        # Print data types of all columns
        for column_name, data_type in df.dtypes:
            print(f"Column: {column_name}, Type: {data_type}")

# Get the list of table names
table_names = [i.name.rstrip('/') for i in dbutils.fs.ls('/mnt/ankandatalakesilverlayer/sales/')]

# Print data types for all columns in each table
print_column_data_types(table_names)

Table: customers
Column: customer_id, Type: int
Column: first_name, Type: string
Column: last_name, Type: string
Column: phone, Type: string
Column: email, Type: string
Column: street, Type: string
Column: city, Type: string
Column: state, Type: string
Column: zip_code, Type: string
Table: order_items
Column: order_id, Type: int
Column: item_id, Type: int
Column: product_id, Type: int
Column: quantity, Type: int
Column: list_price, Type: decimal(10,2)
Column: discount, Type: decimal(4,2)
Table: orders
Column: order_id, Type: int
Column: customer_id, Type: int
Column: order_status, Type: int
Column: order_date, Type: string
Column: required_date, Type: string
Column: shipped_date, Type: string
Column: store_id, Type: int
Column: staff_id, Type: int
Table: stores
Column: store_id, Type: int
Column: store_name, Type: string
Column: phone, Type: string
Column: email, Type: string
Column: street, Type: string
Column: city, Type: string
Column: state, Type: string
Column: zip_code, Type: str

In [0]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, DateType, StringType
import re

def rectify_column_data_types(df: DataFrame) -> DataFrame:
    # Define patterns for column names to identify their correct data types
    int_patterns = ["zip_code", "postal_code"]
    date_patterns = [".*_date$", ".*_time$", "timestamp"]
    
    # Iterate through all columns to rectify their data types based on column names
    for column_name, data_type in df.dtypes:
        if any(re.search(pattern, column_name.lower()) for pattern in int_patterns) and data_type == "string":
            # Cast column to IntegerType if it matches the integer pattern
            df = df.withColumn(column_name, col(column_name).cast(IntegerType()))
        elif any(re.search(pattern, column_name.lower()) for pattern in date_patterns) and data_type == "string":
            # Cast column to DateType if it matches the date pattern
            df = df.withColumn(column_name, col(column_name).cast(DateType()))
    
    return df

def print_column_data_types(df: DataFrame):
    for column_name, data_type in df.dtypes:
        print(f"Column: {column_name}, Type: {data_type}")

# Example usage
if __name__ == "__main__":
    # Assuming you have a SparkSession called 'spark'
    # Create a sample DataFrame
    data = [("12345", "2023-05-01", "2023-05-05", "2023-05-03")]
    columns = ["zip_code", "order_date", "required_date", "shipped_date"]
    df = spark.createDataFrame(data, columns)
    
    print("Before rectification:")
    print_column_data_types(df)
    
    df = rectify_column_data_types(df)
    
    print("\nAfter rectification:")
    print_column_data_types(df)

Before rectification:
Column: zip_code, Type: string
Column: order_date, Type: string
Column: required_date, Type: string
Column: shipped_date, Type: string

After rectification:
Column: zip_code, Type: int
Column: order_date, Type: date
Column: required_date, Type: date
Column: shipped_date, Type: date


In [0]:
# Databricks notebook source
# MAGIC %md
# MAGIC # Silver to Gold Data Transformation
# MAGIC 
# MAGIC This notebook processes data from the silver layer, applies necessary transformations, and saves the results to the gold layer.

# COMMAND ----------

# MAGIC %md
# MAGIC ## Setup and Imports

# COMMAND ----------

from pyspark.sql import DataFrame
from pyspark.sql.functions import coalesce, col, lit, lead
from pyspark.sql.types import IntegerType, DateType, DecimalType
from pyspark.sql.window import Window
import re

# COMMAND ----------

# MAGIC %md
# MAGIC ## Configuration

# COMMAND ----------

# Define the base paths for silver and gold layers
SILVER_BASE_PATH = "/mnt/ankandatalakesilverlayer/sales"
GOLD_BASE_PATH = "/mnt/ankandatalakegoldlayer/sales"

# List of tables to process
TABLE_NAMES = ["customers", "order_items", "orders", "stores"]

# COMMAND ----------

# MAGIC %md
# MAGIC ## Helper Functions

# COMMAND ----------

def rectify_column_data_types(df: DataFrame, table_name: str) -> DataFrame:
    """
    Rectify column data types based on column names and table-specific rules.
    """
    for column_name, data_type in df.dtypes:
        if column_name == "zip_code" and data_type == "string":
            df = df.withColumn(column_name, col(column_name).cast(IntegerType()))
        elif column_name.endswith("_date") and data_type == "string":
            df = df.withColumn(column_name, col(column_name).cast(DateType()))
        elif column_name in ["list_price", "discount"] and table_name == "order_items":
            df = df.withColumn(column_name, col(column_name).cast(DecimalType(10, 2)))
    return df

def replace_shipped_date_null(df: DataFrame) -> DataFrame:
    """
    Replace null values in shipped_date with the subsequent row's required_date.
    """
    if "shipped_date" in df.columns and "required_date" in df.columns:
        window = Window.orderBy("order_date")
        df = df.withColumn(
            "shipped_date",
            coalesce(
                col("shipped_date"),
                lead("required_date").over(window)
            )
        )
    return df

# COMMAND ----------

# MAGIC %md
# MAGIC ## Main Processing Function

# COMMAND ----------

def process_table(table_name: str) -> None:
    """
    Process a single table: read from silver, apply transformations, and write to gold.
    """
    print(f"Processing table: {table_name}")
    
    # Read the table from the silver layer
    silver_path = f"{SILVER_BASE_PATH}/{table_name}"
    df = spark.read.format("delta").load(silver_path)
    
    # Apply data type rectification
    df = rectify_column_data_types(df, table_name)
    
    # Replace shipped_date null values (only for orders table)
    if table_name == "orders":
        df = replace_shipped_date_null(df)
    
    # Display sample data and schema
    print(f"Sample data for {table_name}:")
    display(df.limit(5))
    
    print(f"Schema for {table_name}:")
    df.printSchema()
    
    # Write the processed data to the gold layer
    gold_path = f"{GOLD_BASE_PATH}/{table_name}"
    df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable(f"delta.`{gold_path}`")
    print(f"Table {table_name} has been processed and saved to the gold layer.")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Main Execution

# COMMAND ----------

def main():
    for table_name in TABLE_NAMES:
        process_table(table_name)
    print("All tables have been processed successfully.")

# Run the main function
main()

# COMMAND ----------

# MAGIC %md
# MAGIC ## Verification (Optional)

# COMMAND ----------

# MAGIC %md
# MAGIC You can use the following code to verify the data in the gold layer:

# COMMAND ----------

def verify_gold_data(table_name: str):
    gold_path = f"{GOLD_BASE_PATH}/{table_name}"
    df = spark.read.format("delta").load(gold_path)
    print(f"Verifying data for {table_name} in gold layer:")
    display(df.limit(5))
    df.printSchema()

# Uncomment and run the following lines to verify each table
# verify_gold_data("customers")
# verify_gold_data("order_items")
# verify_gold_data("orders")
# verify_gold_data("stores")

# COMMAND ----------

# MAGIC %md
# MAGIC ## Cleanup (Optional)

# COMMAND ----------

# MAGIC %md
# MAGIC If you need to clean up the gold layer or reset the data, you can use the following function:

# COMMAND ----------
'''
def cleanup_gold_layer():
    for table_name in TABLE_NAMES:
        gold_path = f"{GOLD_BASE_PATH}/{table_name}"
        dbutils.fs.rm(gold_path, recurse=True)
        print(f"Cleaned up {table_name} from gold layer.")

# Uncomment the following line to run the cleanup
# cleanup_gold_layer()
'''

Processing table: customers
Sample data for customers:


customer_id,first_name,last_name,phone,email,street,city,state,zip_code
1,John,Doe,123-456-7890,john.doe@example.com,123 Main St,Anytown,CA,12345
2,Jane,Smith,555-555-5555,jane.smith@example.com,456 Elm St,Othertown,NY,67890


Schema for customers:
root
 |-- customer_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- email: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip_code: integer (nullable = true)

Table customers has been processed and saved to the gold layer.
Processing table: order_items
Sample data for order_items:


order_id,item_id,product_id,quantity,list_price,discount
1,1,1,2,1200.0,0.0
1,2,3,1,50.0,0.1
2,1,2,1,1400.0,0.05


Schema for order_items:
root
 |-- order_id: integer (nullable = true)
 |-- item_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- list_price: decimal(10,2) (nullable = true)
 |-- discount: decimal(10,2) (nullable = true)

Table order_items has been processed and saved to the gold layer.
Processing table: orders
Sample data for orders:


order_id,customer_id,order_status,order_date,required_date,shipped_date,store_id,staff_id
1,1,1,2024-09-01,2024-09-05,2024-09-03,1,1
2,2,2,2024-09-02,2024-09-06,,1,2


Schema for orders:
root
 |-- order_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- order_status: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- required_date: date (nullable = true)
 |-- shipped_date: date (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- staff_id: integer (nullable = true)

Table orders has been processed and saved to the gold layer.
Processing table: stores
Sample data for stores:


store_id,store_name,phone,email,street,city,state,zip_code
1,Downtown Bike Store,123-456-7890,downtown@example.com,123 Main St,Anytown,CA,12345
2,Uptown Bike Store,555-555-5555,uptown@example.com,456 Elm St,Othertown,NY,67890


Schema for stores:
root
 |-- store_id: integer (nullable = true)
 |-- store_name: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- email: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- zip_code: integer (nullable = true)

Table stores has been processed and saved to the gold layer.
All tables have been processed successfully.


'\ndef cleanup_gold_layer():\n    for table_name in TABLE_NAMES:\n        gold_path = f"{GOLD_BASE_PATH}/{table_name}"\n        dbutils.fs.rm(gold_path, recurse=True)\n        print(f"Cleaned up {table_name} from gold layer.")\n\n# Uncomment the following line to run the cleanup\n# cleanup_gold_layer()\n'

In [0]:
from pyspark.sql.functions import coalesce, when, col
from pyspark.sql.window import Window

# Read the orders table
orders_df = spark.read.format("delta").load("/mnt/ankandatalakesilverlayer/sales/orders")

# Define a window specification sorted by order_id and order_date
window = Window.orderBy("order_id", "order_date")

# Replace null shipped_date with the current row's required_date
orders_df = orders_df.withColumn(
    "shipped_date",
    when(col("shipped_date").isNull(), col("required_date")).otherwise(col("shipped_date"))
)

# Display the updated DataFrame
display(orders_df)

# Optionally, write the updated DataFrame back to the gold layer
orders_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("delta.`/mnt/ankandatalakegoldlayer/sales/orders`")

order_id,customer_id,order_status,order_date,required_date,shipped_date,store_id,staff_id
1,1,1,2024-09-01,2024-09-05,2024-09-03,1,1
2,2,2,2024-09-02,2024-09-06,2024-09-06,1,2


In [0]:
# orders_df.write.format("delta").option("mergeSchema", "true").mode("overwrite").save("/mnt/ankandatalakegoldlayer/sales/orders")

In [0]:
input_path = "/mnt/ankandatalakegoldlayer/sales/order_items/"
df = spark.read.format("delta").load(input_path)
display(df)

order_id,item_id,product_id,quantity,list_price,discount
1,1,1,2,1200.0,0.0
1,2,3,1,50.0,0.1
2,1,2,1,1400.0,0.05
