## Listing Data Files in ADLS

In [0]:
# Inspect the raw landing container so the available source CSV files are visible before ingestion.
landing_path = "abfss://adlscontainer@storageaccount8697.dfs.core.windows.net/"
display(dbutils.fs.ls(landing_path))

path,name,size,modificationTime
abfss://adlscontainer@storageaccount8697.dfs.core.windows.net/AdventureWorks Calendar Lookup.csv,AdventureWorks Calendar Lookup.csv,10950,1730882208000
abfss://adlscontainer@storageaccount8697.dfs.core.windows.net/AdventureWorks Customer Lookup.csv,AdventureWorks Customer Lookup.csv,1892621,1730882212000
abfss://adlscontainer@storageaccount8697.dfs.core.windows.net/AdventureWorks Product Categories Lookup.csv,AdventureWorks Product Categories Lookup.csv,83,1730882209000
abfss://adlscontainer@storageaccount8697.dfs.core.windows.net/AdventureWorks Product Lookup.csv,AdventureWorks Product Lookup.csv,58122,1730882209000
abfss://adlscontainer@storageaccount8697.dfs.core.windows.net/AdventureWorks Product Subcategories Lookup.csv,AdventureWorks Product Subcategories Lookup.csv,637,1730882209000
abfss://adlscontainer@storageaccount8697.dfs.core.windows.net/AdventureWorks Returns Data.csv,AdventureWorks Returns Data.csv,36435,1730882209000
abfss://adlscontainer@storageaccount8697.dfs.core.windows.net/AdventureWorks Sales Data 2020.csv,AdventureWorks Sales Data 2020.csv,123962,1730882209000
abfss://adlscontainer@storageaccount8697.dfs.core.windows.net/AdventureWorks Sales Data 2021.csv,AdventureWorks Sales Data 2021.csv,1127915,1730882212000
abfss://adlscontainer@storageaccount8697.dfs.core.windows.net/AdventureWorks Sales Data 2022.csv,AdventureWorks Sales Data 2022.csv,1388999,1730882213000
abfss://adlscontainer@storageaccount8697.dfs.core.windows.net/AdventureWorks Territory Lookup.csv,AdventureWorks Territory Lookup.csv,400,1730882209000


## Creating Schemas Following the Medallion Architecture

In [0]:
# SETUP AND CONFIGURATIONS
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import *

# Create schemas for different layers
def create_schemas():
    """Ensure the bronze, silver and gold schemas of the medallion layout exist (no-op if they already do)."""
    for layer in ("bronze", "silver", "gold"):
        spark.sql(f"CREATE SCHEMA IF NOT EXISTS {layer}")

create_schemas()

## Creating Raw Tables - Bronze Layer

In [0]:
def create_bronze_tables(
    base_path="abfss://adlscontainer@storageaccount8697.dfs.core.windows.net/",
    years=("2020", "2021", "2022"),
):
    """
    Ingest the raw AdventureWorks CSV files into Delta tables in the ``bronze`` schema.

    Columns are kept exactly as read (header row supplies the names; no type inference, so
    everything is a string). Two lineage columns are appended to every table:
      - ``source_file``: the name of the CSV file the rows were read from
      - ``ingestion_date``: the timestamp of this load

    Every target table is fully overwritten on each run.

    Args:
        base_path: Directory holding the CSV files (a trailing "/" is optional).
        years: Years for which an ``AdventureWorks Sales Data {year}.csv`` file exists;
            each one is written to ``bronze.sales_{year}``.
    """
    # Normalise so file names can be appended with or without a trailing slash on base_path.
    root = base_path.rstrip("/") + "/"

    def _ingest_csv(file_name, table_name):
        # Read one CSV, tag it with lineage columns and overwrite the target Delta table.
        df = (
            spark.read.format("csv")
            .option("header", "true")
            .load(f"{root}{file_name}")
            # Record the actual file name for every table (sales included) so lineage is uniform.
            .withColumn("source_file", lit(file_name))
            .withColumn("ingestion_date", current_timestamp())
        )
        (
            df.write.format("delta")
            .mode("overwrite")
            # Full refresh: let the table schema follow the file if its columns change.
            .option("overwriteSchema", "true")
            .saveAsTable(table_name)
        )

    # Yearly sales extracts -> bronze.sales_<year>
    for year in years:
        _ingest_csv(f"AdventureWorks Sales Data {year}.csv", f"bronze.sales_{year}")

    # Lookup / reference files -> bronze.<table_name>
    lookup_tables = {
        'customer': 'AdventureWorks Customer Lookup.csv',
        'product': 'AdventureWorks Product Lookup.csv',
        'product_categories': 'AdventureWorks Product Categories Lookup.csv',
        'product_subcategories': 'AdventureWorks Product Subcategories Lookup.csv',
        'territory': 'AdventureWorks Territory Lookup.csv',
        'returns': 'AdventureWorks Returns Data.csv'
    }

    for table_name, file_name in lookup_tables.items():
        _ingest_csv(file_name, f"bronze.{table_name}")

