In [None]:
%%capture
# Install awswrangler (AWS SDK for pandas); it is used later to read the forecast export CSV from S3.
# %%capture hides pip's output for this cell.
# NOTE(review): the version is not pinned; pin it (e.g. awswrangler==X.Y.Z) for reproducible runs.
%pip install awswrangler

In [None]:
import os
import sys
import time
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from operator import attrgetter

import awswrangler as wr
import boto3
import pandas as pd
import sagemaker
import seaborn as sns

# Make the parent directory importable so the project-local `util` module resolves.
module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append(module_path)
import util
# Forecast horizon in dataset time steps (the dataset frequency below is hourly, so 168 = one week).
forecastHorizon = 168
# Category being forecast; used to build the S3 data key, AWS resource names and the DynamoDB item.
model = 'schedule'

In [None]:
# Wall-clock start time in seconds; the final cell uses it to report total elapsed minutes.
tStart = time.time()

In [None]:
# Parameters
DATASET_FREQUENCY = "H"
# Java-style date pattern expected by Amazon Forecast for hourly data. The hour field must be
# the 24-hour `HH`; the 12-hour `hh` cannot represent 13:00-23:59 timestamps.
TIMESTAMP_FORMAT = "yyyy-MM-dd HH:mm:ss"
algorithmArn = 'arn:aws:forecast:::algorithm/Deep_AR_Plus'

# AWS setup -- build the SageMaker session once and reuse it for region and default bucket.
sm_session = sagemaker.Session()
region = sm_session.boto_region_name
session = boto3.Session(region_name=region)
forecast = session.client(service_name='forecast')
forecastquery = session.client(service_name='forecastquery')

# Naming
# IAM role Forecast assumes to read/write S3. Override with FORECAST_ROLE_ARN instead of
# editing the notebook; the literal is the original (placeholder-looking) account's role.
role_arn = os.environ.get(
    'FORECAST_ROLE_ARN',
    'arn:aws:iam::123456:role/PuretechBillSuccessRateForecast',
)
bucket_name = sm_session.default_bucket()
key = 'puretech_data/' + model + '.csv'
s3DataPath = "s3://" + bucket_name + "/" + key
project = 'bill_scs_rate_' + model
datasetName = project + '_ds'
datasetGroupName = project + '_gp'
predictorName = project + '_predictor'
forecastName = project + '_forecast'
forecastExportName = 'export'
outputPath = 's3://' + bucket_name + '/forecast_output/' + model

## Create the Dataset and Dataset Group

In [None]:
# Target time-series schema. The attribute order must match the column order of the raw CSV
# files: timestamp, item_id, target_value.
schema = {
    "Attributes": [
        {"AttributeName": attr_name, "AttributeType": attr_type}
        for attr_name, attr_type in (
            ("timestamp", "timestamp"),
            ("item_id", "string"),
            ("target_value", "float"),
        )
    ]
}

# Register the TARGET_TIME_SERIES dataset with the configured frequency and schema.
response = forecast.create_dataset(
    Domain="CUSTOM",
    DatasetType='TARGET_TIME_SERIES',
    DatasetName=datasetName,
    DataFrequency=DATASET_FREQUENCY,
    Schema=schema,
)
datasetArn = response['DatasetArn']
print(datasetArn)

In [None]:
# Wrap the dataset in a dataset group; the predictor is trained against the group, not the dataset.
create_dataset_group_response = forecast.create_dataset_group(
    DatasetGroupName=datasetGroupName,
    Domain="CUSTOM",
    DatasetArns=[datasetArn],
)
datasetGroupArn = create_dataset_group_response['DatasetGroupArn']
print(datasetGroupArn)

## Create Data Import Job



Import the raw CSV data from S3 into the Amazon Forecast dataset so it can be used for training and forecasting.

In [None]:
# Load the CSV at s3DataPath into the dataset; Forecast reads it with role_arn and parses
# timestamps using TIMESTAMP_FORMAT.
datasetImportJobName = 'EP_DSIMPORT_JOB_TARGET'
ds_import_job_response = forecast.create_dataset_import_job(
    DatasetImportJobName=datasetImportJobName,
    DatasetArn=datasetArn,
    DataSource={"S3Config": {"Path": s3DataPath, "RoleArn": role_arn}},
    TimestampFormat=TIMESTAMP_FORMAT,
)
ds_import_job_arn = ds_import_job_response['DatasetImportJobArn']
print(ds_import_job_arn)

In [None]:
# Poll the dataset import job every 10 s until it reaches a terminal state.
# util.StatusIndicator is a project helper (presumably a progress display -- see util module).
status_indicator = util.StatusIndicator()

while True:
    import_job = forecast.describe_dataset_import_job(DatasetImportJobArn=ds_import_job_arn)
    status = import_job['Status']
    status_indicator.update(status)
    # CREATE_STOPPED is terminal too; without it a stopped job would be polled forever.
    if status in ('ACTIVE', 'CREATE_FAILED', 'CREATE_STOPPED'):
        break
    time.sleep(10)

