# **Introduction**

- This notebook completes Task 3: it adds monitoring (logging and timing) to the data ingestion scripts from Task 1 and to the data transformation scripts, which save the cleaned data as CSV files that can later be used for BI/ML use cases (Task 2).

Note: This notebook was built in Google Colab and can be run there directly.

In [1]:
# Refresh the apt package index before installing Java.
!apt-get update
# Install a headless Java 8 JDK; output is discarded to keep the cell quiet.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Pin PySpark and delta-spark so re-runs install the same versions.
!pip install pyspark==3.5.1 delta-spark==3.2.0
import os
# Set JAVA_HOME for the notebook process (presumably the install location of
# the JDK package above - confirm on the target image).
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:3 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:4 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:6 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Ign:7 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Hit:8 https://r2u.stat.illinois.edu/ubuntu jammy Release
Hit:9 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:11 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:12 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Reading package lists... Done
W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list en

# Public Power Data Ingestion Pipeline Monitoring

In [8]:
import requests
import json
from datetime import datetime, timedelta
import time
import pytz
import logging
import os

# Configure logging to file only (optimized for Colab)
log_filename = '/content/public_power_data_pipeline_monitoring.log'

# Detach handlers left over from earlier cells and close them, so their log
# files are released instead of staying open for the rest of the session.
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)
    handler.close()

# Route every record to a single log file that is overwritten on each run.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(log_filename, mode='w')  # Overwrite log file each run
    ]
)

def fetch_public_power_data(start_time, end_time, country='de', timeout=30):
    """Fetch public power data for one time window from the energy-charts API.

    Args:
        start_time: Window start, sent as the ``start`` query parameter.
        end_time: Window end, sent as the ``end`` query parameter.
        country: Country code sent as the ``country`` query parameter.
        timeout: Seconds to wait for the server. Without a timeout a stalled
            connection would block the calling ingestion loop indefinitely.

    Returns:
        The decoded JSON payload when it is non-empty and contains both
        ``unix_seconds`` and ``production_types``; otherwise ``None``.
        Request failures and data-quality problems are logged, not raised.
    """
    url = "https://api.energy-charts.info/public_power"
    params = {'country': country, 'start': start_time, 'end': end_time}

    try:
        response = requests.get(url, params=params, timeout=timeout)
        response.raise_for_status()  # Raise an error for unsuccessful status codes
        data = response.json()

        if not data:
            raise ValueError("Received empty data.")

        # Basic data-quality gate: both top-level fields must be present.
        required_fields = ["unix_seconds", "production_types"]
        for field in required_fields:
            if field not in data:
                raise ValueError(f"Missing expected field: {field}")

        return data

    except requests.exceptions.RequestException as e:
        # Covers connection errors, HTTP error statuses and timeouts.
        logging.error(f"Request failed: {e}")
        return None
    except ValueError as ve:
        logging.warning(f"Data quality issue: {ve}")
        return None

def save_data_to_file(data, filename, is_last_entry=False):
    """Append ``data`` to ``filename`` as indented JSON.

    A ``",\\n"`` separator is written after the entry unless
    ``is_last_entry`` is true, so consecutive calls build the body of a
    JSON array.
    """
    with open(filename, 'a') as out:
        json.dump(data, out, indent=4)
        if is_last_entry:
            return
        out.write(",\n")

