In [0]:
import os

from pyspark.sql import SparkSession
from delta import *

# Read the ADLS Gen2 account key from the environment instead of embedding it in
# the notebook, where it would leak through version control and exported copies.
# AZURE_STORAGE_KEY is the variable name the Azure CLI uses for the same purpose.
# NOTE(review): on Databricks a secret scope (dbutils.secrets.get) is usually the
# preferred source -- confirm which mechanism this workspace should use.
storage_account_key = os.environ.get("AZURE_STORAGE_KEY")
if not storage_account_key:
    raise RuntimeError(
        "AZURE_STORAGE_KEY is not set; provide the storage account key via the "
        "environment before running this notebook."
    )

# Initialize Spark session with Delta support
spark = (
    SparkSession.builder
    .appName("DeltaLakeSetup")
    # Enable Delta Lake SQL extensions and make Delta the session catalog.
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # Account-key authentication for the azddevstorage ADLS Gen2 account.
    .config("fs.azure.account.key.azddevstorage.dfs.core.windows.net", storage_account_key)
    .getOrCreate()
)




In [0]:
from pyspark.sql.types import StructType, StructField, StringType, FloatType, TimestampType

# Explicit schemas for the raw CSV inputs. Every column is declared nullable.

# Batch consumption files: one timestamped row per location with a reading for
# each of the residential, commercial and industrial columns.
consumption_batch_schema = StructType([
    StructField(column, dtype, True)
    for column, dtype in (
        ("date", TimestampType()),
        ("location", StringType()),
        ("residential_consumption", FloatType()),
        ("commercial_consumption", FloatType()),
        ("industrial_consumption", FloatType()),
    )
])

# Real-time consumption files: timestamped readings that also carry the
# coordinates and country alongside the location name.
consumption_real_time_schema = StructType([
    StructField(column, dtype, True)
    for column, dtype in (
        ("time", TimestampType()),
        ("location", StringType()),
        ("latitude", FloatType()),
        ("longitude", FloatType()),
        ("country", StringType()),
        ("energy_consumption_kWh", FloatType()),
    )
])

# Production files (the same schema is used for both batch and real-time).
production_schema = StructType([
    StructField(column, dtype, True)
    for column, dtype in (
        ("time", TimestampType()),
        ("location", StringType()),
        ("solar_energy", FloatType()),
        ("wind_energy", FloatType()),
        ("hydro_energy", FloatType()),
    )
])

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType, TimestampType
from delta import *



# Locations whose raw data is ingested.
locations = [
    "Boston",
    "New York",
    "San Francisco",
    "Chicago",
]

# All raw and Delta paths hang off one ADLS Gen2 root.
# NOTE(review): the <container-name>/<storage-acc> segments look like
# placeholders to be filled in before running -- confirm.
_storage_root = "abfss://<container-name>@<storage-acc>.dfs.core.windows.net"
_raw_data_root = f"{_storage_root}/raw-data"

# Raw CSV inputs, split by ingestion mode.
batch_data_base_path = f"{_raw_data_root}/batch/"
real_time_data_base_path = f"{_raw_data_root}/real-time/"

# Destination root for the Delta tables.
delta_base_path = f"{_storage_root}/delta/"

