In [1]:
import copy
from pathlib import Path
import warnings

import numpy as np
import pandas as pd
import pytorch_lightning as pl
from pytorch_lightning.callbacks import EarlyStopping, LearningRateMonitor
from pytorch_lightning.loggers import TensorBoardLogger
import torch

from pytorch_forecasting import Baseline, TemporalFusionTransformer, TimeSeriesDataSet
from pytorch_forecasting.data import GroupNormalizer
from pytorch_forecasting.metrics import SMAPE, PoissonLoss, QuantileLoss
from pytorch_forecasting.models.temporal_fusion_transformer.tuning import optimize_hyperparameters

In [2]:
import sagemaker
import uuid

# Session, default bucket and execution role shared by the cells below.
sagemaker_session = sagemaker.Session()
print(f'SageMaker version: {sagemaker.__version__}')

bucket = sagemaker_session.default_bucket()
# NOTE(review): `prefix` is not referenced anywhere in the visible notebook.
prefix = 'sagemaker/DEMO-pytorch-cnn-cifar10'
role = sagemaker.get_execution_role()

# The first 8 hex digits of a random UUID give each run its own checkpoint prefix.
checkpoint_suffix = uuid.uuid4().hex[:8]
checkpoint_s3_path = f's3://{bucket}/checkpoints/checkpoint-{checkpoint_suffix}'

print(f'Checkpointing Path: {checkpoint_s3_path}')

SageMaker version: 2.103.0
Checkpointing Path: s3://sagemaker-us-east-1-551329315830/checkpoints/checkpoint-8faddec6


In [3]:
# Load the hourly electricity table and cast the columns used below in a single pass.
electricity_df = pd.read_csv("tft/outputs/data/electricity/hourly_electricity.csv").astype(
    {
        'hours_from_start': int,
        'power_usage': float,
        'hour': int,
        'day_of_week': int,
        'categorical_id': str,
    }
)

In [4]:
import os
import json
def save_local_and_upload_s3(data_df, sagemaker_session, bucket, dir_name = "timeseries_data", data_filename = "data"):
    """Write ``data_df`` to a local parquet file and upload its directory to S3.

    Args:
        data_df: Object exposing ``to_parquet(path)`` (a pandas DataFrame in this notebook).
        sagemaker_session: Object exposing
            ``upload_data(path=..., bucket=..., key_prefix=...)``.
        bucket: Name of the destination S3 bucket.
        dir_name: Local directory for the file; created (parents included) when it
            does not exist. Also appended to ``data/`` to form the S3 key prefix.
        data_filename: File name without the ``.parquet`` extension.

    Returns:
        Whatever ``sagemaker_session.upload_data`` returns for the upload.
    """
    # Report on the *data* directory (the old message wrongly said "Checkpointing").
    if os.path.isdir(dir_name):
        print("Data directory {} exists".format(dir_name))
    else:
        print("Creating data directory {}".format(dir_name))
    # exist_ok=True keeps this safe if the directory appears between the check and the call.
    os.makedirs(dir_name, exist_ok=True)

    parquet_path = '{}/{}.parquet'.format(dir_name, data_filename)
    data_df.to_parquet(parquet_path)
    print("saved raw data to {}".format(parquet_path))

    # The directory itself (not only the file just written) is handed to upload_data.
    return sagemaker_session.upload_data(path=dir_name, bucket=bucket, key_prefix='data/{}'.format(dir_name))