def ingest_data(frequency='15min', backfill_start=None, backfill_end=None, filename='public_power_data.json'):
    """Ingest public power data in 15-minute windows into a JSON array file.

    Two modes:

    * Backfill (both ``backfill_start`` and ``backfill_end`` given): every
      15-minute window from start (inclusive) to end (exclusive) is fetched.
    * Realtime (otherwise): windows from today's midnight (Europe/Berlin) up
      to the moment the function was called are fetched. The function only
      waits when a window has not finished yet; windows that already lie in
      the past are fetched back to back.

    Args:
        frequency: Accepted for interface compatibility; the body does not
            read it and the window length is fixed at 15 minutes.
        backfill_start: Start of the backfill range. Naive datetimes are
            interpreted as Europe/Berlin wall-clock time; aware datetimes are
            converted to Europe/Berlin.
        backfill_end: End of the backfill range, same conventions.
        filename: Output file. It is truncated, then filled with a JSON array
            holding one element per successfully fetched window.

    Side effects:
        Logs one line per ingested window plus its duration, and flushes the
        root logger's handlers before returning.
    """
    tz = pytz.timezone('Europe/Berlin')
    step = timedelta(minutes=15)

    def _to_berlin(moment):
        # astimezone() on a naive datetime would read it as *system* local
        # time (UTC on Colab) and silently shift the requested window.
        if moment.tzinfo is None:
            return tz.localize(moment)
        return moment.astimezone(tz)

    realtime = not (backfill_start and backfill_end)
    if realtime:
        now = datetime.now(tz)
        window_start = tz.localize(datetime(now.year, now.month, now.day))
        window_end = now
        success_label = "Real-time data ingested"
        error_label = "Ingestion error for interval"
    else:
        window_start = _to_berlin(backfill_start)
        window_end = _to_berlin(backfill_end)
        success_label = "Data ingested"
        error_label = "Ingestion error at interval"

    with open(filename, 'w') as json_file:
        json_file.write("[\n")

    # Separators are written *before* every entry except the first, so a
    # failed fetch (even for the final window) can never leave a trailing
    # comma that would make the array invalid JSON.
    wrote_entry = False
    current_time = window_start

    while current_time < window_end:
        # normalize() keeps the UTC offset correct across DST transitions.
        next_time = tz.normalize(current_time + step)
        start_time = current_time.isoformat()
        end_time = next_time.isoformat()

        if realtime:
            # Only wait for windows that are still in progress.
            remaining = (next_time - datetime.now(tz)).total_seconds()
            if remaining > 0:
                time.sleep(remaining)

        try:
            start = time.time()
            data = fetch_public_power_data(start_time=start_time, end_time=end_time)
            if data:
                if wrote_entry:
                    with open(filename, 'a') as json_file:
                        json_file.write(",\n")
                save_data_to_file(data, filename, is_last_entry=True)
                wrote_entry = True
                logging.info(f"{success_label} for {start_time} to {end_time}")

            duration = time.time() - start
            logging.info(f"Ingestion time for interval: {duration:.2f} seconds")

        except Exception as e:
            logging.error(f"{error_label} {start_time} - {end_time}: {e}")

        current_time = next_time

    with open(filename, 'a') as json_file:
        json_file.write("\n]")

    # Flush so the log file is complete on disk. The handlers are deliberately
    # not closed: they stay registered on the root logger, and a closed
    # FileHandler would drop (or truncate on reopen) everything logged later.
    for handler in logging.root.handlers:
        handler.flush()

# Example usage: the ingestion mode is chosen interactively.
mode = input("Enter mode (realtime/backfill): ").strip().lower()

if mode not in ("backfill", "realtime"):
    print("Invalid mode. Please enter 'realtime' or 'backfill'.")
elif mode == "realtime":
    ingest_data(filename='/content/real_time_power_data.json')
else:
    # Backfill one fixed sample day.
    backfill_start = datetime(2024, 10, 15, 0, 0)
    backfill_end = datetime(2024, 10, 15, 23, 59)
    ingest_data(backfill_start=backfill_start, backfill_end=backfill_end, filename='/content/public_power_data.json')


Enter mode (realtime/backfill): backfill


# Public Power Data Transformation Pipeline Monitoring


In [10]:
import logging
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, from_unixtime, expr
from pyspark.sql.types import StructType, StructField, ArrayType, LongType, StringType, FloatType, BooleanType

# Configure logging
log_filename = '/content/public_power_data_transformation_pipeline_monitoring.log'

