# Probabilistic Forecasting - Electricity

This notebook demonstrates how to perform data analysis and data preparation with Amazon SageMaker Studio using an AWS Glue Interactive Session.

By executing the cells in order, we can read the data, visualize it, and apply transformations using PySpark with an AWS Glue Interactive Session.

Let's start preparing our dataset.

**SageMaker Studio Kernel**: DataScience 3.0 - Python3

***

# Dataset

The data set (Electricity Price Forecasting) was downloaded from [Kaggle](https://www.kaggle.com/code/dimitriosroussis/electricity-price-forecasting-with-dnns-eda/data).

The dataset contains past values of the electricity price, together with other features related to energy generation and weather conditions.

# Step 1 - Import Modules

Here we’ll import some libraries and define some variables.

In [1]:
import os

os.environ["AWS_PROFILE"] = "bpistone-mlops-dev"

In [2]:
import boto3
import sagemaker
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.spark.processing import PySparkProcessor

In [3]:
sagemaker_client = boto3.client("sagemaker")
s3_client = boto3.client("s3")

Create a SageMaker session, then store the default region and the execution role in Python variables.

In [5]:
sagemaker_session = sagemaker.Session()
region = boto3.session.Session().region_name
role = "arn:aws:iam::691148928602:role/mlops-sagemaker-execution-role"

In [6]:
bucket_name = sagemaker_session.default_bucket()

***

# Step 2 - Upload Python Scripts

In [7]:
script_location = "electricity-forecasting/code/processing"

In [8]:
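# Remove scripts left over from a previous upload.
# delete_object removes a single key, so list every object under the prefix first.
response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=script_location)
for obj in response.get("Contents", []):
    s3_client.delete_object(Bucket=bucket_name, Key=obj["Key"])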

code_path = sagemaker_session.upload_data('./code', key_prefix=script_location)

code_path

's3://sagemaker-eu-west-1-691148928602/electricity-forecasting/code/processing'
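The value returned by `upload_data` is a plain S3 URI. When a later step needs the bucket and key separately (for example, to call `s3_client` methods), the URI can be split with the standard library. The following is a minimal sketch; `split_s3_uri` and `join_s3_uri` are hypothetical helper names and are not part of boto3 or the SageMaker SDK.

```python
from urllib.parse import urlparse


def split_s3_uri(uri):
    """Split 's3://bucket/some/key' into a (bucket, key) tuple."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError("Not an S3 URI: {}".format(uri))
    # The path keeps its leading slash, which is not part of the object key.
    return parsed.netloc, parsed.path.lstrip("/")


def join_s3_uri(bucket, *parts):
    """Build an S3 URI from a bucket name and one or more key segments."""
    key = "/".join(p.strip("/") for p in parts if p)
    return "s3://{}/{}".format(bucket, key)
```

`join_s3_uri(bucket_name, processing_input_files_path)` would produce the same string as the `"s3://{}/{}".format(...)` expressions used elsewhere in this notebook.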

***

# Step 3 - Run the processing job

By using [PySparkProcessor](https://sagemaker-examples.readthedocs.io/en/latest/sagemaker_processing/spark_distributed_data_processing/sagemaker-spark-processing.html), we can pass our PySpark scripts to an Amazon SageMaker Processing job and run them in distributed data processing mode.

In [9]:
! pygmentize ./code/processing.py

import argparse
import csv
import logging
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType, TimestampType
import scripts.services.HDFSManager

BASE_PATH = os.path.join("/", "opt"
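The listing above is cut off before any argument handling is visible. Since the script imports `argparse`, and the `processor.run` call later in this notebook passes `--copy_hdfs`, `--bucket_name`, `--processing_input_files_path`, and `--processing_output_files_path`, the parsing step might look roughly like the sketch below. This is an assumption, not the author's code: the function name `parse_args`, the defaults, and the `int` type for `--copy_hdfs` are illustrative only.

```python
import argparse


def parse_args(argv=None):
    """Parse the flags the notebook passes to processing.py (assumed layout)."""
    parser = argparse.ArgumentParser(description="Electricity data processing (sketch)")
    # Assumption: 1 means "copy the input files to HDFS first", 0 means skip.
    parser.add_argument("--copy_hdfs", type=int, default=0)
    parser.add_argument("--bucket_name", type=str, required=True)
    parser.add_argument("--processing_input_files_path", type=str, required=True)
    parser.add_argument("--processing_output_files_path", type=str, required=True)
    # parse_known_args ignores any extra flags instead of exiting with an error.
    args, _unknown = parser.parse_known_args(argv)
    return args
```

Passing `argv=None` makes `argparse` read from `sys.argv`, which is what happens when the script is launched by the processing job; passing an explicit list makes the function easy to exercise from a notebook cell.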

## Global Parameters

To let users run the SageMaker Processing job locally, we define the variable `local_mode`. To test the local mode capability, set the variable to `True`.

In [10]:
local_mode = False

In [11]:
processing_image_uri = "{}.dkr.ecr.{}.amazonaws.com/sagemaker-spark-custom-container:latest".format(boto3.client("sts").get_caller_identity()["Account"], region)
processing_code = "electricity-forecasting/code/processing"
processing_input_files_path = "electricity-forecasting/data/input"
processing_instance_count = 2
processing_output_files_path = "electricity-forecasting/data/output"

if local_mode:
    processing_instance_type = "local"
else:
    processing_instance_type = "ml.m5.xlarge"
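The cell above mixes the account lookup, string formatting, and the local-mode switch. To check the selection logic without AWS credentials, it can be factored into pure functions. A minimal sketch follows; `build_image_uri` and `select_instance_type` are hypothetical names, and the account ID in the usage lines is a placeholder.

```python
def build_image_uri(account_id, region,
                    repository="sagemaker-spark-custom-container", tag="latest"):
    """Return the ECR image URI in the same format the notebook uses."""
    return "{}.dkr.ecr.{}.amazonaws.com/{}:{}".format(account_id, region, repository, tag)


def select_instance_type(local_mode, remote_type="ml.m5.xlarge"):
    """Pick the 'local' instance type in local mode, else a managed instance type."""
    return "local" if local_mode else remote_type


# Placeholder account ID, for illustration only.
print(build_image_uri("123456789012", "eu-west-1"))
print(select_instance_type(local_mode=True))
```

In the notebook itself, `account_id` would come from `boto3.client("sts").get_caller_identity()["Account"]`, as in the original cell.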

Define the `PySparkProcessor` object

In [12]:
processor = PySparkProcessor(
    image_uri=processing_image_uri,
    role=role,
    instance_count=processing_instance_count,
    instance_type=processing_instance_type,
    sagemaker_session=sagemaker_session
)

In [13]:
run_args = processor.get_run_args(
        "processing.py",
        inputs=[
            ProcessingInput(
                input_name="input",
                source="s3://{}/{}".format(bucket_name, processing_input_files_path),
                destination="/opt/ml/processing/input"
            ),
            ProcessingInput(
                input_name="scripts",
                source="s3://{}/{}".format(bucket_name, processing_code),
                destination="/opt/ml/processing/input/code/scripts"
            )
        ],
        outputs=[
            ProcessingOutput(
                output_name="output",
                source="/opt/ml/processing/output",
                destination="s3://{}/{}".format(bucket_name, processing_output_files_path))
        ]
    )

This function has been deprecated and could break pipeline step caching. We recommend using the run() function directly with pipeline sessionsto access step arguments.


In [14]:
processor.run(
    "./code/processing.py",
    arguments=[
        "--copy_hdfs",
        "1",
        "--bucket_name",
        bucket_name,
        "--processing_input_files_path",
        processing_input_files_path,
        "--processing_output_files_path",
        processing_output_files_path
    ],
    inputs=run_args.inputs,
    outputs=run_args.outputs,
    spark_event_logs_s3_uri="s3://{}/electricity-forecasting/logs".format(bucket_name),
    wait=True
)

INFO:sagemaker:Creating processing-job with name sagemaker-spark-custom-container-2023-02-07-11-40-06-788


.........................
02-07 11:44 smspark.cli  INFO     Parsing arguments. argv: ['/usr/local/bin/smspark-submit', '--local-spark-event-logs-dir', '/opt/ml/processing/spark-events/', '/opt/ml/processing/input/code/processing.py', '--copy_hdfs', '1', '--bucket_name', 'sagemaker-eu-west-1-691148928602', '--processing_input_files_path', 'electricity-forecasting/data/input', '--processing_output_files_path', 'electricity-forecasting/data/output']
02-07 11:44 smspark.cli  INFO     Raw spark options before processing: {'class_': None, 'jars': None, 'py_files': None, 'files': None, 'verbose': False}
02-07 11:44 smspark.cli  INFO     App and app arguments: ['/opt/ml/processing/input/code/processing.py', '--copy_hdfs', '1', '--bucket_name', 'sagemaker-eu-west-1-691148928602', '--processing_input_files_path', 'electricity-forecasting/data/input', '--processing_output_files_path', 'electricity-forecasting/data/output']
02-07 11:44 smspark.cli  INFO     Rendered 