def metadata_json_upload_s3(training_metadata, sagemaker_session, bucket, dir_name = "timeseries_data", metadata_filename = "data_metadata"):
    """Dump ``training_metadata`` to a local JSON file and upload its directory to S3.

    Args:
        training_metadata: JSON-serialisable object (a dict in this notebook).
        sagemaker_session: Object exposing
            ``upload_data(path=..., bucket=..., key_prefix=...)``.
        bucket: Name of the destination S3 bucket.
        dir_name: Local directory for the file; created (parents included) when it
            does not exist. Also appended to ``data/`` to form the S3 key prefix.
        metadata_filename: File name without the ``.json`` extension.

    Returns:
        Whatever ``sagemaker_session.upload_data`` returns for the upload.
    """
    # Report on the *data* directory (the old message wrongly said "Checkpointing").
    if os.path.isdir(dir_name):
        print("Data directory {} exists".format(dir_name))
    else:
        print("Creating data directory {}".format(dir_name))
    # exist_ok=True keeps this safe if the directory appears between the check and the call.
    os.makedirs(dir_name, exist_ok=True)

    json_path = '{}/{}.json'.format(dir_name, metadata_filename)
    with open(json_path, 'w') as fp:
        json.dump(training_metadata, fp)
    print("saved metadata to {}".format(json_path))

    # NOTE(review): the whole directory is passed to upload_data, so files already
    # present in dir_name are presumably uploaded again - confirm this is intended.
    return sagemaker_session.upload_data(path=dir_name, bucket=bucket, key_prefix='data/{}'.format(dir_name))


In [5]:
# max_prediction_length = 24
# max_encoder_length = 24 * 7
# num_epochs = 100
# early_stopping_patience = 5
# multiprocessing_workers = 5


# dropout_rate = 0.1
# hidden_layer_size = 160
# learning_rate = 0.001
# minibatch_size = 64
# max_gradient_norm = 0.01
# num_heads = 4
# stack_size =  1

In [6]:
import pandas as pd 

# Write the frame to parquet locally, push its directory to S3, and keep the
# returned location: it is passed to `fit` as the training input further down.
inputs = save_local_and_upload_s3(
    electricity_df,
    sagemaker_session,
    bucket,
    dir_name="timeseries_data/electricity",
    data_filename="electricity_training_data",
)
inputs

Checkpointing directory timeseries_data/electricity exists
saved raw data to timeseries_data/electricity/electricity_training_data.parquet


's3://sagemaker-us-east-1-551329315830/data/timeseries_data/electricity'

In [7]:
# Forecast horizon of one day, look-back window of one week (hourly steps).
max_prediction_length = 24
max_encoder_length = 24 * 7

# Dataset description serialised to JSON next to the training data.
# Key order is kept stable so the emitted JSON is unchanged.
training_metadata = {
    'time_idx': "hours_from_start",
    'target': "power_usage",
    'group_ids': ["categorical_id"],
    # min == max keeps the encoder window at full length (as it is in the validation set)
    'min_encoder_length': max_encoder_length,
    'max_encoder_length': max_encoder_length,
    'min_prediction_length': 1,
    'max_prediction_length': max_prediction_length,
    'static_categoricals': ["categorical_id"],
    'static_reals': [],
    'time_varying_known_categoricals': [],
    'variable_groups': {},
    'time_varying_known_reals': ["hours_from_start", "day_of_week", "hour"],
    'time_varying_unknown_categoricals': [],
    # NOTE(review): the target column is not listed here - confirm the training
    # script handles it if past target values are meant to be model inputs.
    'time_varying_unknown_reals': [],
    'target_normalizer': {
        "normalized_groups": ["categorical_id"],
        "normalization_transformation": 'softplus',
    },
    'add_relative_time_idx': True,
    'add_target_scales': True,
    'add_encoder_length': True,
    'allow_missing_timesteps': True,
}

# Cutoff = last time index minus one prediction window.
# (Alternative: drop the subtraction to use the last time index itself.)
training_metadata['training_cutoff'] = int(
    electricity_df[training_metadata['time_idx']].max() - max_prediction_length
)

# Upload the metadata; the returned S3 location is the cell's displayed value.
metadata_json_upload_s3(
    training_metadata,
    sagemaker_session,
    bucket,
    dir_name="timeseries_data/electricity",
    metadata_filename="electricity_metadata",
)

Checkpointing directory timeseries_data/electricity exists
saved metadata to timeseries_data/electricity/electricity_metadata.json


