### Mounting "Bronze container" from ADLS Gen2

In [0]:
# Mount the "bronze" container of the storage account at /mnt/bronze.
# The storage account key is read from a secret scope, so it never appears in the notebook source.
# NOTE(review): this uses the legacy WASB driver (blob.core.windows.net) with an account key;
# Databricks recommends ABFS (abfss://...dfs.core.windows.net) for ADLS Gen2 — confirm the
# required credential type (ABFS mounts typically use OAuth) before migrating.
storage_account_name = "storagelayercloud"
container_name = "bronze"
key_vault_name = "data-migration-vault"
secret_name = "storageaccntsecret"

# Retrieve the storage account key from Azure Key Vault (via its secret scope)
scope = f"kv-{key_vault_name}"
key = dbutils.secrets.get(scope=scope, key=secret_name)

mount_point = f"/mnt/{container_name}"

# Unmount first if the mount point already exists, so re-running this cell re-mounts cleanly
if any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
    dbutils.fs.unmount(mount_point)

# Mount the bronze container
try:
    dbutils.fs.mount(
        source=f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/",
        mount_point=mount_point,
        extra_configs={f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net": key}
    )

    # Confirm the mount point is now listed before reporting success
    if any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
        print(f"{container_name} container mounted successfully! at {mount_point}")
    else:
        print(f"Mounting {container_name} at {mount_point} could not be verified: it is not listed in dbutils.fs.mounts()")

except Exception as e:
    print(f"Mounting {container_name} at {mount_point} failed due to: {e}")


bronze container mounted successfully! at /mnt/bronze


### Mounting "Silver container" from ADLS Gen2

In [0]:
# Mount the "silver" container of the storage account at /mnt/silver.
# The storage account key is read from a secret scope, so it never appears in the notebook source.
# NOTE(review): this uses the legacy WASB driver (blob.core.windows.net) with an account key;
# Databricks recommends ABFS (abfss://...dfs.core.windows.net) for ADLS Gen2 — confirm the
# required credential type (ABFS mounts typically use OAuth) before migrating.
storage_account_name = "storagelayercloud"
container_name = "silver"
key_vault_name = "data-migration-vault"
secret_name = "storageaccntsecret"

# Retrieve the storage account key from Azure Key Vault (via its secret scope)
scope = f"kv-{key_vault_name}"
key = dbutils.secrets.get(scope=scope, key=secret_name)

mount_point = f"/mnt/{container_name}"

# Unmount first if the mount point already exists, so re-running this cell re-mounts cleanly
if any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
    dbutils.fs.unmount(mount_point)

# Mount the silver container
try:
    dbutils.fs.mount(
        source=f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/",
        mount_point=mount_point,
        extra_configs={f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net": key}
    )

    # Confirm the mount point is now listed before reporting success
    if any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
        print(f"{container_name} container mounted successfully! at {mount_point}")
    else:
        print(f"Mounting {container_name} at {mount_point} could not be verified: it is not listed in dbutils.fs.mounts()")

except Exception as e:
    print(f"Mounting {container_name} at {mount_point} failed due to: {e}")


silver container mounted successfully! at /mnt/silver


### Mounting "Gold container" from ADLS Gen2

In [0]:
# Mount the "gold" container of the storage account at /mnt/gold.
# The storage account key is read from a secret scope, so it never appears in the notebook source.
# NOTE(review): this uses the legacy WASB driver (blob.core.windows.net) with an account key;
# Databricks recommends ABFS (abfss://...dfs.core.windows.net) for ADLS Gen2 — confirm the
# required credential type (ABFS mounts typically use OAuth) before migrating.
storage_account_name = "storagelayercloud"
container_name = "gold"
key_vault_name = "data-migration-vault"
secret_name = "storageaccntsecret"

# Retrieve the storage account key from Azure Key Vault (via its secret scope)
scope = f"kv-{key_vault_name}"
key = dbutils.secrets.get(scope=scope, key=secret_name)

mount_point = f"/mnt/{container_name}"

# Unmount first if the mount point already exists, so re-running this cell re-mounts cleanly
if any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
    dbutils.fs.unmount(mount_point)

# Mount the gold container
try:
    dbutils.fs.mount(
        source=f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/",
        mount_point=mount_point,
        extra_configs={f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net": key}
    )

    # Confirm the mount point is now listed before reporting success
    if any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts()):
        print(f"{container_name} container mounted successfully! at {mount_point}")
    else:
        print(f"Mounting {container_name} at {mount_point} could not be verified: it is not listed in dbutils.fs.mounts()")

except Exception as e:
    print(f"Mounting {container_name} at {mount_point} failed due to: {e}")


gold container mounted successfully! at /mnt/gold


### Listing all files in the bronze location

In [0]:
def list_all_files(path):
    """Recursively collect the paths of all files under `path`.

    Directories are descended depth-first, in the order ``dbutils.fs.ls``
    returns them. Only non-directory entries end up in the result. If listing
    a directory raises, the error is printed and whatever was collected for
    that directory so far is returned (best effort, no exception escapes).

    Args:
        path: DBFS path (for example a mount point) to walk.

    Returns:
        List of file paths as reported by ``dbutils.fs.ls``.
    """
    found = []
    try:
        for entry in dbutils.fs.ls(path):
            if not entry.isDir():
                found.append(entry.path)
                continue
            # Sub-directory: failures inside it are reported by the recursive call itself
            found += list_all_files(entry.path)
    except Exception as err:
        print(f"Error while accessing {path}: {err}")
    return found

dbfs:/mnt/bronze/SalesLT/Address/Address.parquet
dbfs:/mnt/bronze/SalesLT/Customer/Customer.parquet
dbfs:/mnt/bronze/SalesLT/CustomerAddress/CustomerAddress.parquet
dbfs:/mnt/bronze/SalesLT/Product/Product.parquet
dbfs:/mnt/bronze/SalesLT/ProductCategory/ProductCategory.parquet
dbfs:/mnt/bronze/SalesLT/ProductDescription/ProductDescription.parquet
dbfs:/mnt/bronze/SalesLT/ProductModel/ProductModel.parquet
dbfs:/mnt/bronze/SalesLT/ProductModelProductDescription/ProductModelProductDescription.parquet
dbfs:/mnt/bronze/SalesLT/SalesOrderDetail/SalesOrderDetail.parquet
dbfs:/mnt/bronze/SalesLT/SalesOrderHeader/SalesOrderHeader.parquet


### Dynamic DataFrame Creation

For each file:
-   Load it as a Spark DataFrame.
-   Extract the file name (without extension) and derive a lowercase DataFrame name from it with a `_dataframe` suffix.
-   Store the DataFrame as a global variable using globals().

Tracking:
-   Keep track of the created DataFrame names in a list.
-   Return that list (the names of all successfully created DataFrames) to the caller.

Accessing DataFrames:
-   DataFrames are accessible directly by their dynamically generated names (e.g., customer_dataframe, salesorderheader_dataframe).

Code Flow
-   Read Files: read_files_dynamically()
-   Access DataFrames: Directly by name (e.g., customer_dataframe.show()).
-   List Created DataFrames: the names returned by read_files_dynamically() are stored in created_dataframes.

In [0]:
import os

def read_files_dynamically(base_path):
    """Load every parquet file under `base_path` into a module-level Spark DataFrame.

    Each file ``.../<Name>.parquet`` is bound to a global variable named
    ``<name>_dataframe`` (lowercased) via ``globals()``, so later cells can
    refer to it directly.

    Args:
        base_path: Directory scanned recursively with ``list_all_files``.

    Returns:
        The created global variable names, in discovery order and without
        duplicates. Files that are not parquet, or that fail to load, are
        reported with a print and skipped.
    """
    created_dataframes = []  # global names created so far, in discovery order

    for file_path in list_all_files(base_path):
        if not file_path.endswith(".parquet"):
            print(f"Unsupported file format for: {file_path}")
            continue

        try:
            # Parquet carries its own schema; CSV options such as
            # 'header'/'inferSchema' do not apply to it.
            df = spark.read.parquet(file_path)
        except Exception as e:
            print(f"Failed to read {file_path}: {e}")
            continue

        # Strip only the trailing extension (str.replace would also remove
        # a ".parquet" occurring elsewhere in the file name).
        file_name = os.path.splitext(os.path.basename(file_path))[0]
        dataframe_name = f"{file_name}_dataframe".lower()

        if dataframe_name in created_dataframes:
            # Same-named files in different folders map to the same variable;
            # the later one wins, so make the overwrite visible.
            print(f"Warning: {file_path} overwrites existing DataFrame {dataframe_name}")
        else:
            created_dataframes.append(dataframe_name)

        globals()[dataframe_name] = df

    return created_dataframes

# Bronze-layer input location: every file below /mnt/bronze/SalesLT is loaded
base_folder = "SalesLT"        # also used when writing the silver layer
mount_point = "/mnt/bronze"    # must match the bronze mount created earlier
base_path = "/".join((mount_point, base_folder))

# Load the files and bind each one to a global <name>_dataframe variable
created_dataframes = read_files_dynamically(base_path)

**Purpose**
Convert every timestamp-typed column in the dynamically created DataFrames to a date column (values of the form yyyy-MM-dd, with the time-of-day dropped), then save the results to the silver layer.

**Process**
- Traverse through the schema of each DataFrame.
- Collect all columns with the data type timestamp.
- Format each identified column with the date_format() function using the pattern 'yyyy-MM-dd', then cast the result to date type.
- Replace the original DataFrame variables with the modified DataFrames using globals().
- Save each DataFrame to the Silver layer in Delta format (overwrite mode), one folder per table under /mnt/silver/SalesLT/.

In [0]:
from pyspark.sql import functions as F
from pyspark.sql import DataFrame

def convert_timestamps_to_date(dataframe: DataFrame) -> DataFrame:
    """Return a new DataFrame in which every timestamp column has become a date column.

    A column is converted only when its Spark type name is exactly
    ``'timestamp'``. Each such column is rendered as a ``'yyyy-MM-dd'``
    string with ``date_format`` and then cast to ``date``, which drops the
    time-of-day part. All other columns are left untouched, in place.
    """
    result = dataframe
    for field in dataframe.schema.fields:
        if field.dataType.typeName() != 'timestamp':
            continue
        day_string = F.date_format(F.col(field.name), 'yyyy-MM-dd')
        result = result.withColumn(field.name, day_string.cast('date'))
    return result

def process_all_dataframes(created_dataframes, base_folder=None, silver_root="/mnt/silver"):
    """Convert timestamps to dates in each named global DataFrame and write it to the silver layer.

    For every name in `created_dataframes`, the DataFrame stored under that
    name in ``globals()`` is passed through ``convert_timestamps_to_date``.
    The global is then rebound to the result, and the result is written as
    Delta (overwrite mode) to ``<silver_root>/<base_folder>/<Table>/``.
    ``<Table>`` is the name with its ``_dataframe`` suffix removed, then
    capitalized (e.g. ``customer_dataframe`` -> ``Customer``).

    Args:
        created_dataframes: Global variable names, e.g. as returned by
            ``read_files_dynamically``.
        base_folder: Sub-folder under `silver_root`. When None, the
            notebook-level ``base_folder`` variable is used (previous behavior).
        silver_root: Root path of the silver container mount.
    """
    if base_folder is None:
        base_folder = globals()["base_folder"]

    suffix = "_dataframe"
    for dataframe_name in created_dataframes:
        original_df = globals()[dataframe_name]

        # Convert timestamp columns to date format
        modified_df = convert_timestamps_to_date(original_df)

        # Rebind the global variable so later cells see the converted DataFrame
        globals()[dataframe_name.lower()] = modified_df

        # Strip only the trailing suffix: splitting on the first '_' would truncate
        # table names containing underscores, so different tables could overwrite
        # the same silver folder.
        if dataframe_name.lower().endswith(suffix):
            table_name = dataframe_name[:-len(suffix)]
        else:
            table_name = dataframe_name
        # capitalize() lowercases the remaining characters (e.g. 'customeraddress'
        # -> 'Customeraddress'); kept so existing silver paths do not change.
        folder_name = table_name.capitalize()
        silver_path = f"{silver_root}/{base_folder}/{folder_name}/"

        # Save DataFrame as Delta format to the silver container in overwrite mode
        modified_df.write.format("delta").mode("overwrite").save(silver_path)

# Convert timestamp columns and persist every loaded bronze DataFrame to the silver layer
process_all_dataframes(created_dataframes=created_dataframes)