status_indicator.end()

# Fail loudly here rather than letting the predictor cell run against an empty dataset.
if status != 'ACTIVE':
    raise RuntimeError(
        f"Dataset import job ended with status {status}: {import_job.get('Message', '')}"
    )

## Create Predictor with a custom forecast horizon

In [None]:
# Train a DeepAR+ predictor with hyperparameter optimisation.
# Amazon Forecast requires BackTestWindowOffset >= ForecastHorizon, so the offset is tied to
# forecastHorizon instead of a separate hard-coded 168 that would break if the horizon changed.
create_predictor_response = forecast.create_predictor(
    PredictorName=predictorName,
    AlgorithmArn=algorithmArn,
    ForecastHorizon=forecastHorizon,
    PerformAutoML=False,
    PerformHPO=True,
    # NOTE(review): beta likelihood presumably chosen because the target is a rate in [0, 1] -- confirm.
    TrainingParameters={"likelihood": 'beta'},
    EvaluationParameters={
        "NumberOfBacktestWindows": 1,
        "BackTestWindowOffset": forecastHorizon,
    },
    InputDataConfig={"DatasetGroupArn": datasetGroupArn},
    FeaturizationConfig={
        # Kept in sync with the frequency the dataset was created with.
        "ForecastFrequency": DATASET_FREQUENCY,
        "Featurizations": [
            {
                "AttributeName": "target_value",
                "FeaturizationPipeline": [
                    {
                        # Fill missing target values: none before the series starts,
                        # median for gaps inside and after the series.
                        "FeaturizationMethodName": "filling",
                        "FeaturizationMethodParameters": {
                            "frontfill": "none",
                            "middlefill": "median",
                            "backfill": "median",
                        },
                    }
                ],
            }
        ],
    },
)
predictorArn = create_predictor_response['PredictorArn']
print(predictorArn)

In [None]:
# Poll predictor training every 10 s until it reaches a terminal state.
# util.StatusIndicator is a project helper (presumably a progress display -- see util module).
status_indicator = util.StatusIndicator()

while True:
    predictor = forecast.describe_predictor(PredictorArn=predictorArn)
    status = predictor['Status']
    status_indicator.update(status)
    # CREATE_STOPPED is terminal too; without it a stopped training job would be polled forever.
    if status in ('ACTIVE', 'CREATE_FAILED', 'CREATE_STOPPED'):
        break
    time.sleep(10)

status_indicator.end()

# Fail loudly here rather than letting the metrics/forecast cells fail with a confusing error.
if status != 'ACTIVE':
    raise RuntimeError(
        f"Predictor training ended with status {status}: {predictor.get('Message', '')}"
    )

### Error Metrics

In [None]:
# Backtest accuracy metrics of the trained predictor; the raw response is shown as the cell output.
forecast.get_accuracy_metrics(
    PredictorArn=predictorArn,
)

## Create Forecast

In [None]:
# Generate forecasts from the trained predictor. Only the 0.5 quantile is requested; it is
# exported as the `p50` column used further down.
create_forecast_response = forecast.create_forecast(
    ForecastName=forecastName,
    ForecastTypes=["0.5"],
    PredictorArn=predictorArn,
)
forecastArn = create_forecast_response['ForecastArn']
print(forecastArn)

In [None]:
# Poll forecast generation every 10 s until it reaches a terminal state.
# util.StatusIndicator is a project helper (presumably a progress display -- see util module).
status_indicator = util.StatusIndicator()

while True:
    forecast_desc = forecast.describe_forecast(ForecastArn=forecastArn)
    status = forecast_desc['Status']
    status_indicator.update(status)
    # CREATE_STOPPED is terminal too; without it a stopped forecast would be polled forever.
    if status in ('ACTIVE', 'CREATE_FAILED', 'CREATE_STOPPED'):
        break
    time.sleep(10)

status_indicator.end()

# Fail loudly here rather than letting the export cell fail with a confusing error.
if status != 'ACTIVE':
    raise RuntimeError(
        f"Forecast creation ended with status {status}: {forecast_desc.get('Message', '')}"
    )

## Create Forecast Export

In [None]:
# Export the forecast to outputPath on S3 (written with role_arn) so it can be read back below.
forecast_export_response = forecast.create_forecast_export_job(
    ForecastExportJobName=forecastExportName,
    ForecastArn=forecastArn,
    Destination={"S3Config": {"Path": outputPath, "RoleArn": role_arn}},
)
forecastExportJobArn = forecast_export_response['ForecastExportJobArn']
print(forecastExportJobArn)

In [None]:
# Poll the export job every 10 s until it reaches a terminal state.
# util.StatusIndicator is a project helper (presumably a progress display -- see util module).
status_indicator = util.StatusIndicator()

