# Probabilistic Forecasting - Electricity

This notebook demonstrates how to perform data analysis and preparation with PySpark from Amazon SageMaker Studio, running the Spark workload as a SageMaker Training Job on the SageMaker Spark processing container.

Using this notebook, we can execute the cells in order to generate the input data, upload it to Amazon S3, and then read and transform it with PySpark across multiple instances.

Let's start preparing our dataset.

In [None]:
%pip install -U -q sagemaker

***

# Dataset

The dataset (Electricity Price Forecasting) was downloaded from [Kaggle](https://www.kaggle.com/code/dimitriosroussis/electricity-price-forecasting-with-dnns-eda/data).

The forecasting task uses past values of the electricity price as well as other features related to energy generation and weather conditions.

# Step 1 - Import Modules

Here we’ll import some libraries and define some variables.

In [None]:
import os

# Uncomment and set a named AWS profile if you run this notebook outside SageMaker Studio
# os.environ["AWS_PROFILE"] = "<aws_profile>"

In [None]:
import boto3
import sagemaker
from sagemaker.modules.configs import (
    Compute,
    OutputDataConfig,
    SourceCode,
    StoppingCondition,
)
from sagemaker.modules.train import ModelTrainer

In [None]:
sagemaker_client = boto3.client("sagemaker")
s3_client = boto3.client("s3")

Create a SageMaker session and store the default region and the execution role in Python variables.

In [None]:
sagemaker_session = sagemaker.Session()
region = boto3.session.Session().region_name
role = sagemaker.get_execution_role()

In [None]:
bucket_name = sagemaker_session.default_bucket()
default_prefix = sagemaker_session.default_bucket_prefix

bucket_name

***

# Step 2 - Prepare data and upload to S3

In [None]:
! python utils/syntetic_data_energy.py

In [None]:
! python utils/syntetic_data_weather.py

In [None]:
from pathlib import Path

output_dir = Path("./data/output")

for file_path in output_dir.rglob("*"):
    if file_path.is_file():
        # Build the S3 key from the path relative to output_dir ("/"-separated on every OS)
        relative_path = file_path.relative_to(output_dir).as_posix()
        s3_key = f"electricity-forecasting/data/input/{relative_path}"

        print(f"Uploading {file_path} to s3://{bucket_name}/{s3_key}")
        s3_client.upload_file(str(file_path), bucket_name, s3_key)

print("Upload complete!")
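The loop above mirrors the local `./data/output` tree under the `electricity-forecasting/data/input` prefix. A minimal sketch of the same key-building logic as a pure function, which is easier to check before uploading anything; `local_files_to_s3_keys` is a hypothetical name, not part of boto3 or the SageMaker SDK:

```python
from __future__ import annotations

from pathlib import Path


def local_files_to_s3_keys(root: Path, key_prefix: str) -> dict[str, str]:
    """Map every file under `root` to an S3 key under `key_prefix`.

    Hypothetical helper mirroring the upload loop: the file's path relative
    to `root` is appended to `key_prefix`, always joined with "/".
    """
    mapping = {}
    for file_path in sorted(root.rglob("*")):
        if file_path.is_file():
            relative = file_path.relative_to(root).as_posix()
            mapping[str(file_path)] = f"{key_prefix.rstrip('/')}/{relative}"
    return mapping
```

Each pair can then be passed to `s3_client.upload_file(local_path, bucket_name, key)`, exactly as the loop does.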

***

# Step 3 - Run the training job

By using [ModelTrainer](https://sagemaker.readthedocs.io/en/stable/api/training/model_trainer.html), we can run the PySpark script as an Amazon SageMaker Training Job on the Spark processing container, so that the data processing is distributed across the job's instances.

In [None]:
! pygmentize ./code/processing.py

## Global Parameters

The following cell defines the global parameters for the job: the Spark container image, the S3 prefixes for the code, input, and output data, and the number and type of instances.

In [None]:
# Change spark_image_uri based on your region. Visit https://github.com/aws/sagemaker-spark-container/releases
spark_image_uri = "173754725891.dkr.ecr.us-east-1.amazonaws.com/sagemaker-spark-processing:3.5-cpu-py312-v1.0"

processing_code = "electricity-forecasting/code/processing"
processing_input_files_path = "electricity-forecasting/data/input"
processing_output_files_path = "electricity-forecasting/data/output"

processing_instance_count = 2
processing_instance_type = "ml.m5.12xlarge"
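The comment above asks you to change `spark_image_uri` for your region. Because the registry account ID at the start of the URI generally differs per region too, editing the region string alone is usually not enough. A minimal sketch that assembles the URI from its parts, assuming you look up the account ID for your region on the sagemaker-spark-container releases page; the function name is hypothetical, and only the us-east-1 values used in the check come from this notebook:

```python
def spark_processing_image_uri(account_id: str, region: str, tag: str) -> str:
    """Build an ECR URI for the SageMaker Spark processing container.

    Hypothetical helper: `account_id` must be the registry account that
    hosts the image in `region` (see the container release notes).
    """
    return f"{account_id}.dkr.ecr.{region}.amazonaws.com/sagemaker-spark-processing:{tag}"


# Reproduces the us-east-1 URI used in this notebook
print(spark_processing_image_uri("173754725891", "us-east-1", "3.5-cpu-py312-v1.0"))
```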

## Spark Configuration

In [None]:
spark_configurations = [
    {
        "Classification": "spark-defaults",
        "Properties": {
            # Property values are passed as strings, as they appear in spark-defaults.conf
            "spark.executor.cores": "5",
            "spark.driver.cores": "5",
            "spark.executor.memory": "35g",
            "spark.executor.memoryOverhead": "3g",
            "spark.driver.memory": "35g",
            "spark.executor.instances": "17",
            "spark.sql.parquet.fs.optimized.committer.optimization-enabled": "true",
        },
    }
]
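The value of `spark.executor.instances` lines up with a common cores-based sizing heuristic: reserve one vCPU per node for the OS and daemons, pack executors of `spark.executor.cores` cores into the remaining vCPUs, and give one executor slot to the driver. A minimal sketch of that arithmetic, assuming 48 vCPUs per `ml.m5.12xlarge`; it covers cores only, not memory, and the function name is hypothetical:

```python
def executor_instances(instance_count: int, vcpus_per_instance: int,
                       executor_cores: int, reserved_cores: int = 1) -> int:
    """Estimate spark.executor.instances with a cores-based heuristic.

    Assumptions: `reserved_cores` per node stay free for the OS and daemons,
    and one executor slot across the cluster is given to the driver.
    """
    per_node = (vcpus_per_instance - reserved_cores) // executor_cores
    return per_node * instance_count - 1


# 2 x ml.m5.12xlarge (48 vCPUs each), 5 cores per executor
print(executor_instances(2, 48, 5))  # 17
```

If you change `processing_instance_count` or `processing_instance_type`, rerunning this estimate is a quick way to keep `spark.executor.instances` consistent with the cluster size.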

In [None]:
import json
import os
from sagemaker.s3 import S3Uploader

# Write spark_configurations to JSON file
with open("configuration.json", "w") as f:
    json.dump(spark_configurations, f)

# Upload to S3
if default_prefix:
    input_path = (
        f"s3://{bucket_name}/{default_prefix}/sagemaker-training-spark/configurations"
    )
else:
    input_path = f"s3://{bucket_name}/sagemaker-training-spark/configurations"

spark_config_s3_path = S3Uploader.upload(
    local_path="configuration.json", desired_s3_uri=f"{input_path}/config"
)

os.remove("configuration.json")

print("Spark config uploaded to:")
print(spark_config_s3_path)
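This cell and the output path of the training job below both branch on `default_prefix` to build S3 URIs. A minimal sketch of a helper that factors out that branching; `s3_uri` is a hypothetical name, and it assumes that an empty or `None` prefix simply means "no prefix":

```python
from typing import Optional


def s3_uri(bucket: str, *parts: str, prefix: Optional[str] = None) -> str:
    """Join bucket, optional default prefix and path parts into an S3 URI.

    Hypothetical helper: empty parts are skipped and leading/trailing
    slashes on each part are stripped before joining with "/".
    """
    segments = [prefix, *parts] if prefix else list(parts)
    cleaned = [s.strip("/") for s in segments if s and s.strip("/")]
    return "/".join([f"s3://{bucket}", *cleaned])
```

For example, `s3_uri(bucket_name, "sagemaker-training-spark/configurations", prefix=default_prefix)` yields the same `input_path` as the `if`/`else` above.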

Define the `ModelTrainer` object.

### Update:

Starting with container version `sagemaker-spark-processing:3.3-cpu-py39-v1.2`, SageMaker Spark containers can apply an automatically optimized Spark configuration. To use it, set the environment variable `AWS_SPARK_CONFIG_MODE` to `"2"`:

```python
env={
    "AWS_SPARK_CONFIG_MODE": "2"
}
```
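If you switch images, you may want to set this variable only when the image is new enough. A minimal sketch that checks the image tag, assuming tags follow the `<spark>-cpu-py<python>-v<major>.<minor>` pattern seen in this notebook and treating every Spark version later than 3.3 as supporting the mode (an assumption, not something the container documentation states here); the function name is hypothetical:

```python
import re


def supports_auto_spark_config(image_uri: str) -> bool:
    """Return True if the Spark image tag is at or after 3.3-...-v1.2.

    Assumptions: tags look like "3.5-cpu-py312-v1.0", and images for Spark
    versions later than 3.3 all support AWS_SPARK_CONFIG_MODE="2".
    """
    tag = image_uri.rsplit(":", 1)[-1]
    match = re.fullmatch(r"(\d+)\.(\d+)-cpu-py\d+-v(\d+)\.(\d+)", tag)
    if match is None:
        return False  # unknown tag format: do not guess
    spark = (int(match[1]), int(match[2]))
    container = (int(match[3]), int(match[4]))
    if spark != (3, 3):
        return spark > (3, 3)
    return container >= (1, 2)
```

You could then add `"AWS_SPARK_CONFIG_MODE": "2"` to the job environment only when `supports_auto_spark_config(spark_image_uri)` is true.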

In [None]:
args = [
    "--local-spark-event-logs-dir",
    "/opt/ml/output/data/spark-events/",
    "/opt/ml/input/data/code/processing.py",
]

# Define the script to be run
source_code = SourceCode(
    source_dir="./code",
    requirements="requirements.txt",
    command=f"smspark-submit {' '.join(args)}"
)

# Define the compute
compute_configs = Compute(
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    keep_alive_period_in_seconds=0,
)

# Base name (prefix) for the training job
job_name = "process-spark-job"

# define OutputDataConfig path
if default_prefix:
    output_path = f"s3://{bucket_name}/{default_prefix}/{processing_output_files_path}"
else:
    output_path = f"s3://{bucket_name}/{processing_output_files_path}"

# Define the ModelTrainer
model_trainer = ModelTrainer(
    training_image=spark_image_uri,
    source_code=source_code,
    base_job_name=job_name,
    compute=compute_configs,
    stopping_condition=StoppingCondition(max_runtime_in_seconds=18000),
    environment={
        "IS_TRAINING_JOB": "true", 
        "AWS_SPARK_CONFIG_MODE": "2"
    },
    output_data_config=OutputDataConfig(
        s3_output_path=output_path, 
        compression_type="NONE"
    ),
)
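The cell above builds the `smspark-submit` command with `' '.join(args)`, which works because none of the arguments contain spaces or shell metacharacters. If you add arguments that might (for example a path with a space), the standard library's `shlex.join` quotes them for a POSIX shell. A minimal variant with the same arguments, assuming the command string is interpreted by a shell inside the container; `submit_args` is a new name chosen so the notebook's `args` is not overwritten:

```python
import shlex

# Same arguments as the cell above
submit_args = [
    "--local-spark-event-logs-dir",
    "/opt/ml/output/data/spark-events/",
    "/opt/ml/input/data/code/processing.py",
]

# shlex.join quotes any argument that a shell would otherwise split or
# interpret; for these arguments the result equals the plain " ".join.
command = f"smspark-submit {shlex.join(submit_args)}"
print(command)
```

The resulting string can be passed as `command=` to `SourceCode` unchanged.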

In [None]:
from sagemaker.modules.configs import InputData, S3DataSource

# Pass the input data
train_input = InputData(
    channel_name="input",
    data_source=S3DataSource(
        s3_data_type="S3Prefix",
        s3_uri="s3://{}/{}/".format(bucket_name, processing_input_files_path),
        s3_data_distribution_type="ShardedByS3Key"
    ),
)

config_input = InputData(
    channel_name="conf",
    data_source=spark_config_s3_path,  # S3 URI of the uploaded Spark configuration file
)

# Check input channels configured
data = [train_input, config_input]
data
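SageMaker training jobs make each input channel available under `/opt/ml/input/data/<channel_name>`, which is consistent with the `smspark-submit` command pointing at `/opt/ml/input/data/code/processing.py` for the uploaded `source_dir`. Because the `input` channel uses `ShardedByS3Key`, each instance receives only a subset of the objects under the input prefix. A minimal sketch (the helper name is hypothetical) of where the channels should appear inside the container:

```python
from pathlib import PurePosixPath

# Root directory under which SageMaker training jobs mount input channels
TRAINING_INPUT_ROOT = PurePosixPath("/opt/ml/input/data")


def channel_dir(channel_name: str) -> str:
    """Return the directory where a training-job channel is made available."""
    return str(TRAINING_INPUT_ROOT / channel_name)


for name in ("input", "conf", "code"):
    print(f"{name:>5}: {channel_dir(name)}")
```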

In [None]:
model_trainer.train(input_data_config=data, wait=False)
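Because `train` is called with `wait=False`, the cell returns as soon as the job is submitted. A minimal polling sketch using the boto3 SageMaker client's `list_training_jobs` and `describe_training_job` calls; the helper names are hypothetical, and the lookup assumes the most recently created job whose name contains `job_name` is the one just started:

```python
import time
from typing import Callable

# Statuses after which a training job no longer changes state
TERMINAL_STATUSES = {"Completed", "Failed", "Stopped"}


def latest_job_name(sm_client, base_name: str) -> str:
    """Return the newest training job whose name contains `base_name`."""
    response = sm_client.list_training_jobs(
        NameContains=base_name,
        SortBy="CreationTime",
        SortOrder="Descending",
        MaxResults=1,
    )
    return response["TrainingJobSummaries"][0]["TrainingJobName"]


def wait_for_training_job(sm_client, job_name: str, poll_seconds: float = 60,
                          sleep: Callable[[float], None] = time.sleep) -> str:
    """Poll DescribeTrainingJob until the job reaches a terminal status."""
    while True:
        description = sm_client.describe_training_job(TrainingJobName=job_name)
        status = description["TrainingJobStatus"]
        if status in TERMINAL_STATUSES:
            return status
        sleep(poll_seconds)
```

For example: `wait_for_training_job(sagemaker_client, latest_job_name(sagemaker_client, job_name))`. boto3 also provides a built-in `training_job_completed_or_stopped` waiter for the SageMaker client if you prefer not to write the loop yourself.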