def process_data(location, data_type, data_category):
    """Load one location's raw CSV data and rewrite it as a Delta table.

    The (data_type, data_category) pair selects the raw input directory, the
    schema applied when reading, and the Delta output directory.

    Args:
        location: Location name; used as a path segment under the raw and
            Delta base paths.
        data_type: Either "batch" or "real-time".
        data_category: Either "consumption" or "production".

    Returns:
        None. Progress and failures are reported with print(). Any exception
        raised while listing, reading or writing is caught and logged so that
        a caller looping over many combinations keeps going.
    """
    try:
        # Validate both selectors before doing anything else. Previously an
        # unknown data_category fell through with no paths assigned and only
        # surfaced later as an unrelated error from the generic handler.
        if data_type not in ("batch", "real-time"):
            print("[ERROR] Invalid data type. Please use 'batch' or 'real-time'.")
            return
        if data_category not in ("consumption", "production"):
            print("[ERROR] Invalid data category. Please use 'consumption' or 'production'.")
            return

        # (raw base path, raw sub-directory, read schema, Delta sub-directory)
        # for each supported combination.
        sources = {
            ("batch", "consumption"): (
                batch_data_base_path, "consumption_batch_data",
                consumption_batch_schema, "consumption_data",
            ),
            ("batch", "production"): (
                batch_data_base_path, "production_batch_data",
                production_schema, "production_data",
            ),
            ("real-time", "consumption"): (
                real_time_data_base_path, "energy_consumption",
                consumption_real_time_schema, "energy_consumption_data",
            ),
            ("real-time", "production"): (
                real_time_data_base_path, "production_simulation",
                production_schema, "production_simulation_data",
            ),
        }
        raw_base_path, raw_dir, schema, delta_dir = sources[(data_type, data_category)]
        input_path = f"{raw_base_path}{location}/{raw_dir}/"
        # The Delta tree mirrors the ingestion mode: <delta>/<data_type>/<location>/...
        output_path = f"{delta_base_path}{data_type}/{location}/{delta_dir}/"
        print(f"[INFO] Processing {data_category.capitalize()} {data_type.capitalize()} data for location: {location}...")

        # Check if the input path exists for the specific data category
        print(f"[INFO] Checking paths for {data_category} {data_type} data in {location}...")

        # NOTE(review): dbutils is not defined in this notebook; presumably it
        # is the Databricks-provided global. A failing listing (for example a
        # missing directory) ends up in the except handler below.
        files = dbutils.fs.ls(input_path)

        # If the listing is empty, skip processing and log the issue
        if not files:
            print(f"[ERROR] No files found in {input_path} for {data_category} {data_type} data in {location}.")
            return

        # Load the data for the given category, using the explicit schema
        df = spark.read.format("csv").option("header", "true").schema(schema).load(input_path)
        print(f"[INFO] Loaded {data_category} data from {input_path}.")

        # Write the data to Delta Lake, replacing existing data and schema
        print(f"[INFO] Writing {data_category} data to Delta Lake at {output_path}")
        df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").partitionBy("location").save(output_path)
        print(f"[INFO] {data_category.capitalize()} data successfully written to Delta Lake at {output_path}.")

    except Exception as e:
        # Deliberate best-effort: log the failure and return so that the
        # remaining combinations are still attempted by the caller.
        print(f"[ERROR] Error processing {data_category} {data_type} data for {location}: {e}")
        return

# Run the full ingestion matrix: every location x ingestion mode x data category.
data_types = ("batch", "real-time")
data_categories = ("consumption", "production")
separator = "**" * 20

for location in locations:
    for data_type in data_types:
        for data_category in data_categories:
            process_data(location, data_type, data_category)
        # One separator line after each (location, data_type) pair.
        print(separator)

print("[INFO] All data processed and stored in Delta Lake.")


[INFO] Processing Consumption Batch data for location: Boston...
[INFO] Checking paths for consumption batch data in Boston...
[INFO] Loaded consumption data from abfss://energy-datastore@azddevstorage.dfs.core.windows.net/raw-data/batch/Boston/consumption_batch_data/.
[INFO] Writing consumption data to Delta Lake at abfss://energy-datastore@azddevstorage.dfs.core.windows.net/delta/batch/Boston/consumption_data/
[INFO] Consumption data successfully written to Delta Lake at abfss://energy-datastore@azddevstorage.dfs.core.windows.net/delta/batch/Boston/consumption_data/.
[INFO] Processing Production Batch data for location: Boston...
[INFO] Checking paths for production batch data in Boston...
[INFO] Loaded production data from abfss://energy-datastore@azddevstorage.dfs.core.windows.net/raw-data/batch/Boston/production_batch_data/.
[INFO] Writing production data to Delta Lake at abfss://energy-datastore@azddevstorage.dfs.core.windows.net/delta/batch/Boston/production_data/
[INFO] Product