while True:
    export_job = forecast.describe_forecast_export_job(ForecastExportJobArn=forecastExportJobArn)
    status = export_job['Status']
    status_indicator.update(status)
    # CREATE_STOPPED is terminal too; without it a stopped export would be polled forever.
    if status in ('ACTIVE', 'CREATE_FAILED', 'CREATE_STOPPED'):
        break
    time.sleep(10)

status_indicator.end()

# Fail loudly here rather than letting the S3 read below find no (or stale) files.
if status != 'ACTIVE':
    raise RuntimeError(
        f"Forecast export job ended with status {status}: {export_job.get('Message', '')}"
    )

## Load the Forecast from S3 and Save the Best Time to DynamoDB

In [None]:
def load_pred(lookback_hours=24):
    """Read the recently exported forecast CSV(s) for `model` from S3.

    Reads every object under ``s3://<SageMaker default bucket>/forecast_output/<model>`` whose
    key ends with ``part0.csv`` and whose LastModified falls within the last `lookback_hours`.

    Args:
        lookback_hours: Size of the LastModified window in hours. Defaults to 24, the
            previously hard-coded value.

    Returns:
        pandas.DataFrame with the rows of all matching export files.

    NOTE(review): if several exports were written inside the window, their rows are
    concatenated -- confirm that only one export per window is expected.
    """
    bucket = sagemaker.Session().default_bucket()
    prefix = 'forecast_output/' + model
    path = f's3://{bucket}/{prefix}'
    suffix = 'part0.csv'
    # S3 LastModified is timezone-aware, so the lower bound must be aware too. `timezone`
    # must be imported (it previously was not, which raised NameError here).
    since = datetime.now(timezone.utc) - timedelta(hours=lookback_hours)
    return wr.s3.read_csv(path=path, path_suffix=suffix, last_modified_begin=since)

In [None]:
def transform(df, utc_offset_hours=8):
    """Prepare an exported forecast frame for plotting and best-time selection.

    Drops the ``item_id`` column, parses ``date`` strings shaped like
    ``2021-01-01T00:00:00Z`` and shifts them by `utc_offset_hours`.

    Args:
        df: Frame with at least ``item_id`` and ``date`` columns. It is not modified.
        utc_offset_hours: Hours added to every parsed timestamp. Defaults to 8 (the value
            previously hard-coded; presumably converting UTC to a UTC+8 local time -- confirm).

    Returns:
        A new DataFrame without ``item_id`` whose ``date`` column holds the shifted timestamps.
    """
    parsed = pd.to_datetime(df['date'], format="%Y-%m-%dT%H:%M:%SZ")
    # drop() returns a copy, so the caller's frame is left untouched.
    return df.drop('item_id', axis=1).assign(date=parsed + timedelta(hours=utc_offset_hours))

In [None]:
def put_time(model, timetoday, best_time, dynamodb=None):
    """Store one forecast result in the ``BestBillTime`` DynamoDB table.

    Args:
        model: Category name, written as the ``mt_category`` attribute.
        timetoday: Date string, written as ``forecast_date``.
        best_time: Best time found; written as its ``str()`` form under ``best_time``.
        dynamodb: Optional DynamoDB service resource. When falsy, ``boto3.resource('dynamodb')``
            is used.

    Returns:
        Whatever ``Table.put_item`` returns.
    """
    resource = dynamodb or boto3.resource('dynamodb')
    table = resource.Table('BestBillTime')
    item = {
        'mt_category': model,
        'forecast_date': timetoday,
        'best_time': str(best_time),
    }
    return table.put_item(Item=item)

In [None]:
# Load the latest exported forecast, shift its timestamps, and plot the median (p50) forecast.
df = transform(load_pred())
ax = sns.lineplot(x='date', y='p50', data=df)
ax.set(title=f'p50 forecast: {model}', xlabel='date', ylabel='p50');

In [None]:
# Pick the time step with the highest median (p50) forecast.
best_idx = df['p50'].idxmax()
# idxmax returns an index *label*, so select with .loc; positional .iloc is only correct
# by coincidence when the index is a default RangeIndex.
best_time, best_value = df.loc[best_idx, 'date'], df.loc[best_idx, 'p50']
print(best_time, best_value)

In [None]:
# Today's date in the kernel's local time, formatted YYYY-MM-DD, used as the forecast_date key.
# NOTE(review): forecast timestamps were shifted by +8 h, while this uses the kernel's local clock --
# confirm both refer to the same time zone.
timetoday = f"{datetime.today():%Y-%m-%d}"
put_time(model, timetoday, best_time)

In [None]:
# Report total wall-clock time since tStart (set near the top of the notebook), in minutes.
tEnd = time.time()
print ("Spent %f minutes" % ((tEnd - tStart)/60))