create_bronze_tables()

## Creating Cleaned Tables - Silver Layer

In [0]:
def create_silver_tables(years=("2020", "2021", "2022")):
    """
    Build the silver layer: typed, renamed and filtered copies of the bronze tables.

    Each bronze table is projected to snake_case columns with explicit conversions
    (integer keys/counts, ``decimal(10,2)`` money amounts, ``yyyy-MM-dd`` dates, and
    case normalisation of names/emails). Rows are dropped when a required bronze field
    is missing, and also when a required integer key is present but does not survive
    the integer cast (with ANSI mode off Spark turns such values into NULL).

    Tables written (each overwritten):
      silver.customer_dim, silver.product_dim, silver.product_category_dim,
      silver.product_subcategory_dim, silver.territory_dim, silver.returns_fact,
      silver.sales_fact (all ``bronze.sales_{year}`` tables for ``years`` combined)

    Args:
        years: Years whose ``bronze.sales_{year}`` tables are combined into silver.sales_fact.

    Raises:
        ValueError: if ``years`` is empty.
        Exception: any Spark error is re-raised after a short message is printed.
    """
    import functools

    years = list(years)
    if not years:
        raise ValueError("years must contain at least one year")

    # NOTE(review): dates are parsed with the pattern "yyyy-MM-dd"; confirm this matches the CSV files.
    # A non-matching date is not dropped by the checks below (they test presence in bronze only).

    # Customer table transformation
    def transform_customer():
        df = spark.table("bronze.customer")

        return (
            df.where(
                col("CustomerKey").isNotNull() &
                col("FirstName").isNotNull() &
                col("LastName").isNotNull()
            )
            .select(
                col("CustomerKey").cast("integer").alias("customer_key"),
                initcap(col("Prefix")).alias("prefix"),
                initcap(col("FirstName")).alias("first_name"),
                initcap(col("LastName")).alias("last_name"),
                to_date(col("BirthDate"), "yyyy-MM-dd").alias("birth_date"),
                col("MaritalStatus").alias("marital_status"),
                col("Gender").alias("gender"),
                lower(col("EmailAddress")).alias("email_address"),
                col("AnnualIncome").cast("integer").alias("annual_income"),
                col("TotalChildren").cast("integer").alias("total_children"),
                col("EducationLevel").alias("education_level"),
                col("Occupation").alias("occupation"),
                col("HomeOwner").alias("home_owner"),
                col("source_file").alias("source_file"),
                col("ingestion_date").alias("ingestion_date")
            )
            # A present-but-non-numeric key would otherwise become a NULL dimension key.
            .where(col("customer_key").isNotNull())
        )

    # Product table transformation
    def transform_product():
        df = spark.table("bronze.product")

        return (
            df.where(
                col("ProductKey").isNotNull() &
                col("ProductName").isNotNull()
            )
            .select(
                col("ProductKey").cast("integer").alias("product_key"),
                col("ProductSubcategoryKey").cast("integer").alias("product_subcategory_key"),
                col("ProductSKU").alias("product_sku"),
                col("ProductName").alias("product_name"),
                col("ModelName").alias("model_name"),
                col("ProductDescription").alias("product_description"),
                col("ProductColor").alias("product_color"),
                col("ProductSize").alias("product_size"),
                col("ProductStyle").alias("product_style"),
                col("ProductCost").cast("decimal(10,2)").alias("product_cost"),
                col("ProductPrice").cast("decimal(10,2)").alias("product_price"),
                col("source_file").alias("source_file"),
                col("ingestion_date").alias("ingestion_date")
            )
            .where(col("product_key").isNotNull())
        )

    # Product Categories transformation
    def transform_product_categories():
        df = spark.table("bronze.product_categories")

        return (
            df.where(
                col("ProductCategoryKey").isNotNull() &
                col("CategoryName").isNotNull()
            )
            .select(
                col("ProductCategoryKey").cast("integer").alias("product_category_key"),
                col("CategoryName").alias("category_name"),
                col("source_file").alias("source_file"),
                col("ingestion_date").alias("ingestion_date")
            )
            .where(col("product_category_key").isNotNull())
        )

    # Product Subcategories transformation
    def transform_product_subcategories():
        df = spark.table("bronze.product_subcategories")

        return (
            df.where(
                col("ProductSubcategoryKey").isNotNull() &
                col("SubcategoryName").isNotNull()
            )
            .select(
                col("ProductSubcategoryKey").cast("integer").alias("product_subcategory_key"),
                col("SubcategoryName").alias("subcategory_name"),
                col("ProductCategoryKey").cast("integer").alias("product_category_key"),
                col("source_file").alias("source_file"),
                col("ingestion_date").alias("ingestion_date")
            )
            .where(col("product_subcategory_key").isNotNull())
        )

    # Territory transformation
    def transform_territory():
        df = spark.table("bronze.territory")

        return (
            df.where(
                col("SalesTerritoryKey").isNotNull() &
                col("Region").isNotNull()
            )
            .select(
                col("SalesTerritoryKey").cast("integer").alias("territory_key"),
                col("Region").alias("region"),
                col("Country").alias("country"),
                col("Continent").alias("continent"),
                col("source_file").alias("source_file"),
                col("ingestion_date").alias("ingestion_date")
            )
            .where(col("territory_key").isNotNull())
        )

    # Returns transformation
    def transform_returns():
        df = spark.table("bronze.returns")

        return (
            df.where(
                col("ReturnDate").isNotNull() &
                col("ProductKey").isNotNull()
            )
            .select(
                to_date(col("ReturnDate"), "yyyy-MM-dd").alias("return_date"),
                col("TerritoryKey").cast("integer").alias("territory_key"),
                col("ProductKey").cast("integer").alias("product_key"),
                col("ReturnQuantity").cast("integer").alias("return_quantity"),
                col("source_file").alias("source_file"),
                col("ingestion_date").alias("ingestion_date")
            )
            .where(col("product_key").isNotNull())
        )

    # Sales transformation (one bronze table per year)
    def transform_sales(year):
        df = spark.table(f"bronze.sales_{year}")

        return (
            df.where(
                col("OrderDate").isNotNull() &
                col("ProductKey").isNotNull() &
                col("CustomerKey").isNotNull()
            )
            .select(
                to_date(col("OrderDate"), "yyyy-MM-dd").alias("order_date"),
                to_date(col("StockDate"), "yyyy-MM-dd").alias("stock_date"),
                col("OrderNumber").alias("order_number"),
                col("ProductKey").cast("integer").alias("product_key"),
                col("CustomerKey").cast("integer").alias("customer_key"),
                col("TerritoryKey").cast("integer").alias("territory_key"),
                col("OrderLineItem").cast("integer").alias("order_line_item"),
                col("OrderQuantity").cast("integer").alias("order_quantity"),
                col("source_file").alias("source_file"),
                col("ingestion_date").alias("ingestion_date")
            )
            .where(
                col("product_key").isNotNull() &
                col("customer_key").isNotNull()
            )
        )

    def _save(df, table_name):
        # Replace the managed Delta table `table_name` with the contents of `df`.
        df.write.format("delta").mode("overwrite").saveAsTable(table_name)

    # Execute transformations and write to silver layer
    try:
        # Dimension tables
        _save(transform_customer(), "silver.customer_dim")
        _save(transform_product(), "silver.product_dim")
        _save(transform_product_categories(), "silver.product_category_dim")
        _save(transform_product_subcategories(), "silver.product_subcategory_dim")
        _save(transform_territory(), "silver.territory_dim")

        # Fact tables
        _save(transform_returns(), "silver.returns_fact")

        # Combine the yearly sales in memory (same columns, matched by name) and write one
        # fact table, instead of materialising, re-reading and dropping per-year tables.
        sales_by_year = [transform_sales(year) for year in years]
        sales_combined = functools.reduce(lambda left, right: left.unionByName(right), sales_by_year)
        _save(sales_combined, "silver.sales_fact")

        # Clean up per-year tables that earlier runs of this notebook may have left behind.
        for year in years:
            spark.sql(f"DROP TABLE IF EXISTS silver.sales_{year}_fact")

        print("Silver layer tables created successfully!")

    except Exception as e:
        print(f"Error creating silver layer tables: {str(e)}")
        raise