# Detach handlers left over from earlier cells and close them, so their log
# files are released instead of staying open for the rest of the session.
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)
    handler.close()

# Route every record to a single log file that is overwritten on each run.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.FileHandler(log_filename, mode='w')]
)

# Initialize Spark session with Delta Lake support and required configurations.
# The Delta artifact has to match the installed runtime: the setup cell pins
# pyspark 3.5.1 and delta-spark 3.2.0, whose Maven coordinate is
# io.delta:delta-spark_2.12:3.2.0 (delta-core 2.3.0 targets Spark 3.3).
spark = (
    SparkSession.builder
    .appName("Public Power Data Processing with Data Quality Checks")
    .master("local[*]")
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")
    .config("spark.sql.crossJoin.enabled", "true")
    .getOrCreate()
)

try:
    # Spark transformations are lazy: nothing executes until an action such as
    # count() or a write runs. Every stage is therefore timed around its
    # count() action, so the logged duration reflects real work instead of
    # query-plan construction. Nothing is cached, so each duration includes
    # recomputing the upstream stages.

    # Load public power data from JSON file without schema
    start_time = time.time()
    public_power_df = spark.read.option("multiline", "true").json("/content/public_power_data.json")
    loaded_rows = public_power_df.count()
    load_duration = time.time() - start_time
    logging.info(f"Loaded JSON data in {load_duration:.2f} seconds with {loaded_rows} rows.")

    # Initial schema and content check
    public_power_df.printSchema()
    public_power_df.show(truncate=False)

    # Filter out deprecated records
    start_time = time.time()
    public_power_filtered = public_power_df.filter(~col("deprecated"))
    filtered_rows = public_power_filtered.count()
    filter_duration = time.time() - start_time
    logging.info(f"Filtered deprecated records in {filter_duration:.2f} seconds. Row count: {filtered_rows}")

    # Explode the production_types and combine with unix_seconds
    start_time = time.time()
    public_power_exploded = public_power_filtered.select(
        explode("production_types").alias("production_type"),
        "unix_seconds"
    )
    exploded_rows = public_power_exploded.count()
    explode_duration = time.time() - start_time
    logging.info(f"Exploded production_types in {explode_duration:.2f} seconds. Row count: {exploded_rows}")

    # Pair unix_seconds with production data using zip_with, then emit one
    # row per (timestamp, production type) pair.
    start_time = time.time()
    public_power_transformed = public_power_exploded.select(
        col("production_type.name").alias("production_type_name"),
        expr("zip_with(unix_seconds, production_type.data, (x, y) -> struct(x, y))").alias("unix_and_data")
    ).select(
        explode("unix_and_data").alias("unix_data"),
        "production_type_name"
    ).select(
        from_unixtime(col("unix_data.x")).alias("timestamp"),
        "production_type_name",
        col("unix_data.y").alias("production_value")
    )
    transformed_rows = public_power_transformed.count()
    transform_duration = time.time() - start_time
    logging.info(f"Transformed data in {transform_duration:.2f} seconds. Row count: {transformed_rows}")

    # Data quality check: filter out rows with null production_value
    start_time = time.time()
    public_power_cleaned = public_power_transformed.filter(col("production_value").isNotNull())
    cleaned_rows = public_power_cleaned.count()
    clean_duration = time.time() - start_time
    logging.info(f"Filtered null production values in {clean_duration:.2f} seconds. Row count: {cleaned_rows}")

    # Save the cleaned DataFrame as a CSV file (the write is itself an action)
    csv_file_path = "/content/public_power_cleaned.csv"
    start_time = time.time()
    public_power_cleaned.write.format("csv").mode("overwrite").option("header", "true").save(csv_file_path)
    save_duration = time.time() - start_time
    logging.info(f"Saved cleaned data as CSV in {save_duration:.2f} seconds at {csv_file_path}")

except Exception as e:
    # logging.exception logs at ERROR level and keeps the traceback, which the
    # monitoring log needs to show where the pipeline failed.
    logging.exception(f"Error in data processing pipeline: {e}")