's3://sagemaker-us-east-1-551329315830/data/timeseries_data/electricity'

In [8]:
# Hyperparameters handed to the training entry point via the estimator.
hyperparameters = {
    # Files expected inside the training input location
    'data-filename': "electricity_training_data.parquet",
    'metadata-filename': "electricity_metadata.json",

    # Window sizes (defined with the metadata above)
    'max-prediction-length': max_prediction_length,
    'max-encoder-length': max_encoder_length,

    # Training loop (lower 'num-epochs' to e.g. 2 for a quick trial run)
    'num-epochs': 100,
    'early-stopping-patience': 5,
    'multiprocessing-workers': 5,

    # Model / optimiser
    'dropout-rate': 0.1,
    'hidden-layer-size': 160,
    'learning-rate': 0.001,
    'minibatch-size': 64,
    'max-gradient-norm': 0.01,
    'num-heads': 4,
}


In [9]:
# Spot-training settings consumed by the estimator below.
use_spot_instances = True
max_run = 36000  # in seconds, after this, job will be terminated

# A wait budget is only set for spot training: ten times the run budget.
if use_spot_instances:
    max_wait = 10 * max_run
else:
    max_wait = None

local_image_name = 'pytorch-tft-container-test'

In [None]:
from sagemaker.pytorch import PyTorch
from sagemaker.estimator import Estimator
from sagemaker.debugger import TensorBoardOutputConfig


# TensorBoard output: files under /lightning_logs in the container are sent to the
# session's default bucket. The path is built from `bucket` rather than a
# hard-coded, account-specific bucket name so the notebook runs in any account/region.
tensorboard_output_config = TensorBoardOutputConfig(
    s3_output_path='s3://{}/tensorboard'.format(bucket),
    container_local_output_path='/lightning_logs'
)

# Managed spot training job for the TFT script; checkpoints go to
# `checkpoint_s3_path`.
spot_estimator = PyTorch(
    entry_point='../TFT_docker/TFT.py',
    dependencies=['../TFT_docker/requirements.txt'],
    role=role,
    framework_version='1.7.1',
    py_version='py3',
    instance_count=1,
    # Other instance types tried: 'local', 'ml.p2.xlarge'
    instance_type='ml.p3.2xlarge',
    base_job_name='tft-pytorch-spot-1',
    hyperparameters=hyperparameters,
    checkpoint_s3_uri=checkpoint_s3_path,
    debugger_hook_config=False,
    input_mode='File',
    use_spot_instances=use_spot_instances,
    max_run=max_run,
    max_wait=max_wait,
    tensorboard_output_config=tensorboard_output_config,
)

# Launch the job on the uploaded data directory and stream all logs into the cell.
spot_estimator.fit(
    inputs,
    logs='All'
)

2022-09-01 05:10:37 Starting - Starting the training job...
2022-09-01 05:11:00 Starting - Insufficient capacity error from EC2 while launching instances, retrying!ProfilerReport-1662009037: InProgress
......................................................

In [None]:
# Display the estimator's `model_data` attribute once `fit` has run
# (presumably the S3 location of the trained model artifact - confirm in the SDK docs).
spot_estimator.model_data

In [None]:
# Print a summary of the training frame (columns, dtypes, non-null counts).
electricity_df.info()

In [None]:
# Number of distinct series identifiers (missing values, if any, count as one).
electricity_df['categorical_id'].nunique(dropna=False)

In [None]:
# Count rows where `id` and `categorical_id` disagree (the original line was
# missing its closing parenthesis and raised a SyntaxError).
# NOTE(review): `categorical_id` was cast to str on load; if `id` is not a string
# column every row will compare as different - confirm the dtypes match.
(electricity_df['id'] != electricity_df['categorical_id']).sum()

In [None]:
# Print a summary of the training frame (columns, dtypes, non-null counts).
electricity_df.info()

In [None]:
# Preview the first five rows (the default for `head`).
electricity_df.head()