create_silver_tables()

## Creating Aggregated Tables - Gold Layer

In [0]:
def create_gold_tables():
    """
    Build the gold layer: business-level aggregates over the silver tables.

    Each builder below runs a SQL query joining silver.sales_fact / silver.returns_fact
    to the silver *_dim tables. The results are written as Delta tables in the ``gold``
    schema, replacing any previous version:

      gold.product_sales_analysis      - per product: volume, revenue, cost, profit, monthly averages
      gold.customer_purchase_analysis  - per customer (and territory): activity and spend
      gold.monthly_sales_trends        - per month x category x region/country
      gold.returns_analysis            - per month x product x category x region/country
      gold.territory_performance       - per territory: totals and monthly averages

    Revenue, cost and return value are quantity times the product's price/cost taken
    from silver.product_dim.

    Raises:
        Exception: any Spark error is re-raised after a short message is printed.
    """

    # 1. Sales Analysis by Product
    def create_product_sales_analysis():
        return spark.sql("""
            WITH product_sales AS (
                SELECT 
                    p.product_key,
                    p.product_name,
                    p.product_cost,
                    p.product_price,
                    pc.category_name,
                    ps.subcategory_name,
                    date_trunc('month', s.order_date) as sale_month,
                    s.order_quantity,
                    (s.order_quantity * p.product_price) as revenue,
                    (s.order_quantity * p.product_cost) as cost,
                    ((s.order_quantity * p.product_price) - (s.order_quantity * p.product_cost)) as profit
                FROM silver.sales_fact s
                JOIN silver.product_dim p ON s.product_key = p.product_key
                JOIN silver.product_subcategory_dim ps ON p.product_subcategory_key = ps.product_subcategory_key
                JOIN silver.product_category_dim pc ON ps.product_category_key = pc.product_category_key
            )
            SELECT 
                product_key,
                product_name,
                category_name,
                subcategory_name,
                COUNT(DISTINCT sale_month) as months_sold,
                SUM(order_quantity) as total_quantity_sold,
                SUM(revenue) as total_revenue,
                SUM(cost) as total_cost,
                SUM(profit) as total_profit,
                -- Averages per month in which the product sold (rows here are order lines, so
                -- AVG(revenue) would be a per-line average). NULLIF guards against zero months.
                ROUND(SUM(revenue) / NULLIF(COUNT(DISTINCT sale_month), 0), 2) as avg_monthly_revenue,
                ROUND(SUM(profit) / NULLIF(COUNT(DISTINCT sale_month), 0), 2) as avg_monthly_profit
            FROM product_sales
            GROUP BY product_key, product_name, category_name, subcategory_name
        """)

    # 2. Customer Purchase Analysis
    def create_customer_purchase_analysis():
        return spark.sql("""
            WITH customer_purchases AS (
                SELECT 
                    c.customer_key,
                    c.first_name,
                    c.last_name,
                    c.email_address,
                    c.annual_income,
                    t.region,
                    t.country,
                    s.order_date,
                    s.order_quantity,
                    (s.order_quantity * p.product_price) as purchase_amount
                FROM silver.sales_fact s
                JOIN silver.customer_dim c ON s.customer_key = c.customer_key
                JOIN silver.product_dim p ON s.product_key = p.product_key
                JOIN silver.territory_dim t ON s.territory_key = t.territory_key
            )
            SELECT 
                customer_key,
                CONCAT(first_name, ' ', last_name) as customer_name,
                email_address,
                annual_income,
                region,
                country,
                COUNT(DISTINCT date_trunc('month', order_date)) as number_of_months_active,
                COUNT(DISTINCT date_trunc('day', order_date)) as number_of_purchase_days,
                SUM(order_quantity) as total_items_purchased,
                ROUND(SUM(purchase_amount), 2) as total_purchase_amount,
                -- Average per order line, not per order.
                ROUND(AVG(purchase_amount), 2) as avg_purchase_amount
            FROM customer_purchases
            GROUP BY customer_key, first_name, last_name, email_address, annual_income, region, country
        """)

    # 3. Monthly Sales Trends
    def create_monthly_sales_trends():
        return spark.sql("""
            SELECT 
                date_trunc('month', s.order_date) as sale_month,
                pc.category_name,
                t.region,
                t.country,
                COUNT(DISTINCT s.order_number) as total_orders,
                SUM(s.order_quantity) as total_quantity,
                ROUND(SUM(s.order_quantity * p.product_price), 2) as total_revenue,
                ROUND(SUM(s.order_quantity * p.product_cost), 2) as total_cost,
                ROUND(SUM(s.order_quantity * (p.product_price - p.product_cost)), 2) as total_profit,
                COUNT(DISTINCT s.customer_key) as unique_customers
            FROM silver.sales_fact s
            JOIN silver.product_dim p ON s.product_key = p.product_key
            JOIN silver.product_subcategory_dim ps ON p.product_subcategory_key = ps.product_subcategory_key
            JOIN silver.product_category_dim pc ON ps.product_category_key = pc.product_category_key
            JOIN silver.territory_dim t ON s.territory_key = t.territory_key
            GROUP BY date_trunc('month', s.order_date), pc.category_name, t.region, t.country
            ORDER BY sale_month
        """)

    # 4. Returns Analysis
    def create_returns_analysis():
        return spark.sql("""
            SELECT 
                date_trunc('month', r.return_date) as return_month,
                p.product_name,
                pc.category_name,
                t.region,
                t.country,
                COUNT(*) as return_count,
                SUM(r.return_quantity) as total_return_quantity,
                ROUND(SUM(r.return_quantity * p.product_price), 2) as total_return_value
            FROM silver.returns_fact r
            JOIN silver.product_dim p ON r.product_key = p.product_key
            JOIN silver.product_subcategory_dim ps ON p.product_subcategory_key = ps.product_subcategory_key
            JOIN silver.product_category_dim pc ON ps.product_category_key = pc.product_category_key
            JOIN silver.territory_dim t ON r.territory_key = t.territory_key
            GROUP BY 
                date_trunc('month', r.return_date),
                p.product_name,
                pc.category_name,
                t.region,
                t.country
            ORDER BY return_month
        """)

    # 5. Territory Performance
    def create_territory_performance():
        return spark.sql("""
            WITH territory_sales AS (
                SELECT 
                    t.territory_key,
                    t.region,
                    t.country,
                    t.continent,
                    date_trunc('month', s.order_date) as sale_month,
                    s.order_number,
                    s.customer_key,
                    s.order_quantity * p.product_price as revenue,
                    s.order_quantity * p.product_cost as cost
                FROM silver.sales_fact s
                JOIN silver.territory_dim t ON s.territory_key = t.territory_key
                JOIN silver.product_dim p ON s.product_key = p.product_key
            ),
            monthly_metrics AS (
                SELECT 
                    territory_key,
                    region,
                    country,
                    continent,
                    sale_month,
                    COUNT(DISTINCT customer_key) as customer_count,
                    SUM(revenue) as total_revenue,
                    SUM(cost) as total_cost
                FROM territory_sales
                GROUP BY territory_key, region, country, continent, sale_month
            ),
            monthly_summary AS (
                SELECT 
                    territory_key,
                    region,
                    country,
                    continent,
                    COUNT(DISTINCT sale_month) as active_months,
                    ROUND(AVG(customer_count), 2) as avg_monthly_customers,
                    ROUND(SUM(total_revenue), 2) as total_revenue,
                    ROUND(SUM(total_cost), 2) as total_cost,
                    ROUND(SUM(total_revenue - total_cost), 2) as total_profit,
                    ROUND(AVG(total_revenue), 2) as avg_monthly_revenue
                FROM monthly_metrics
                GROUP BY territory_key, region, country, continent
            ),
            territory_totals AS (
                -- Distinct counts over the whole period. Summing monthly distinct counts would
                -- count a repeat customer once for every month they bought something.
                SELECT 
                    territory_key,
                    region,
                    country,
                    continent,
                    COUNT(DISTINCT order_number) as total_orders,
                    COUNT(DISTINCT customer_key) as total_customers
                FROM territory_sales
                GROUP BY territory_key, region, country, continent
            )
            SELECT 
                m.territory_key,
                m.region,
                m.country,
                m.continent,
                m.active_months,
                tt.total_orders,
                tt.total_customers,
                m.avg_monthly_customers,
                m.total_revenue,
                m.total_cost,
                m.total_profit,
                m.avg_monthly_revenue
            FROM monthly_summary m
            -- Null-safe equality so groups with a NULL country/continent still match.
            JOIN territory_totals tt
              ON m.territory_key <=> tt.territory_key
             AND m.region <=> tt.region
             AND m.country <=> tt.country
             AND m.continent <=> tt.continent
        """)

    # Target table -> builder, written in this order.
    gold_tables = {
        "gold.product_sales_analysis": create_product_sales_analysis,
        "gold.customer_purchase_analysis": create_customer_purchase_analysis,
        "gold.monthly_sales_trends": create_monthly_sales_trends,
        "gold.returns_analysis": create_returns_analysis,
        "gold.territory_performance": create_territory_performance,
    }

    # Execute all transformations and write to gold layer
    try:
        for table_name, build in gold_tables.items():
            (
                build().write.format("delta")
                .mode("overwrite")
                # Aggregates are fully recomputed; allow column types (e.g. decimal precision) to change.
                .option("overwriteSchema", "true")
                .saveAsTable(table_name)
            )

        print("Gold layer tables created successfully!")

    except Exception as e:
        print(f"Error creating gold layer tables: {str(e)}")
        raise