finally:
    # Ensure resources are released
    spark.stop()

# Check log file contents
with open(log_filename, 'r') as log_file:
    print(log_file.read())


root
 |-- deprecated: boolean (nullable = true)
 |-- production_types: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- data: array (nullable = true)
 |    |    |    |-- element: double (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |-- unix_seconds: array (nullable = true)
 |    |-- element: long (containsNull = true)

+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

# Price Data Ingestion Pipeline Monitoring

In [13]:
import json
import logging
import os
import time
from datetime import datetime, timedelta

import pytz
import requests

# Configure logging to file and console
log_filename = '/content/price_data_pipeline_monitoring.log'

# Ensure the directory exists for the log file (no-op when already present)
os.makedirs(os.path.dirname(log_filename), exist_ok=True)

# Detach handlers left over from earlier cells and close them, so their log
# files are released instead of staying open for the rest of the session.
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)
    handler.close()

# Set up logging configuration for both FileHandler and StreamHandler
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(log_filename, mode='w'),  # Overwrite log file each run
        logging.StreamHandler()  # Output to console
    ]
)

def fetch_price_data(start_date, end_date, bzn='DE-LU', timeout=30):
    """Fetch price data for a date range from the energy-charts API.

    Args:
        start_date: Range start, sent as the ``start`` query parameter.
        end_date: Range end, sent as the ``end`` query parameter.
        bzn: Bidding zone sent as the ``bzn`` query parameter.
        timeout: Seconds to wait for the server. Without a timeout a stalled
            connection would block the calling ingestion loop indefinitely.

    Returns:
        The decoded JSON payload, or ``None`` when the request fails or the
        body is not valid JSON. Failures are logged, not raised.
    """
    url = "https://api.energy-charts.info/price"

    params = {
        'bzn': bzn,
        'start': start_date,
        'end': end_date
    }

    try:
        response = requests.get(url, params=params, timeout=timeout)
        response.raise_for_status()  # Raise an error for unsuccessful status codes
        return response.json()
    except requests.exceptions.RequestException as e:
        logging.error(f"Failed to fetch data for {start_date} to {end_date}: {e}")
        return None
    except ValueError as e:
        # Body could not be decoded as JSON.
        logging.error(f"Invalid JSON in response for {start_date} to {end_date}: {e}")
        return None

def save_price_data_to_file(data, filename, is_last_entry=False):
    """Append ``data`` to ``filename`` as indented JSON.

    A ``",\\n"`` separator follows the entry unless ``is_last_entry`` is
    true, so consecutive calls build the body of a JSON array.
    """
    with open(filename, 'a') as out:
        json.dump(data, out, indent=4)
        if is_last_entry:
            return
        out.write(",\n")

