# Forecasting Currency Price & Volume with DeepAR
## Part 1 - Feature Engineering

This lab walks you through the main steps of preparing a data set for training, testing and validating DeepAR, a deep learning model that is available as a built-in SageMaker algorithm. The data to be transformed consists of one-minute intervals of currency price and volume data from 2017-2019.

### Introduction

#### DeepAR
DeepAR is a deep learning model that leverages multiple layers of recurrent neural networks to predict time series quantities. The model takes as input a context of historical data and generates quantiles of forecasted values. DeepAR is based on research done by Amazon to improve our own internal forecasting capabilities. To read more on how the algorithm works, [go here](https://docs.aws.amazon.com/sagemaker/latest/dg/deepar_how-it-works.html).

> **What Are Quantiles?**
> A quantile is a value paired with a percentage that describes how likely an observation is to fall below that value. For example, a 50% quantile of 10 means that 50% of values will be lower than 10 and 50% will be higher. This is the median. A 10% quantile of 10 means that 10% of values will be lower than 10 and 90% will be higher.
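
The definition above can be checked numerically. The following is a minimal sketch using only the Python standard library; the sample values are hypothetical and simply stand in for a set of forecast samples.

```python
import statistics

# Hypothetical sample: the integers 1..100 stand in for forecast samples.
values = list(range(1, 101))

# statistics.quantiles with n=10 returns the 9 decile cut points.
deciles = statistics.quantiles(values, n=10)
q10, q50, q90 = deciles[0], deciles[4], deciles[8]

def fraction_below(data, threshold):
    """Share of observations that fall strictly below the threshold."""
    return sum(1 for v in data if v < threshold) / len(data)

for label, q in (("10%", q10), ("50%", q50), ("90%", q90)):
    print(label, "quantile:", q, "fraction below:", fraction_below(values, q))
```

The fraction of values below each cut point matches the quantile's percentage, and the 50% quantile coincides with the median.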

#### DeepAR Features
The model you will develop will forecast 30 minutes of volume and price quantiles for currencies, based on the previous 30 minutes of values. In order to do this, you need to generate features in a specific format that can be read by the SageMaker model in training as well as inference. The format is composed of the following properties:

1. Start Time - DeepAR will utilise seasonal and periodic correlations when training the model. It needs the start time as a feature in order to do this.
2. Target Values - A list of time ordered values. For this lab the value will be one of open, close, high or low price or volume.
3. Categorical Array - A list of categorical values to distinguish different time series from each other. For this lab, the categorical array will have two values, one for the currency type, and another for the type of value (open, close, high or low price or volume).

A full row will look like this:
```
Row(
    cat    : [currency_id, value_type_id],
    start  : Timestamp of start of time series,
    target : Array( values )
)
```   
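
As a concrete illustration of the row layout, here is a minimal sketch that builds one record as a Python dictionary and serializes it as a single JSON line. The field values are hypothetical and chosen only to show the shape of the data.

```python
import json

# Hypothetical values for illustration only.
record = {
    "start": "2018-01-01 00:00:00",      # timestamp of the first target value
    "target": [0.7812, 0.7813, 0.7811],  # time-ordered values, one per minute
    "cat": [0, 0],                       # [currency_id, value_type_id]
}

# One time series per line keeps each record independent of the others.
line = json.dumps(record)
print(line)
```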

#### Spark
Spark is used to process the data due to the size of the dataset. For the tutorial, you will only use a subset of data in the interest of time. You also only use a local (non-clustered) instance of Spark. For the full production model, you would need to process far more years of data as well as additional currency pairs. A dataset that large will not fit into memory, and processing would be impossible on a single computer. This is a common problem in machine learning that requires large scale data sets. Spark solves these issues by providing a way to process data using horizontally scalable compute instances. It also provides an SQL language that makes it easy to use for data scientists with development or analyst backgrounds.

### 1. Get Started
To begin, modify the cell below so your generated S3 data will not interfere with other people working on the lab.

In [None]:
USER = <YOUR USER NAME IN QUOTES> # example 'whitefish'
BUCKET = <LAB BUCKET NAME in QUOTES>  # example 'grr.amazon.com-lab'

OVERIDE_PATH = None

### Imports & Configuration
First, you will import all the required Python and Spark libraries and create a Spark session. You need to include the required SageMaker classpath, which enables access to S3 from within Spark.

In [None]:
from datetime import timedelta
import sagemaker_pyspark, boto3
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, DoubleType, StringType, TimestampType, StructType, StructField
import json

region = boto3.Session().region_name
classpath = ":".join(sagemaker_pyspark.classpath_jars())
spark = SparkSession.builder.config("spark.driver.extraClassPath", classpath).getOrCreate()

### 2. Read CSV File
Now you can create a Spark data frame from the CSV file that contains Australian dollar versus US dollar data. The file is located in S3. After loading the data into a data frame, you create a temporary view that lets you query the data with Spark SQL. You also rename one of the columns so it is easier to deal with.

In [None]:
inputpath = 's3a://{}/labs/deepar/data/audusd_1m_partial.csv'.format(BUCKET)
print('reading csv file {}'.format(inputpath))
df = spark.read.csv(inputpath, inferSchema=True, header=True).withColumnRenamed('Gmt time', 'Time')
df.createOrReplaceTempView('forex')
df.show(5)

### 3. Standardize & Order Data
You need the current string-based date column to be a timestamp type, as you will use this column for ordering and creating subsets of data by time. Once you have the column cast to a timestamp, you can order the values in ascending order.

In [None]:
query = '''
SELECT 
    to_timestamp(Time, "dd.MM.yyyy HH:mm:ss.SSS") as start, 
    *
FROM forex 
ORDER BY start
'''
spark.sql(query).show(5)
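
To see why the cast matters, here is a minimal plain-Python sketch. It assumes the raw `Time` strings use the day-first layout named in the query; the sample values are hypothetical.

```python
from datetime import datetime

# Hypothetical raw values in the same layout as the CSV 'Time' column.
raw_values = ["02.01.2018 00:00:00.000", "01.02.2018 00:00:00.000"]

# %d.%m.%Y %H:%M:%S.%f mirrors the Spark pattern dd.MM.yyyy HH:mm:ss.SSS
PYTHON_FORMAT = "%d.%m.%Y %H:%M:%S.%f"

def parse(raw):
    """Convert a day-first string into a datetime."""
    return datetime.strptime(raw, PYTHON_FORMAT)

# Sorting the strings compares the day field first, which is not chronological.
as_strings = sorted(raw_values)
as_timestamps = sorted(raw_values, key=parse)

print("string order:   ", as_strings)
print("timestamp order:", as_timestamps)
```

Sorted as strings, 1 February comes before 2 January. Sorted as timestamps, the order is chronological.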

### 4. Create Weekly Time Series
Rather than treating the entire 2 years of data as a single time series, you will group it into weeks. To do this, you create a group column that is composed of the year and week of year.

#### Create Group Column
You will append the year and week of year into a unique group identifier.

In [None]:
query = '''
SELECT 
    weekofyear(start) + year(start) * 100 as group,
    *
FROM (
    SELECT 
        to_timestamp(Time, "dd.MM.yyyy HH:mm:ss.SSS") as start, 
        * 
    FROM forex 
)
'''  
spark.sql(query).show(5)
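
The group arithmetic can be reproduced in plain Python. This sketch assumes ISO 8601 week numbering, which is what Spark's `weekofyear` follows. The `iso_group_id` helper is a hypothetical alternative and not part of the lab code.

```python
from datetime import datetime

def group_id(ts):
    """Mirror of weekofyear(start) + year(start) * 100 from the query."""
    return ts.isocalendar()[1] + ts.year * 100

def iso_group_id(ts):
    """Hypothetical alternative that pairs the ISO week with the ISO year."""
    iso_year, iso_week, _ = ts.isocalendar()
    return iso_year * 100 + iso_week

for ts in (datetime(2018, 1, 1), datetime(2018, 6, 15), datetime(2018, 12, 31)):
    print(ts.date(), group_id(ts), iso_group_id(ts))
```

One thing to watch for on longer date ranges: 31 December 2018 belongs to ISO week 1 of 2019, so the calendar-year formula gives it the same id as the first week of January 2018. Pairing the ISO week with the ISO year avoids that collision.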

#### Group Into Lists
Once you have the group column, you can group the entire data set by group, collecting each column into a list of values for each group. You also get the start date for the time series by finding the minimum timestamp within a group.

In [None]:
query = '''
SELECT
    min(start) as ts_start,
    collect_list(Open) as open,
    collect_list(Close) as close,
    collect_list(Low) as low,
    collect_list(Volume) as volume
FROM (
    SELECT 
        weekofyear(start) + year(start) * 100 as group,
        *
    FROM (
        SELECT 
            to_timestamp(Time, "dd.MM.yyyy HH:mm:ss.SSS") as start, 
            *        
        FROM forex 
    ) ORDER BY start
) GROUP BY group ORDER BY group
'''
df = spark.sql(query)
df.createOrReplaceTempView('features')
df.show(5)
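
The same grouping logic is sketched below in plain Python on a handful of hypothetical rows, to make the shape of the result explicit. The rows are sorted by timestamp before they are collected, because the order of values inside each list is what makes it a time series. In Spark, `collect_list` is documented as non-deterministic with respect to row order after a shuffle, so it is worth checking that the collected lists are in time order.

```python
from collections import defaultdict
from datetime import datetime, timedelta

# Hypothetical rows of (timestamp, close), deliberately out of order.
t0 = datetime(2018, 1, 1)  # a Monday
rows = [
    (t0 + timedelta(days=d, minutes=m), 0.78 + 0.0001 * m)
    for d in (7, 0)
    for m in (2, 0, 1)
]

# Collect rows per week, visiting them in timestamp order.
groups = defaultdict(list)
for ts, close in sorted(rows):
    week = ts.isocalendar()[1] + ts.year * 100
    groups[week].append((ts, close))

# One output row per week: earliest timestamp plus the ordered values.
features = [
    {"group": week, "ts_start": items[0][0], "close": [c for _, c in items]}
    for week, items in sorted(groups.items())
]

for row in features:
    print(row)
```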

### 5. Create Training Set
Now that you have the data in the right format, you need to split off the training set. For time series data, the training set should have no visibility into the time period of the test set, to ensure the model is not just fitting to the known time period.

#### Get Cut-off Dates
First, you need the date range of the data: the earliest and latest time series start dates. The query also reports the shortest and longest series lengths.

In [None]:
query = '''
SELECT 
    min(size(volume)) AS min_cnt, 
    max(size(volume)) AS max_cnt,
    min(ts_start) AS min_date, 
    max(ts_start) AS max_date 
FROM features
'''

stats = spark.sql(query).collect()[0]
spark.sql(query).show(5)

There are three sets of data. The training set is for training the model. The test set is for calculating the performance of the model during hyperparameter tuning and training evaluation. The holdout set is used for the final evaluation of the model to see how it will perform on new data.

To better understand how the time is divided up, you will print a diagram of the different cut-off times for each of these sets. You use the weeks of holdout, and weeks of test to calculate these time periods.

In [None]:
print('time series have between {} and {} entries\n\n'.format(stats.min_cnt, stats.max_cnt))

holdout_start = stats.max_date - timedelta(days=(7*6)) # 6 weeks of holdout
test_start = holdout_start - timedelta(days=8*7)      # 8 weeks of test set
holdout_end = stats.max_date

print('    |....... train .......|....... test .......|....... holdout .......|')
print('{}            {}           {}              {}'.format(
    stats.min_date.date(), 
    test_start.date(), 
    holdout_start.date(), 
    holdout_end.date())
)
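
The cut-off arithmetic can also be wrapped in a small helper that assigns any series start time to one of the three sets. This is a minimal sketch: the function names and the `max_date` value are hypothetical, and the week counts are parameters rather than the lab's fixed values.

```python
from datetime import datetime, timedelta

def split_cutoffs(max_date, test_weeks, holdout_weeks):
    """Return (test_start, holdout_start), counting back from max_date."""
    holdout_start = max_date - timedelta(weeks=holdout_weeks)
    test_start = holdout_start - timedelta(weeks=test_weeks)
    return test_start, holdout_start

def assign_split(ts_start, test_start, holdout_start):
    """Label a series by where its start time falls."""
    if ts_start < test_start:
        return "train"
    if ts_start < holdout_start:
        return "test"
    return "holdout"

# Hypothetical latest series start; in the lab this comes from stats.max_date.
max_date = datetime(2019, 1, 7)
test_start, holdout_start = split_cutoffs(max_date, test_weeks=8, holdout_weeks=6)

print("test starts:   ", test_start.date())
print("holdout starts:", holdout_start.date())
print(assign_split(datetime(2018, 9, 3), test_start, holdout_start))
```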

#### Store To S3
Now that you have the cut-off dates, you can select the features and save them to an S3 bucket of training data. For training, you write the data in compressed parquet format. You will also add a category column `cat`, with the first value being 0 for currency `audusd` and the second value being 0 for the `close` value type. Eventually, when you wish to infer forecasts with the trained model, you must pass in the same category ids for the currency and value types.

> **Note** For the tutorial, you limit the query results so the write operation does not take too long.

In [None]:
query = '''
SELECT 
    date_format(ts_start, 'yyyy-MM-dd HH:mm:ss') as start, 
    close as target,
    array(0,0) as cat
FROM features
WHERE ts_start < to_timestamp('2018-10-16', 'yyyy-MM-dd')
LIMIT 100
'''
train = spark.sql(query)

outpath = 's3a://{}/labs/deepar/output/{}/dev_features/train'.format(BUCKET, USER)
train.write.mode("append").parquet(outpath)

print('wrote train data to '+outpath)

### 6. Create Test Set
Now you create the test set from data beginning at the end of the training set range and ending at the beginning of the holdout set. For test data, you need the time series to begin at different times, so you create a function to randomly generate different time ranges. Each time range must cover the input length the model requires plus the length of the forecast. For the example, you create 5 random subsets from each time series and make them 60 minutes in length, since the model requires 30 minutes of data and forecasts 30 minutes into the future. None of this is needed for training data, as the training algorithm will do this automatically.

#### Create UDF
The code below defines a Spark user defined function (UDF) that you can use to generate the sub samples required for the test set, all within a Spark SQL query.

In [None]:
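import random
def create_tests(start, values, length, ntests):
    
    # number of valid start positions for a window of the required length
    n_starts = len(values) - length + 1
    if n_starts <= 0:
        return []  # series is shorter than one window
    
    # create random set of start indexes (never more than are available)
    start_indexes = random.sample(range(n_starts), min(ntests, n_starts))
    
    # create new sequences for each start index of the required length
    # note: the timestamp offset assumes one value per minute with no gaps
    tests = []
    for s in start_indexes:
        tests.append({
            'ts_start': start + timedelta(minutes=s),
            'target': values[s: s+length]
        })   
    return tests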

# define the return type
ret_type =  ArrayType(
                 StructType([
                    StructField('ts_start', TimestampType(), True),
                    StructField('target', ArrayType(DoubleType()), True)
                ])
            )

# register the udf
spark.udf.register("create_tests", create_tests, ret_type)
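
To see what the sampled windows look like outside of Spark, here is a standalone sketch of the same windowing idea. The `sample_windows` helper, the seed, and the synthetic series are hypothetical. Each 60-minute window splits into 30 minutes of context and 30 minutes to forecast.

```python
import random
from datetime import datetime, timedelta

def sample_windows(start, values, length, n_windows, seed=0):
    """Pick random fixed-length windows from one minute-spaced series."""
    rng = random.Random(seed)  # seeded so the sketch is repeatable
    n_starts = len(values) - length + 1
    if n_starts <= 0:
        return []
    offsets = rng.sample(range(n_starts), min(n_windows, n_starts))
    return [(start + timedelta(minutes=o), values[o:o + length]) for o in offsets]

# Synthetic series: the value at each position equals its minute offset.
series_start = datetime(2018, 1, 1)
series = [float(i) for i in range(600)]

windows = sample_windows(series_start, series, length=60, n_windows=5)
for ts, target in windows:
    context, horizon = target[:30], target[30:]
    print(ts, "context:", len(context), "horizon:", len(horizon))
```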

#### Query Test Data
Now that you have the UDF, you can query the test data with subsamples. The UDF generates a list of time series in a single row. SageMaker requires a single time series per row, so you need to explode each row into multiple rows using the `LATERAL VIEW explode` syntax below. You also order the rows randomly, so when SageMaker evaluates the model it looks at different types of series.

In [None]:
query = '''
SELECT 
    date_format(test.ts_start, 'yyyy-MM-dd HH:mm:ss') as start,
    test.target AS target,
    array(0,0) as cat
FROM (
    SELECT create_tests(ts_start, close, 60, 5) as targets 
    FROM features 
    WHERE ts_start >= to_timestamp('2018-10-16', 'yyyy-MM-dd')  AND ts_start < to_timestamp('2018-11-27', 'yyyy-MM-dd')
    LIMIT 100
)
LATERAL VIEW explode(targets) t as test 
ORDER BY rand()
'''
test = spark.sql(query)
test.show(5)
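
The effect of `explode` followed by a random ordering can be pictured with plain Python lists. This is a minimal sketch on hypothetical data, not the Spark implementation.

```python
import random

# Hypothetical rows: each row holds a list of time series, like the UDF output.
nested = [
    {"week": 201801, "targets": [[1.0, 2.0], [3.0, 4.0]]},
    {"week": 201802, "targets": [[5.0, 6.0]]},
]

# "Explode": emit one output row per inner time series.
flat = [
    {"week": row["week"], "target": target}
    for row in nested
    for target in row["targets"]
]

# Shuffle so consecutive rows do not all come from the same week.
random.Random(42).shuffle(flat)

for row in flat:
    print(row)
```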

#### Save the Test Data
Now you can save the test data to an S3 bucket. The exact same process can be followed for creating a holdout dataset, using the holdout set time cut-offs.

In [None]:
outpath = 's3a://{}/labs/deepar/output/{}/dev_features/test'.format(BUCKET, USER)
test.write.parquet(outpath, mode='append')
print('wrote test data to '+outpath)

### 7. ETL with Glue
All the above code can now be gathered into a single Glue script and executed across many different currencies, as well as different data columns like volume, high price, or low price. Glue lets you run serverless Spark jobs and avoid setting up an EMR cluster. Up until now, you have been running Spark locally on a small set of data. When you start adding in more years of data, more currencies, and more test and holdout set samples, this would quickly become untenable to run on a single node.

There are some key differences between the Glue script and the code above. The script is parameterized, so you can generate different types of data sets, and it supports multiple currencies and currency properties. Each time series is categorized by the instrument and property type, so the model can distinguish them from each other. In addition, it generates a holdout set in the same way the test set was generated. The holdout set is written as JSON to enable batch scoring.

The Glue script will also randomly shuffle all the stored data, so the training batches don't overly fit to one type of time series.
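
One way to picture the categorization is a pair of lookup tables built from the job parameters. The sketch below is an assumption about how such a mapping could be built, not the Glue script's actual implementation, and the ids it assigns are illustrative and may differ from the ones the script uses.

```python
# Hypothetical parameter strings in the same comma-separated style as the job arguments.
instruments = "audusd,eurusd,usdjpy".split(",")
columns = "open,close,high,low".split(",")

# Assign each name a stable integer id based on its position in the list.
instrument_ids = {name: i for i, name in enumerate(instruments)}
column_ids = {name: i for i, name in enumerate(columns)}

def category_for(instrument, column):
    """Return the [currency_id, value_type_id] pair for one time series."""
    return [instrument_ids[instrument], column_ids[column]]

for instrument in instruments:
    for column in columns:
        print(instrument, column, category_for(instrument, column))
```

Whatever mapping is used, the same ids must be supplied at inference time as were used in training.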

#### Create Glue Job
The following script creates the job.

In [None]:
import boto3, time

# get the s3 and glue client
glue = boto3.client('glue')
s3 = boto3.resource('s3')

# save the glue script to s3
filepath = 'labs/deepar/output/{}/scripts/glue_create_features.py'.format(USER)
s3.Object(BUCKET, filepath).upload_file('glue_create_features.py')
script_path = 's3://{}/{}'.format(BUCKET, filepath)
print('wrote glue script to {}'.format(script_path))

# set the glue log directory
log_dir = 's3://{}/labs/deepar/output/{}/glue_logs'.format(BUCKET, USER)

# set the glue temp directory
temp_dir = 's3://{}/labs/deepar/output/{}/glue_temp'.format(BUCKET, USER)

# set the output path 
# the override path is used to create the master data set for the rest of the lab
if OVERIDE_PATH:
    s3_path = OVERIDE_PATH
else:
    s3_path = 's3://{}/labs/deepar/output/{}/features'.format(BUCKET, USER)
print('s3_path feature path is ' + s3_path)

# create a glue job
job_name = USER + time.strftime('-%Y-%m-%d-%H-%M-%S', time.gmtime())
print('using job name '+job_name)

glue.delete_job(JobName=job_name)
response = glue.create_job(
    Name=   job_name,
    LogUri= log_dir,
    Role='GlueServiceRole',
    ExecutionProperty={ 
        'MaxConcurrentRuns': 1
    },
    Command={
        'Name': 'glueetl',
        'ScriptLocation': script_path
    },
    DefaultArguments={
        '--TempDir': temp_dir,
        '--enable-metrics': ''
    },
    MaxRetries=0,
    AllocatedCapacity=60,
    Timeout=600
)
print("\ncreated job")
print(json.dumps(response, indent=4, sort_keys=True))

#### Run Glue Job
This script runs the job, by passing in parameters that define how the dataset will be generated. You can now check the glue console to see the job running.

In [None]:
# execute the job
response = glue.start_job_run(
    JobName=job_name,
    Arguments={
        '--instruments':   'audusd,eurusd,usdjpy',
        '--columns':       'open,close,high,low',
        '--test_weeks':    '12',
        '--n_tests':       '300',
        '--holdout_weeks': '12',
        '--length':        '60',
        '--s3_bucket':     BUCKET,
        '--s3_path':       s3_path,
        '--user':          USER
    }
)
print('\nstarted job')
print(json.dumps(response, indent=4, sort_keys=True))
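
If you prefer to follow the run from the notebook, a small polling helper is one option. This is a sketch under assumptions: `wait_for_job_run` is a hypothetical helper, the polling interval and limit are arbitrary, and the client is passed in as a parameter so the function can be exercised without AWS access. It relies on the Glue `get_job_run` call, which returns the run state under `JobRun` and `JobRunState`.

```python
import time

# Run states after which the job will not make further progress.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT", "ERROR"}

def wait_for_job_run(glue_client, job_name, run_id, poll_seconds=30, max_polls=120):
    """Poll a Glue job run until it reaches a terminal state or polls run out.

    Returns the last state that was observed.
    """
    state = None
    for _ in range(max_polls):
        run = glue_client.get_job_run(JobName=job_name, RunId=run_id)
        state = run["JobRun"]["JobRunState"]
        print("job run state:", state)
        if state in TERMINAL_STATES:
            break
        time.sleep(poll_seconds)
    return state
```

In this notebook it could be called as `wait_for_job_run(glue, job_name, response['JobRunId'])`.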

> **Note**
> To monitor the executing Glue job, open the AWS console and go to `Services -> AWS Glue -> ETL / Jobs`.


#### Stop Glue Job
Since the job is a long-running task, you will now stop it. The next lab will work off of a previously generated full feature set, so running the full feature generation job is not necessary.

In [None]:
if OVERIDE_PATH is None:
    glue.batch_stop_job_run(JobName=job_name, JobRunIds=[response['JobRunId']])
    print('stopped glue job '+response['JobRunId'])

#### Glue Script
The actual glue script can be viewed below.

In [None]:
%pycat glue_create_features.py

### Conclusion
In this lab you have performed all the steps needed to prepare the data for training a DeepAR model. For your own data sets, the process will be very similar. For the final production system, some additional steps would be:
1. Automation of Glue scripts with a schedule of processing times.
2. Versioning of generated features to enable different model architectures.
3. Augmenting the dataset with more currencies, or a longer time period.

In the next tutorial, you will use the generated data to [tune and train the DeepAR model](Deep%20AR%20Tune%20%26%20Train.ipynb).