create_gold_tables()

## Role-Based Access Control

In [0]:
# Users to grant, the catalog they work in, and their schema-level / table-level privileges.
# Schema and table names are relative to the user's catalog.
# NOTE(review): "USAGE" and "CREATE" are legacy table-ACL privilege names; Unity Catalog uses
# USE CATALOG / USE SCHEMA / CREATE TABLE. Confirm which privilege model this metastore uses.
user_permissions = {
    "data_engineer1@mukeshbackup09gmail.onmicrosoft.com": {
        "catalog": "accessmanagement_demo",
        "schemas": {
            "bronze": ["MODIFY", "SELECT", "CREATE", "USAGE"],
            "silver": ["MODIFY", "SELECT", "CREATE", "USAGE"],
            "gold": ["MODIFY", "SELECT", "CREATE", "USAGE"]
        }
    },
    "analyst1@mukeshbackup09gmail.onmicrosoft.com": {
        "catalog": "accessmanagement_demo",
        "schemas": {
            "silver": ["USAGE", "SELECT"],
            "gold": ["MODIFY", "SELECT", "CREATE", "USAGE"]
        }
    },
    "business_analyst1@mukeshbackup09gmail.onmicrosoft.com": {
        "catalog": "accessmanagement_demo",
        "schemas": {
            "gold": ["USAGE", "SELECT"]
        }
    },
    # NOTE(review): SELECT on the whole gold schema already covers every gold table, so the
    # "tables" lists below do not narrow access for the sales/marketing users, and they can query
    # the base tables directly instead of the row-filtered gold.secured_* views. Confirm intent.
    "sales_user1@mukeshbackup09gmail.onmicrosoft.com": {
        "catalog": "accessmanagement_demo",
        "schemas": {
            "gold": ["USAGE", "SELECT"]
        },
        "tables": [
            "gold.product_sales_analysis",
            "gold.customer_purchase_analysis"
        ]
    },
    "marketing_user1@mukeshbackup09gmail.onmicrosoft.com": {
        "catalog": "accessmanagement_demo",
        "schemas": {
            "gold": ["USAGE", "SELECT"]
        },
        "tables": [
            "gold.product_sales_analysis"
        ]
    }
}