def ingest_price_data(frequency='daily', start_date=None, end_date=None, filename='price_data.json'):
    """Ingest daily price data into a JSON array file, one element per day.

    Args:
        frequency: Accepted for interface compatibility; the body does not
            read it and data is always fetched one calendar day at a time.
        start_date: First day as ``'YYYY-MM-DD'``. Defaults to today
            (Europe/Berlin) when omitted.
        end_date: Last day (inclusive) as ``'YYYY-MM-DD'``. Defaults to today
            (Europe/Berlin) when omitted.
        filename: Output file. It is truncated, filled with a JSON array of
            the successfully fetched days, and then validated.

    Raises:
        ValueError: If a supplied date string does not match ``YYYY-MM-DD``.
    """
    # Europe/Berlin decides what "today" means.
    tz = pytz.timezone('Europe/Berlin')
    today = datetime.now(tz).date()

    # Work on plain calendar dates. Parsing to a datetime and then calling
    # astimezone() would read the value as *system* local time and can land
    # on the previous or next day when the host is not in Berlin time.
    first_day = datetime.strptime(start_date, '%Y-%m-%d').date() if start_date else today
    last_day = datetime.strptime(end_date, '%Y-%m-%d').date() if end_date else today

    if start_date or end_date:
        success_label = "Price data ingested"
    else:
        success_label = "Real-time price data ingested"

    if first_day > last_day:
        logging.warning(f"Start date {first_day} is after end date {last_day}; nothing to ingest.")

    # Start the JSON array in the file
    with open(filename, 'w') as json_file:
        json_file.write("[\n")

    # Separators are written *before* every entry except the first, so a day
    # without data (even the last one) can never leave a trailing comma that
    # would make the array invalid JSON.
    wrote_entry = False
    current_day = first_day

    while current_day <= last_day:
        day_str = current_day.strftime('%Y-%m-%d')
        try:
            start = time.time()  # Start timing the fetch
            data = fetch_price_data(start_date=day_str, end_date=day_str)

            if data:
                if wrote_entry:
                    with open(filename, 'a') as json_file:
                        json_file.write(",\n")
                save_price_data_to_file(data, filename, is_last_entry=True)
                wrote_entry = True
                logging.info(f"{success_label} for {day_str}")
            else:
                logging.warning(f"No data returned for {day_str}.")

            duration = time.time() - start
            logging.info(f"Ingestion time for {day_str}: {duration:.2f} seconds")

        except Exception as e:
            logging.error(f"Ingestion error for {day_str}: {e}")

        current_day += timedelta(days=1)  # Increment the date by one day

    # Close the JSON array at the end
    with open(filename, 'a') as json_file:
        json_file.write("\n]")

    # Validate JSON format
    validate_json_file(filename)

def validate_json_file(filename):
    """Log whether the contents of ``filename`` parse as JSON."""
    with open(filename, 'r') as handle:
        try:
            json.load(handle)  # Parsing is the validation; the result is discarded
        except json.JSONDecodeError as parse_error:
            logging.error(f"JSON format is invalid: {parse_error}")
        else:
            logging.info("JSON format is valid.")

# Example usage:
mode = input("Enter mode (backfill) or press Enter for backfill: ").strip().lower()

# Backfill is the only supported mode; any other answer just triggers a
# warning before the same prompts and the same ingestion run.
if mode not in ("backfill", ""):
    logging.warning("Invalid mode. Defaulting to backfill.")

start_date = input("Enter start date (YYYY-MM-DD) or press Enter for today: ").strip() or None
end_date = input("Enter end date (YYYY-MM-DD) or press Enter for today: ").strip() or None

# Run in backfill ingestion mode
ingest_price_data(start_date=start_date, end_date=end_date, filename='/content/price_data.json')


Enter mode (backfill) or press Enter for backfill: backfill
Enter start date (YYYY-MM-DD) or press Enter for today: 2024-10-15
Enter end date (YYYY-MM-DD) or press Enter for today: 2024-10-15


2024-10-28 19:07:13,228 - INFO - Price data ingested for 2024-10-15
2024-10-28 19:07:13,229 - INFO - Ingestion time for 2024-10-15: 1.25 seconds
2024-10-28 19:07:13,232 - INFO - JSON format is valid.


# Price Data Transformation Pipeline Monitoring

In [14]:
from pyspark.sql import SparkSession
import logging
import os

# Configure logging to file and console
log_filename = '/content/price_data_transformation_pipeline.log'

# Ensure the directory exists for the log file (no-op when already present)
os.makedirs(os.path.dirname(log_filename), exist_ok=True)

# Detach handlers left over from earlier cells and close them, so their log
# files are released instead of staying open for the rest of the session.
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)
    handler.close()

# Set up logging configuration for both FileHandler and StreamHandler
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(log_filename, mode='w'),  # Overwrite log file each run
        logging.StreamHandler()  # Output to console
    ]
)

