In [3]:
%%pyspark
from pyspark.sql.functions import col, lag, when, lit, sum as _sum, avg, max as _max
from pyspark.sql.window import Window

StatementMeta(TestCluster, 9, 2, Finished, Available, Finished)

In [4]:
%%pyspark

# Root of the bronze-layer SQL extracts in ADLS Gen2; every parquet source below lives under it.
BRONZE_SQL_ROOT = 'abfss://bronzelayerdata@energyscotish.dfs.core.windows.net/SQL_Data'
# Raw IoT telemetry lands in a separate container as a JSON file.
IOT_JSON_PATH = 'abfss://lendingforiotdata@energyscotish.dfs.core.windows.net/IOTData/customer_data.json'

# Parquet files embed their own schema, so CSV-only reader options such as
# `header` / `inferSchema` have no effect here and are deliberately not passed.
billing_data = spark.read.parquet(f'{BRONZE_SQL_ROOT}/billing')
outages_data = spark.read.parquet(f'{BRONZE_SQL_ROOT}/outages_data')
regions_data = spark.read.parquet(f'{BRONZE_SQL_ROOT}/regions')
customers_data = spark.read.parquet(f'{BRONZE_SQL_ROOT}/customers')
iot_bronze_data = spark.read.json(IOT_JSON_PATH)
Meter_Install_data = spark.read.parquet(f'{BRONZE_SQL_ROOT}/Meter_Install')


StatementMeta(TestCluster, 9, 3, Finished, Available, Finished)

**Transform Billing Data**

In [5]:
%%pyspark
# Expose each bronze DataFrame to the Spark SQL cells under the view name those
# cells query. (`outages_data` is loaded in the previous cell but not registered.)
_views_to_register = [
    ("billing_data", billing_data),
    ("customers_data", customers_data),
    ("regions_data", regions_data),
    ("IOT_DATA", iot_bronze_data),
    ("Meterid_customerid", Meter_Install_data),
]
for _view_name, _frame in _views_to_register:
    _frame.createOrReplaceTempView(_view_name)

StatementMeta(TestCluster, 9, 4, Finished, Available, Finished)

In [39]:
%%sql
-- Incrementally upsert billing records into the silver billing Delta table.
-- Only bronze rows whose last_updated is newer than the 'billing_silver'
-- watermark are merged. Rows with non-positive TotalConsumption or TotalAmount
-- are skipped, which also keeps the AverageUnitCost division below safe.
-- NOTE(review): nothing in this cell advances the watermark afterwards; confirm
-- metadata_table is updated elsewhere, otherwise each run re-merges the same rows.
MERGE INTO delta.`abfss://silverlayerdata@energyscotish.dfs.core.windows.net/billing_silver` AS target
USING (
    WITH FilteredBillingData AS (
        SELECT *
        FROM billing_data
        WHERE
            -- Watermark comes from the same Delta metadata table the customer and
            -- region merges read (`MetadataTable` is not defined in any visible cell).
            -- MAX() over zero rows is NULL and `x > NULL` is never true, so without
            -- the COALESCE floor a table with no watermark row yet would merge nothing.
            last_updated > COALESCE(
                (
                    SELECT MAX(LastProcessedTimestamp)
                    FROM delta.`abfss://silverlayerdata@energyscotish.dfs.core.windows.net/metadata_table`
                    WHERE TableName = 'billing_silver'
                ),
                TIMESTAMP '1900-01-01 00:00:00'
            )
            AND TotalConsumption > 0
            AND TotalAmount > 0
    )
    SELECT
        b.BillID,
        b.CustomerID,
        c.FullName AS CustomerName,
        b.BillingMonth,
        b.TotalConsumption,
        b.TotalAmount,
        b.PaymentStatus,
        (b.TotalAmount / b.TotalConsumption) AS AverageUnitCost,
        -- The full bill amount is outstanding while it is unpaid; otherwise nothing is.
        CASE
            WHEN b.PaymentStatus = 'Unpaid' THEN b.TotalAmount
            ELSE 0.0
        END AS OutstandingAmount,
        r.RegionName,
        b.last_updated
    FROM FilteredBillingData b
    LEFT JOIN customers_data c
        ON b.CustomerID = c.CustomerID
    LEFT JOIN regions_data r
        ON c.RegionID = r.RegionID
) AS source
ON target.BillID = source.BillID
-- `!=` evaluates to NULL when either side is NULL, so the first two checks do not
-- fire on changes to/from NULL; the last_updated check still catches newer rows.
WHEN MATCHED AND (
    target.CustomerID != source.CustomerID OR
    target.TotalAmount != source.TotalAmount OR
    target.last_updated < source.last_updated
) THEN
    UPDATE SET
        target.CustomerID = source.CustomerID,
        target.CustomerName = source.CustomerName,
        target.BillingMonth = source.BillingMonth,
        target.TotalConsumption = source.TotalConsumption,
        target.TotalAmount = source.TotalAmount,
        target.PaymentStatus = source.PaymentStatus,
        target.AverageUnitCost = source.AverageUnitCost,
        target.OutstandingAmount = source.OutstandingAmount,
        target.RegionName = source.RegionName,
        target.last_updated = source.last_updated