def _qualify(catalog, name):
    """Prefix `name` with `catalog` unless it is already catalog-qualified (catalog.schema[.table])."""
    if name.count(".") >= 2:
        return name
    return f"{catalog}.{name}"


def grant_permissions(permissions_by_user):
    """
    Issue GRANT statements for every user in `permissions_by_user`.

    Per user: USAGE on the configured catalog, each listed privilege on each listed schema,
    and SELECT on each listed table. Schema and table names are qualified with the user's
    catalog so the grants land in the catalog that was granted, not in whatever catalog the
    session happens to be using.
    """
    for user, permissions in permissions_by_user.items():
        catalog = permissions["catalog"]

        # Catalog-level access
        spark.sql(f"GRANT USAGE ON CATALOG {catalog} TO `{user}`")

        # Schema-level privileges
        for schema, perms in permissions["schemas"].items():
            qualified_schema = _qualify(catalog, schema)
            for perm in perms:
                spark.sql(f"GRANT {perm} ON SCHEMA {qualified_schema} TO `{user}`")

        # Table-level SELECT grants (optional per user)
        for table in permissions.get("tables", []):
            spark.sql(f"GRANT SELECT ON {_qualify(catalog, table)} TO `{user}`")


grant_permissions(user_permissions)


## Creating Secured Views for Row-Level Security