# Initialize Spark session with Delta Lake support and required configurations.
# The Delta artifact has to match the installed runtime: the setup cell pins
# pyspark 3.5.1 and delta-spark 3.2.0, whose Maven coordinate is
# io.delta:delta-spark_2.12:3.2.0 (delta-core 2.3.0 targets Spark 3.3).
spark = (
    SparkSession.builder
    .appName("Price Data Cleaning")
    .master("local[*]")
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")
    .config("spark.sql.crossJoin.enabled", "true")
    .getOrCreate()
)

try:
    # Read the ingested price file (a multi-line JSON array) into a DataFrame
    price_data_df = spark.read.option("multiline", "true").json("/content/price_data.json")
    logging.info("Loaded price data from JSON file.")

    initial_count = price_data_df.count()
    logging.info(f"Initial DataFrame count: {initial_count}")
    price_data_df.show(truncate=False)

    from pyspark.sql.functions import col, from_unixtime, posexplode

    # Cleaning: keep records that are not deprecated and that carry
    # 'price', 'unit' and 'unix_seconds'.
    keep_record = col("deprecated") == False
    for required_column in ("price", "unit", "unix_seconds"):
        keep_record = keep_record & col(required_column).isNotNull()
    price_cleaned = price_data_df.filter(keep_record)
    logging.info("Filtered out deprecated entries and null values.")

    # One row per timestamp; 'pos' remembers the index inside the array so the
    # matching price can be looked up afterwards.
    price_exploded = price_cleaned.select(
        col("license_info"),
        posexplode("unix_seconds").alias("pos", "unix_seconds"),
        "price",
        col("unit"),
        col("deprecated"),
    )
    logging.info("Exploded price data to maintain relationship between unix_seconds and price.")

    # Pick the price that sits at the same array position as the timestamp
    price_at_position = col("price")[col("pos")].alias("price")
    price_exploded = price_exploded.select(
        "license_info", "unix_seconds", price_at_position, "unit", "deprecated"
    )

    # Add a human-readable timestamp derived from the Unix seconds
    price_exploded = price_exploded.select(
        "*", from_unixtime("unix_seconds").alias("timestamp")
    )
    logging.info("Converted Unix timestamps to readable timestamp format.")

    # Final column set for the export
    output_columns = ["license_info", "price", "unit", "deprecated", "timestamp"]
    price_cleaned_final = price_exploded.select(*output_columns)

    # Destination of the cleaned CSV output
    csv_file_path = "/content/price_cleaned.csv"

    # Write the result as CSV with a header row, replacing any earlier output
    csv_writer = price_cleaned_final.write.format("csv").mode("overwrite").option("header", "true")
    csv_writer.save(csv_file_path)
    logging.info(f"Cleaned price data CSV file saved at: {csv_file_path}")

except Exception as e:
    logging.error(f"Error occurred during data transformation: {e}")

finally:
    # Always release the Spark session, even after a failure
    spark.stop()
    logging.info("Spark session stopped.")


2024-10-28 19:10:40,029 - INFO - Loaded price data from JSON file.
2024-10-28 19:10:40,362 - INFO - Initial DataFrame count: 1
2024-10-28 19:10:41,084 - INFO - Filtered out deprecated entries and null values.
2024-10-28 19:10:41,160 - INFO - Exploded price data to maintain relationship between unix_seconds and price.


+----------+---------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|deprecated|license_info                                                                     |price                                                                                                                                                                       |unit               |unix_seconds                                                                                                                            

2024-10-28 19:10:41,316 - INFO - Converted Unix timestamps to readable timestamp format.
2024-10-28 19:10:42,152 - INFO - Cleaned price data CSV file saved at: /content/price_cleaned.csv
2024-10-28 19:10:42,317 - INFO - Spark session stopped.


# Installed Power Ingestion Pipeline Monitoring

In [15]:
import requests
import json
import logging
import os
from datetime import datetime

# Configure logging to file and console
log_filename = '/content/installed_power_data_ingestion_pipeline_monitoring.log'

# Ensure the directory exists for the log file (no-op when already present)
os.makedirs(os.path.dirname(log_filename), exist_ok=True)