WHEN NOT MATCHED THEN
    INSERT (
        BillID,
        CustomerID,
        CustomerName,
        BillingMonth,
        TotalConsumption,
        TotalAmount,
        PaymentStatus,
        AverageUnitCost,
        OutstandingAmount,
        RegionName,
        last_updated
    ) VALUES (
        source.BillID,
        source.CustomerID,
        source.CustomerName,
        source.BillingMonth,
        source.TotalConsumption,
        source.TotalAmount,
        source.PaymentStatus,
        source.AverageUnitCost,
        source.OutstandingAmount,
        source.RegionName,
        source.last_updated
    );


StatementMeta(TestCluster, 0, 39, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 4 fields>

Customer Data

In [53]:
-- Incrementally upsert customers changed since the 'customers_silver' watermark
-- into the silver customer Delta table, standardising street-type abbreviations.
MERGE INTO delta.`abfss://silverlayerdata@energyscotish.dfs.core.windows.net/customer_silver` AS target
USING (
    SELECT
        c.CustomerID,
        c.FullName AS CustomerName,
        c.Email AS ContactEmail,          -- renamed for consistency
        c.ContactNumber AS ContactPhone,  -- renamed for consistency
        -- Expand whole-word abbreviations only. A plain REPLACE(..., 'St', 'Street')
        -- also rewrites substrings ('Station' -> 'Streetation') and re-expands
        -- already-expanded words ('Street' -> 'Streetreet', 'Avenue' -> 'Avenueenue').
        -- '\\b' in a Spark SQL literal is the regex word boundary \b (with the
        -- default spark.sql.parser.escapedStringLiterals=false).
        -- NOTE(review): 'St' can also abbreviate 'Saint' (e.g. 'St Andrews'); confirm
        -- that expanding it to 'Street' is wanted.
        TRIM(
            REGEXP_REPLACE(
                REGEXP_REPLACE(
                    REGEXP_REPLACE(
                        REGEXP_REPLACE(c.Address, '\\bSt\\b', 'Street'),
                        '\\bLn\\b', 'Lane'),
                    '\\bAve\\b', 'Avenue'),
                '\\bBlvd\\b', 'Boulevard')
        ) AS Address,
        c.DateOfBirth,
        -- Taken from the joined region, so it is NULL when the customer's RegionID
        -- has no row in regions_data.
        r.RegionID,
        c.last_updated
    FROM
        customers_data c
    LEFT JOIN
        regions_data r
    ON
        c.RegionID = r.RegionID
    WHERE
        -- MAX() over zero rows is NULL and `x > NULL` is never true; the COALESCE
        -- floor lets the first run (no watermark row yet) load every customer.
        c.last_updated > COALESCE(
            (
                SELECT MAX(LastProcessedTimestamp)
                FROM delta.`abfss://silverlayerdata@energyscotish.dfs.core.windows.net/metadata_table`
                WHERE TableName = 'customers_silver'
            ),
            TIMESTAMP '1900-01-01 00:00:00'
        )
) AS source
ON target.CustomerID = source.CustomerID
WHEN MATCHED THEN
    UPDATE SET
        target.CustomerName = source.CustomerName,
        target.ContactEmail = source.ContactEmail,
        target.ContactPhone = source.ContactPhone,
        target.Address = source.Address,
        target.DateOfBirth = source.DateOfBirth,
        target.RegionID = source.RegionID,
        target.last_updated = source.last_updated
WHEN NOT MATCHED THEN
    INSERT (
        CustomerID,
        CustomerName,
        ContactEmail,
        ContactPhone,
        Address,
        DateOfBirth,
        RegionID,
        last_updated
    )
    VALUES (
        source.CustomerID,
        source.CustomerName,
        source.ContactEmail,
        source.ContactPhone,
        source.Address,
        source.DateOfBirth,
        source.RegionID,
        source.last_updated
    );


StatementMeta(TestCluster, 0, 53, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 4 fields>

Region Data

In [52]:
%%sql
-- Incrementally upsert regions changed since the 'regions_silver' watermark into
-- the silver regions Delta table. The target stores the source's last_updated
-- in a column named update_date.
MERGE INTO delta.`abfss://silverlayerdata@energyscotish.dfs.core.windows.net/regions_data` AS target
USING (
    SELECT
        r.RegionID,
        r.RegionName,
        r.State,
        r.ZipCode,
        r.last_updated
    FROM
        regions_data r
    WHERE
        -- MAX() over zero rows is NULL and `x > NULL` is never true; the COALESCE
        -- floor lets the first run (no watermark row yet) load every region.
        r.last_updated > COALESCE(
            (
                SELECT MAX(LastProcessedTimestamp)
                FROM delta.`abfss://silverlayerdata@energyscotish.dfs.core.windows.net/metadata_table`
                WHERE TableName = 'regions_silver'
            ),
            TIMESTAMP '1900-01-01 00:00:00'
        )
) AS source
ON target.RegionID = source.RegionID
WHEN MATCHED THEN
    UPDATE SET
        target.RegionName = source.RegionName,
        target.State = source.State,
        target.ZipCode = source.ZipCode,
        target.update_date = source.last_updated
WHEN NOT MATCHED THEN
    INSERT (
        RegionID,
        RegionName,
        State,
        ZipCode,
        update_date
    )
    VALUES (
        source.RegionID,
        source.RegionName,
        source.State,
        source.ZipCode,
        source.last_updated
    );



StatementMeta(TestCluster, 3, 9, Finished, Cancelled, Cancelled)

IoT Data Transformation

In [40]:
-- Silver IoT readings table: a Delta table at an explicit ADLS location.
-- IF NOT EXISTS makes this cell safe to re-run; existing data is left untouched.
-- PARTITIONED BY takes column names, so the calendar day of each reading is
-- materialised as its own `Date` column and used as the partition key.
CREATE TABLE IF NOT EXISTS silver_iot_data (
    `MeterID`        STRING,
    `Timestamp`      TIMESTAMP,
    `Date`           DATE,
    `ConsumptionKWH` DOUBLE,
    `DeviceStatus`   STRING,
    `AnomalyFlag`    BOOLEAN
)
USING DELTA
PARTITIONED BY (`Date`)
LOCATION 'abfss://silverlayerdata@energyscotish.dfs.core.windows.net/silver/iot_data';


StatementMeta(TestCluster, 2, 43, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [44]:
-- Upsert IoT readings from the IOT_DATA view into silver_iot_data, keyed on
-- (MeterID, Timestamp). The whole view is re-read on every run; the MERGE keeps
-- already-loaded readings from being inserted twice.
MERGE INTO silver_iot_data AS target
USING (
    SELECT
        MeterID,
        CAST(Timestamp AS TIMESTAMP) AS Timestamp,
        DATE(CAST(Timestamp AS TIMESTAMP)) AS Date,  -- partition column of the target
        COALESCE(ConsumptionKWH, 0) AS ConsumptionKWH,  -- missing readings stored as 0
        DeviceStatus,
        AnomalyFlag
    FROM IOT_DATA
    -- A NULL key can never satisfy the equality ON clause below, so such rows would
    -- land in WHEN NOT MATCHED and be re-inserted as duplicates on every run.
    -- An unparseable Timestamp string casts to NULL (non-ANSI mode), hence the cast here.
    WHERE MeterID IS NOT NULL
      AND CAST(Timestamp AS TIMESTAMP) IS NOT NULL
) AS source
ON target.MeterID = source.MeterID AND target.Timestamp = source.Timestamp
WHEN MATCHED THEN
    UPDATE SET
        target.ConsumptionKWH = source.ConsumptionKWH,
        target.DeviceStatus = source.DeviceStatus,
        target.AnomalyFlag = source.AnomalyFlag
WHEN NOT MATCHED THEN
    INSERT (MeterID, Timestamp, Date, ConsumptionKWH, DeviceStatus, AnomalyFlag)
    VALUES (source.MeterID, source.Timestamp, source.Date, source.ConsumptionKWH, source.DeviceStatus, source.AnomalyFlag);



StatementMeta(TestCluster, 2, 47, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 4 fields>

Installation Data

In [12]:
-- One row per unique (CustomerID, MeterID, InstallationID) combination from the
-- meter-install extract, keeping only rows where all three keys are present.
CREATE OR REPLACE TEMP VIEW cleaned_installation_data AS
SELECT
    CustomerID,
    MeterID,
    InstallationID
FROM Meterid_customerid
WHERE NOT (CustomerID IS NULL OR MeterID IS NULL OR InstallationID IS NULL)
GROUP BY CustomerID, MeterID, InstallationID;


StatementMeta(TestCluster, 9, 13, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [13]:
-- Materialise the cleaned installation mapping as a Delta table, stamping each
-- row with the date of this load.
-- CREATE OR REPLACE (instead of a plain CREATE TABLE, which fails once the table
-- exists) lets this cell, and the notebook as a whole, be re-run.
-- NOTE(review): CustomerID is presumably high-cardinality; partitioning on it can
-- create one small-file directory per customer. Confirm this layout is intended.
CREATE OR REPLACE TABLE silver_installation_data
USING DELTA
PARTITIONED BY (CustomerID)
LOCATION 'abfss://silverlayerdata@energyscotish.dfs.core.windows.net/silver_installation_data'
AS
SELECT
    CustomerID,
    MeterID,
    InstallationID,
    CURRENT_DATE() AS DataIngestionDate
FROM cleaned_installation_data;


StatementMeta(TestCluster, 9, 14, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>