In [0]:
%sql
-- Row-filtered view over gold.product_sales_analysis, keyed on the querying user:
--   sales_user1     -> 'Bikes' category only
--   marketing_user1 -> 'Clothing' and 'Accessories' categories only
--   anyone else     -> every row
CREATE OR REPLACE VIEW gold.secured_product_sales_analysis AS
SELECT *
FROM gold.product_sales_analysis
WHERE
    CASE CURRENT_USER()
        WHEN 'sales_user1@mukeshbackup09gmail.onmicrosoft.com'
            THEN category_name = 'Bikes'
        WHEN 'marketing_user1@mukeshbackup09gmail.onmicrosoft.com'
            THEN category_name IN ('Clothing', 'Accessories')
        ELSE TRUE
    END;


In [0]:
%sql
-- Row-filtered view over gold.customer_purchase_analysis, keyed on the querying user:
--   sales_user1     -> region 'Germany' only
--   marketing_user1 -> region 'Northwest' only
--   anyone else     -> every row
CREATE OR REPLACE VIEW gold.secured_customer_purchase_analysis AS
SELECT *
FROM gold.customer_purchase_analysis
WHERE
    CASE CURRENT_USER()
        WHEN 'sales_user1@mukeshbackup09gmail.onmicrosoft.com'
            THEN region = 'Germany'
        WHEN 'marketing_user1@mukeshbackup09gmail.onmicrosoft.com'
            THEN region = 'Northwest'
        ELSE TRUE
    END;


