# Probabilistic Forecasting - Electricity

This notebook demonstrates how to perform data analysis and data preparation with Amazon SageMaker Studio using an AWS Glue interactive session.

By executing the cells in order, we read, visualize, and transform the data using PySpark in an AWS Glue interactive session.

Let's start preparing our dataset.

**SageMaker Studio Kernel**: SparkAnalytics 2.0 - Glue PySpark

***

# Dataset

The dataset (Electricity Price Forecasting) was downloaded from [Kaggle](https://www.kaggle.com/code/dimitriosroussis/electricity-price-forecasting-with-dnns-eda/data).
It contains past values of the electricity price along with other features related to energy generation and weather conditions.

***

# Step 1 - Start Glue interactive session

Let's configure the AWS Glue interactive session using magic commands. Run `%help` to see the full list of available commands.

In [None]:
%session_id_prefix ts-electricity-forecasting-
%glue_version 3.0
%idle_timeout 60
%%configure 
{
    "--enable-spark-ui": "true",
    "--spark-event-logs-path": "s3://<BUCKET_NAME>/<BUCKET_PREFIX>/logs/"
}

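The magics above only configure the session. Running the next cell, the first one that contains code, creates the AWS Glue interactive session. Fill in the empty values for your environment before running it.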

In [None]:
catalog_name = ""
bucket_name = ""
bucket_prefix = ""
database_name = ""
table_name = "electricity"
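Every later cell builds its S3 paths from `bucket_name` and `bucket_prefix`, so an empty value only surfaces as a read or write error further down. As a minimal sketch, a small guard could fail early instead. The helper name `s3_uri` and the placeholder values are assumptions for illustration and are not part of the original notebook.

```python
def s3_uri(bucket: str, prefix: str, *parts: str) -> str:
    """Build an s3:// URI, failing early if the bucket name is empty.

    Hypothetical helper for illustration; not part of the original notebook.
    """
    if not bucket:
        raise ValueError("bucket_name must be set before building S3 paths")
    # Strip stray slashes so the pieces join with exactly one '/' between them.
    segments = [s.strip("/") for s in (prefix, *parts) if s and s.strip("/")]
    return "s3://" + "/".join([bucket, *segments])


# Example with placeholder values (assumed, not real resources).
example = s3_uri("my-bucket", "forecasting/", "data", "input", "energy_dataset.csv")
print(example)
```

With the notebook's own variables, the call would look like `s3_uri(bucket_name, bucket_prefix, "data", "input", "energy_dataset.csv")`.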

***

# Step 2 - Import Modules

In [None]:
import boto3
import csv
import logging
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType, TimestampType
import seaborn as sns

In [None]:
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [None]:
pd.set_option("display.max_columns", None)

In [None]:
spark = SparkSession.builder \
    .config("spark.sql.legacy.timeParserPolicy", "CORRECTED") \
    .getOrCreate()

***

# Step 3 - Data Preparation

## Electricity Dataset

In [None]:
df_e = spark.read.csv(
    f"s3://{bucket_name}/{bucket_prefix}/data/input/energy_dataset.csv",
    header=True
)

In [None]:
df_e.show(10)

In [None]:
df_e = df_e.withColumn("time", F.to_timestamp("time", "yyyy-MM-dd HH:mm:ssVV"))
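The pattern above expects a date, a time, and a trailing zone or offset. To make that concrete, here is a plain-Python sketch that parses one value of that shape and converts it to UTC. The sample string is an assumption about what the raw column looks like, not a value read from the file.

```python
from datetime import datetime, timezone

# Assumed sample value in the shape the pattern above expects
# (date, time, then a UTC offset such as +01:00).
raw = "2015-01-01 00:00:00+01:00"

parsed = datetime.fromisoformat(raw)       # offset-aware datetime
as_utc = parsed.astimezone(timezone.utc)   # same instant, expressed in UTC

print(parsed.isoformat())
print(as_utc.isoformat())
```

Spark's timestamp type likewise stores an instant, and how it is displayed depends on the session time zone, so a shifted hour in `show()` output does not necessarily mean the parse went wrong.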

In [None]:
columns_to_drop = [
    'generation fossil coal-derived gas',
    'generation fossil oil shale',
    'generation fossil peat',
    'generation geothermal',
    'generation hydro pumped storage aggregated',
    'generation marine',
    'generation wind offshore',
    # 'eday' is kept as-is; it appears to match the header in the source CSV
    'forecast wind offshore eday ahead',
    'total load forecast',
    'forecast solar day ahead',
    'forecast wind onshore day ahead',
]

df_e = df_e.drop(*columns_to_drop)

### Round all floats to two decimal places

In [None]:
for column in df_e.schema.names:
    if column != "time":
        df_e = df_e.withColumn(column, df_e[column].cast(DoubleType()))
        df_e = df_e.withColumn(column, F.round(F.col(column), 2))

In [None]:
df_e.schema

### Check NaN and null values in the dataset

In [None]:
df_e \
    .select([
        F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c)
        for c in df_e.schema.names if c != "time"
    ]) \
    .toPandas()
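The check above counts two different kinds of missing value per column: nulls and floating-point NaNs. A small plain-Python sketch of the same idea on in-memory rows may make the distinction clearer. The function name `count_missing` and the sample values are made up for illustration.

```python
import math


def count_missing(rows, columns):
    """Count values per column that are either None or a float NaN."""
    counts = {c: 0 for c in columns}
    for row in rows:
        for c in columns:
            value = row.get(c)
            if value is None or (isinstance(value, float) and math.isnan(value)):
                counts[c] += 1
    return counts


# Toy rows with made-up values.
sample_rows = [
    {"price actual": 65.41, "total load actual": None},
    {"price actual": float("nan"), "total load actual": 25385.0},
    {"price actual": 64.92, "total load actual": 24382.0},
]
missing = count_missing(sample_rows, ["price actual", "total load actual"])
print(missing)
```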

To fill the null values in the energy dataset, we use linear interpolation.

In [None]:
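def interpolate(pdf):
    """Fill missing values in each column by linear interpolation between rows."""
    pdf = pdf.set_index('time')
    # 'forward' fills gaps after the first valid value; leading NaNs are left as-is.
    pdf = pdf.interpolate(method='linear', limit_direction='forward', axis=0)
    return pdf.reset_index()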

In [None]:
df_e_p = df_e.toPandas()
df_e_p = interpolate(df_e_p)
df_e = spark.createDataFrame(df_e_p)
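To show what the interpolation step computes, here is a dependency-free sketch of a forward-only linear fill over equally spaced rows. It is an illustration of the technique rather than the pandas implementation. The function name `linear_fill` and the toy values are assumptions.

```python
from typing import List, Optional


def linear_fill(values: List[Optional[float]]) -> List[Optional[float]]:
    """Fill None gaps along a straight line between the nearest known neighbours.

    Rows are treated as equally spaced. Leading gaps stay None and trailing
    gaps repeat the last known value, which is the behaviour assumed here
    for a forward-only fill.
    """
    filled = list(values)
    last_known = None  # index of the most recent non-missing value
    for i, v in enumerate(values):
        if v is None:
            continue
        if last_known is not None and i - last_known > 1:
            step = (v - values[last_known]) / (i - last_known)
            for j in range(last_known + 1, i):
                filled[j] = values[last_known] + step * (j - last_known)
        last_known = i
    # Trailing gap: carry the last known value forward.
    if last_known is not None:
        for j in range(last_known + 1, len(values)):
            filled[j] = values[last_known]
    return filled


# Toy hourly load values (made up for illustration).
result = linear_fill([None, 100.0, None, None, 130.0, None])
print(result)
```

Because a leading gap is left untouched, it can be worth re-running the null count after interpolating to confirm that nothing remains at the start of the series.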

In [None]:
print((df_e.count(), len(df_e.columns)))

## Weather Dataset

In [None]:
df_w = spark.read.csv(
    f"s3://{bucket_name}/{bucket_prefix}/data/input/weather_features.csv",
    header=True
)

In [None]:
df_w.schema

Cast every column to double, except the categorical and timestamp columns listed below.

In [None]:
columns = ["city_name", "weather_id", "weather_main", "weather_description", "weather_icon", "dt_iso"]

for c in df_w.columns:
    if c not in columns:
        df_w = df_w.withColumn(c, df_w[c].cast(DoubleType()))

In [None]:
df_w = df_w.withColumn("time", F.to_timestamp("dt_iso", "yyyy-MM-dd HH:mm:ssVV"))

In [None]:
df_w = df_w.drop("dt_iso")

Now let's remove duplicates, so that each city has at most one row per timestamp.

In [None]:
df_w.distinct().groupby("city_name").count().show()

In [None]:
df_w = df_w.orderBy("time").coalesce(1).dropDuplicates(subset=["city_name", "time"])

In [None]:
df_w.distinct().groupby("city_name").count().show()
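The intent of the deduplication step is to keep one row per `(city_name, time)` pair. Spark does not appear to guarantee which of several duplicate rows `dropDuplicates` keeps, which is presumably why the cell above sorts and coalesces to a single partition first. The plain-Python sketch below shows the "keep the first row per key" rule on in-memory data. The function name and sample rows are made up for illustration.

```python
def drop_duplicates_keep_first(rows, keys):
    """Keep the first row seen for each combination of the key columns."""
    seen = set()
    kept = []
    for row in rows:
        key = tuple(row[k] for k in keys)
        if key not in seen:
            seen.add(key)
            kept.append(row)
    return kept


# Toy weather rows, already ordered by time (values are made up).
weather_rows = [
    {"city_name": "Madrid", "time": "2015-01-01 00:00", "temp": 270.0},
    {"city_name": "Madrid", "time": "2015-01-01 00:00", "temp": 271.5},
    {"city_name": "Bilbao", "time": "2015-01-01 00:00", "temp": 269.7},
    {"city_name": "Madrid", "time": "2015-01-01 01:00", "temp": 270.3},
]
deduplicated = drop_duplicates_keep_first(weather_rows, ["city_name", "time"])
print(len(deduplicated))
```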

Finally, let's join the two datasets on the timestamp, adding one set of weather columns per city.

In [None]:
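# The leading space in " Barcelona" is deliberate; it appears to match the raw value in the source CSV.
df_w_barcelona = df_w.filter(F.col("city_name") == " Barcelona")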
df_w_bilbao = df_w.filter(F.col("city_name") == "Bilbao")
df_w_madrid = df_w.filter(F.col("city_name") == "Madrid")
df_w_seville = df_w.filter(F.col("city_name") == "Seville")
df_w_valencia = df_w.filter(F.col("city_name") == "Valencia")

df_w_barcelona = df_w_barcelona.select([F.col(c).alias(c + "_barcelona") for c in df_w_barcelona.columns]).drop("city_name_barcelona")
df_w_bilbao = df_w_bilbao.select([F.col(c).alias(c + "_bilbao") for c in df_w_bilbao.columns]).drop("city_name_bilbao")
df_w_madrid = df_w_madrid.select([F.col(c).alias(c + "_madrid") for c in df_w_madrid.columns]).drop("city_name_madrid")
df_w_seville = df_w_seville.select([F.col(c).alias(c + "_seville") for c in df_w_seville.columns]).drop("city_name_seville")
df_w_valencia = df_w_valencia.select([F.col(c).alias(c + "_valencia") for c in df_w_valencia.columns]).drop("city_name_valencia")
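The five near-identical lines above all apply one rule: append a lowercase city suffix to every column and drop the city name itself. As a sketch, that rule can be written once as a mapping from old to new column names. The helper name `suffixed_columns` and the sample column list are assumptions, not part of the original notebook.

```python
def suffixed_columns(columns, city, skip=("city_name",)):
    """Map each column to '<column>_<city>', leaving out the columns in `skip`."""
    suffix = "_" + city.strip().lower()
    return {c: c + suffix for c in columns if c not in skip}


# Toy column list; the real weather table has more columns.
mapping = suffixed_columns(["city_name", "temp", "humidity", "time"], " Barcelona")
print(mapping)

# In PySpark the mapping could then be applied with something like:
#   df.select([F.col(old).alias(new) for old, new in mapping.items()])
```

Looping over the city names with such a helper would replace the repeated blocks, at the cost of a little indirection.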

In [None]:
df_final = df_e.join(df_w_barcelona, df_e.time == df_w_barcelona.time_barcelona, how='full').drop("time_barcelona")
df_final = df_final.join(df_w_bilbao, df_e.time == df_w_bilbao.time_bilbao, how='full').drop("time_bilbao")
df_final = df_final.join(df_w_madrid, df_e.time == df_w_madrid.time_madrid, how='full').drop("time_madrid")
df_final = df_final.join(df_w_seville, df_e.time == df_w_seville.time_seville, how='full').drop("time_seville")
df_final = df_final.join(df_w_valencia, df_e.time == df_w_valencia.time_valencia, how='full').drop("time_valencia")
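A full outer join keeps every timestamp that appears on either side and fills the other side's columns with nulls. The plain-Python sketch below illustrates that behaviour on two tiny tables keyed by time. The function name, column names, and values are made up for illustration.

```python
def full_outer_join(left, right, left_cols, right_cols):
    """Join two {time: {column: value}} tables, keeping keys from both sides."""
    joined = []
    for t in sorted(set(left) | set(right)):
        row = {"time": t}
        for c in left_cols:
            row[c] = left.get(t, {}).get(c)    # None when the left side has no row
        for c in right_cols:
            row[c] = right.get(t, {}).get(c)   # None when the right side has no row
        joined.append(row)
    return joined


# Toy tables (made-up values).
energy = {"00:00": {"price actual": 65.4}, "01:00": {"price actual": 64.9}}
weather = {"01:00": {"temp_madrid": 270.3}, "02:00": {"temp_madrid": 269.9}}

joined = full_outer_join(energy, weather, ["price actual"], ["temp_madrid"])
for row in joined:
    print(row)
```

Unlike this sketch, the cell above drops the weather-side time column after each join. If weather rows exist for hours that are missing from the energy data, their `time` value would then be null, so it may be worth checking for null timestamps in `df_final`.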

In [None]:
df_final.show(1)

In [None]:
print((df_final.count(), len(df_final.columns)))

# Step 4 - Write CSV

In [None]:
df_final.repartition(1).write \
    .format("csv") \
    .mode("overwrite") \
    .option("quote", '"') \
    .option("header", True) \
    .option("sep", ",") \
    .option("encoding", "UTF-8") \
    .save(f"s3://{bucket_name}/{bucket_prefix}/data/output/electricity_full")
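Note that `save` writes a directory rather than a single file: with `repartition(1)` the output prefix should contain one `part-*.csv` object alongside marker files such as `_SUCCESS`. As a sketch, the helper below picks that part file out of a listing of object keys. The helper name `find_part_file` and the sample keys are hypothetical, and how the listing is obtained is left out.

```python
from typing import Iterable


def find_part_file(keys: Iterable[str]) -> str:
    """Return the single part-*.csv key from a listing of the output prefix.

    Hypothetical helper for illustration; raises if zero or several match.
    """
    matches = [
        k for k in keys
        if k.rsplit("/", 1)[-1].startswith("part-") and k.endswith(".csv")
    ]
    if len(matches) != 1:
        raise ValueError(f"expected exactly one part file, found {len(matches)}")
    return matches[0]


# Made-up object keys in the shape Spark typically produces.
listing = [
    "prefix/data/output/electricity_full/_SUCCESS",
    "prefix/data/output/electricity_full/part-00000-1a2b3c.csv",
]
print(find_part_file(listing))
```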

# Step 5 - Stop Session

In [None]:
%stop_session