# Detach handlers left over from earlier cells and close them, so their log
# files are released instead of staying open for the rest of the session.
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)
    handler.close()

# Set up logging configuration for both FileHandler and StreamHandler
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(log_filename, mode='w'),  # Overwrite log file each run
        logging.StreamHandler()  # Output to console
    ]
)

def fetch_installed_power_data(country='de', time_step='monthly', installation_decommission=False, timeout=30):
    """Fetch installed power data from the energy-charts API.

    Args:
        country: Country code sent as the ``country`` query parameter.
        time_step: Value sent as the ``time_step`` query parameter.
        installation_decommission: Boolean sent as the lowercase string
            ``'true'`` / ``'false'`` in the ``installation_decommission``
            query parameter.
        timeout: Seconds to wait for the server. Without a timeout a stalled
            connection would block the caller indefinitely.

    Returns:
        The decoded JSON payload.

    Raises:
        requests.exceptions.RequestException: Logged and re-raised when the
            request fails, times out or returns an error status.
    """
    url = "https://api.energy-charts.info/installed_power"

    # Parameters for the API request
    params = {
        'country': country,
        'time_step': time_step,
        'installation_decommission': str(installation_decommission).lower()  # Convert boolean to lowercase string
    }

    try:
        response = requests.get(url, params=params, timeout=timeout)
        response.raise_for_status()  # Raise an error for unsuccessful status codes
        logging.info("Successfully fetched installed power data.")
        return response.json()
    except requests.exceptions.RequestException as e:
        logging.error(f"Failed to fetch data: {e}")
        raise

def save_data_to_file(data, filename, is_last_entry=False):
    """Append ``data`` to ``filename`` as indented JSON.

    A ``",\\n"`` separator follows the entry unless ``is_last_entry`` is
    true, so consecutive calls build the body of a JSON array.
    """
    with open(filename, 'a') as out:
        json.dump(data, out, indent=4)
        if is_last_entry:
            return
        # More entries will follow: terminate this one with a separator.
        out.write(",\n")

def ingest_installed_power_data(time_step='monthly', installation_decommission=False, filename='installed_power_data.json', country='de'):
    """Fetch installed power data once and store it as a one-element JSON array.

    Args:
        time_step: Forwarded to ``fetch_installed_power_data``.
        installation_decommission: Forwarded to ``fetch_installed_power_data``.
        filename: Output file. It is truncated and rewritten on every call.
        country: Country code forwarded to ``fetch_installed_power_data``.
            Defaults to ``'de'``, the value that used to be hard-coded.

    Errors raised while fetching or saving are logged and swallowed; the
    array brackets are written either way, so a failed run leaves an empty
    JSON array behind.
    """
    # Start the JSON array in the file
    with open(filename, 'w') as json_file:
        json_file.write("[\n")

    try:
        # Fetch the installed power data
        data = fetch_installed_power_data(
            country=country,
            time_step=time_step,
            installation_decommission=installation_decommission
        )

        # Single entry, so no separator is needed after it
        save_data_to_file(data, filename, is_last_entry=True)
        logging.info(f"Data ingested successfully for {time_step} with installation_decommission={installation_decommission}.")

    except Exception as e:
        logging.error(f"Error during ingestion: {e}")

    # Close the JSON array after the data is written
    with open(filename, 'a') as json_file:
        json_file.write("\n]")

# Example usage:
# For monthly ingestion. The absolute /content path is the same location the
# transformation step reads from, so the two cells agree regardless of the
# current working directory.
ingest_installed_power_data(time_step='monthly', installation_decommission=False, filename='/content/installed_power_data_monthly.json')

# For yearly ingestion, change time_step to 'yearly'
# ingest_installed_power_data(time_step='yearly', installation_decommission=False, filename='/content/installed_power_data_yearly.json')