In [0]:
%sql
-- Row-filtered view over gold.monthly_sales_trends, keyed on the querying user:
--   sales_user1     -> region 'Germany' only
--   marketing_user1 -> region 'Northwest' only
--   anyone else     -> every row
CREATE OR REPLACE VIEW gold.secured_monthly_sales_trends AS
SELECT *
FROM gold.monthly_sales_trends
WHERE
    CASE CURRENT_USER()
        WHEN 'sales_user1@mukeshbackup09gmail.onmicrosoft.com'
            THEN region = 'Germany'
        WHEN 'marketing_user1@mukeshbackup09gmail.onmicrosoft.com'
            THEN region = 'Northwest'
        ELSE TRUE
    END;


In [0]:
%sql
-- Row-filtered view over gold.territory_performance, keyed on the querying user:
--   sales_user1     -> region 'Germany' only
--   marketing_user1 -> region 'Northwest' only
--   anyone else     -> every row
CREATE OR REPLACE VIEW gold.secured_territory_performance AS
SELECT *
FROM gold.territory_performance
WHERE
    CASE CURRENT_USER()
        WHEN 'sales_user1@mukeshbackup09gmail.onmicrosoft.com'
            THEN region = 'Germany'
        WHEN 'marketing_user1@mukeshbackup09gmail.onmicrosoft.com'
            THEN region = 'Northwest'
        ELSE TRUE
    END;