2024-10-28 19:21:12,900 - INFO - Successfully fetched installed power data.
2024-10-28 19:21:12,905 - INFO - Data ingested successfully for monthly with installation_decommission=False.


# Installed Power Transformation Pipeline Monitoring

In [20]:
import os
import logging
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Configure logging to file and console
log_filename = '/content/installed_power_transformation_pipeline_monitoring.log'

# Ensure the directory exists for the log file (no-op when already present)
os.makedirs(os.path.dirname(log_filename), exist_ok=True)

# Detach handlers left over from earlier cells and close them, so their log
# files are released instead of staying open for the rest of the session.
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)
    handler.close()

# Set up logging configuration for both FileHandler and StreamHandler
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(log_filename, mode='w'),  # Overwrite log file each run
        logging.StreamHandler()  # Output to console
    ]
)

# Initialize Spark session with Delta Lake support and required configurations.
# The Delta artifact has to match the installed runtime: the setup cell pins
# pyspark 3.5.1 and delta-spark 3.2.0, whose Maven coordinate is
# io.delta:delta-spark_2.12:3.2.0 (delta-core 2.3.0 targets Spark 3.3).
spark = (
    SparkSession.builder
    .appName("Installed Power Data Cleaning")
    .master("local[*]")
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")
    .config("spark.sql.crossJoin.enabled", "true")
    .getOrCreate()
)

# Load the installed power data from a JSON file
installed_power_data_df = spark.read.option("multiline", "true").json("/content/installed_power_data_monthly.json")

# Display the initial DataFrame count and data
initial_count = installed_power_data_df.count()
logging.info(f"Initial DataFrame count: {initial_count}")
installed_power_data_df.show(truncate=False)

# Clean the data
# Filter out deprecated entries
installed_power_cleaned = installed_power_data_df.filter(
    F.col("deprecated") == False
)

# Log the count after cleaning
cleaned_count = installed_power_cleaned.count()
logging.info(f"Cleaned DataFrame count (deprecated removed): {cleaned_count}")

# Explode the time axis together with its array position, keeping the
# production types of the *same* source record on every row. Exploding time
# and production types into separate DataFrames and joining them on a non-equi
# condition pairs every production type with every time entry, so with more
# than one input record values would be mixed across records.
time_exploded = installed_power_cleaned.select(
    F.posexplode(F.col("time")).alias("pos", "time"),
    F.col("production_types")
)

# One row per (time entry, production type) within each source record
production_exploded = time_exploded.select(
    "pos",
    "time",
    F.explode(F.col("production_types")).alias("production_type")
)

# Use the 'pos' index to align each 'data' entry with the correct 'time'.
# Bracket indexing is used because getItem() with a Column key is deprecated.
final_cleaned_data = production_exploded.select(
    F.col("time"),
    F.col("production_type.name").alias("production_name"),
    F.col("production_type.data")[F.col("pos")].alias("production_value")
)

# Filter out null values in production_value
final_cleaned_data = final_cleaned_data.filter(
    F.col("production_value").isNotNull()
)

# Log the count after final cleaning
final_count = final_cleaned_data.count()
logging.info(f"Final cleaned DataFrame count (null values removed): {final_count}")

# Specify the path where the cleaned CSV file will be stored
csv_file_path = "/content/installed_power_cleaned.csv"

# Save the cleaned DataFrame as a CSV file
final_cleaned_data.write.format("csv").mode("overwrite").option("header", "true").save(csv_file_path)

# Log the success message with file path
logging.info(f"Cleaned installed power data CSV file saved at: {csv_file_path}")


2024-10-28 19:28:44,769 - INFO - Initial DataFrame count: 1
2024-10-28 19:28:45,200 - INFO - Cleaned DataFrame count (deprecated removed): 1


+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

2024-10-28 19:28:46,060 - INFO - Final cleaned DataFrame count (null values removed): 923
2024-10-28 19:28:46,608 - INFO - Cleaned installed power data CSV file saved at: /content/installed_power_cleaned.csv
