## 0. Setup

In [2]:
import boto3
import sagemaker
import sagemaker.session


region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()
# Plain string: the original f-string had no placeholders.
model_package_group_name = "StalkStockModelPackageGroupName"

# DeepAR window sizes, in days (time_freq="D").
# PREDICTION_LENGTH is also hard-coded in processing.py and evaluating.py; keep them in sync.
CONTEXT_LENGTH = 191
PREDICTION_LENGTH = 7

# Bucket-relative prefixes (no "s3://" scheme); callers prepend "s3://".
BUCKET = "sagemaker-stock-prices"
s3_data_path = f"{BUCKET}/data"
s3_evaluate_result_path = f"{BUCKET}/result"

In [3]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)


# Pipeline parameters. They are registered in the Pipeline definition further down,
# so their values can be overridden per execution.
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.m5.xlarge")
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.c5.2xlarge")

# Not enabled. The last two reference input_data_uri / batch_data_uri, which this
# notebook never defines; define those before uncommenting.
# model_approval_status = ParameterString(
#     name="ModelApprovalStatus",
#     default_value="PendingManualApproval"
# )
# input_data = ParameterString(
#     name="InputData",
#     default_value=input_data_uri,
# )
# batch_data = ParameterString(
#     name="BatchData",
#     default_value=batch_data_uri,
# )

## 1. Processing Data

In [4]:
%%writefile stalk-stock/processing.py
import numpy as np
import pandas as pd
import json
import boto3
import io
import os
import sagemaker


# Forecast horizon in days; these trailing days are withheld from the training series.
# Must match PREDICTION_LENGTH in the notebook and in evaluating.py.
PREDICTION_LENGTH = 7

BUCKET = "sagemaker-stock-prices"

# S3 handles used by the __main__ block to list raw/ objects.
s3 = boto3.resource('s3')
bucket = s3.Bucket(BUCKET)

# One daily MACD-histogram series per raw/ object; filled in by __main__.
macd_hists = []


def get_macd(data, fast_span=12, slow_span=26):
    """Return the MACD line of ``data``.

    The MACD line is EMA(fast_span) - EMA(slow_span), with both EMAs computed via
    ``ewm(span=..., adjust=False)``. The signal line (EMA of the MACD) is
    intentionally not computed here; callers derive it themselves.

    Parameters:
    data -- pandas Series (or DataFrame) of prices
    fast_span -- span of the short EMA (default 12)
    slow_span -- span of the long EMA (default 26)

    Return value: object of the same type/index as ``data`` holding the MACD line.
    """
    fast = data.ewm(span=fast_span, adjust=False).mean()
    slow = data.ewm(span=slow_span, adjust=False).mean()
    return fast - slow

    
def series_to_obj(ts, cat=None):
    """Convert a time series into a DeepAR JSON record (as a dict).

    Parameters:
    ts -- pandas Series indexed by timestamps; NaN values become JSON null (missing)
    cat -- optional category value, stored under "cat" when not None

    Return value: dict with "start" ("YYYY-MM-DD HH:MM:SS"), "target" and optionally "cat".
    """
    import math
    # Format the timestamp explicitly. Interpolating a pandas Timestamp already yields
    # "YYYY-MM-DD HH:MM:SS", so the former f"{...} 00:00:00" produced a doubled, invalid
    # time ("YYYY-MM-DD 00:00:00 00:00:00"). pd.Timestamp also accepts date strings.
    start = pd.Timestamp(ts.index[0]).strftime("%Y-%m-%d %H:%M:%S")
    obj = {"start": start, "target": [None if math.isnan(t) else t for t in ts]}
    if cat is not None:
        obj["cat"] = cat
    return obj


def series_to_jsonline(ts, cat=None):
    """Serialize one series as a single JSON Lines record (no trailing newline)."""
    record = series_to_obj(ts, cat)
    return json.dumps(record)


if __name__ == "__main__":
    # Build one daily MACD-histogram series per raw CSV under s3://BUCKET/raw/.
    for idx, obj in enumerate(bucket.objects.filter(Prefix="raw/")):
        print(obj.key)

        raw = pd.read_csv(io.BytesIO(obj.get()['Body'].read()), index_col=0, parse_dates=True)
        close = raw['Close']

        macd = get_macd(close)
        signal = macd.ewm(span=9, adjust=False).mean()
        # asfreq('D') reindexes to calendar days; days without a row (e.g. weekends)
        # become NaN, which series_to_obj encodes as null (missing) targets.
        macd_hist = (macd - signal).asfreq('D')
        macd_hists.append(macd_hist)

        if idx % 10 == 0:
            print(macd_hist)

    if not macd_hists:
        # Previously this surfaced as an IndexError on macd_hists[0] below.
        raise RuntimeError(f"No objects found under s3://{BUCKET}/raw/; nothing to process")

    # The training channel omits the last PREDICTION_LENGTH days; the test channel keeps
    # the full series, so DeepAR can evaluate on the held-out tail.
    macd_hists_training = [h.iloc[:-PREDICTION_LENGTH] for h in macd_hists]

    print(series_to_jsonline(macd_hists[0]).encode("utf-8"))

    encoding = "utf-8"
    FILE_TRAIN = "train.json"
    FILE_TEST = "test.json"
    BASE_DIR = "/opt/ml/processing"

    # Each directory must match a ProcessingOutput source in the pipeline definition.
    outputs = (
        (f"{BASE_DIR}/train", FILE_TRAIN, macd_hists_training),
        (f"{BASE_DIR}/test", FILE_TEST, macd_hists),
    )
    for out_dir, file_name, series_list in outputs:
        os.makedirs(out_dir, exist_ok=True)
        with open(f"{out_dir}/{file_name}", "wb") as f:
            for ts in series_list:
                f.write((series_to_jsonline(ts) + "\n").encode(encoding))

Overwriting stalk-stock/processing.py


In [5]:
from sagemaker.processing import ScriptProcessor


# Custom container (account/region-specific ECR image) that runs processing.py.
# instance_count now honours the ProcessingInstanceCount pipeline parameter, which was
# previously declared and registered on the pipeline but ignored. Note: processing.py
# does not shard its work, so every extra instance would redo the same job.
script_process = ScriptProcessor(
    image_uri="555178539686.dkr.ecr.us-east-1.amazonaws.com/stalk-stock-processor:latest",
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name="script-stalkstock-processing",
    role=role,
)

In [6]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep


step_process = ProcessingStep(
    name="StalkStockProcess",
    processor=script_process,
    code="stalk-stock/processing.py",
    # processing.py writes train.json / test.json under /opt/ml/processing/{train,test};
    # each directory is uploaded to the matching prefix under s3_data_path.
    outputs=[
        ProcessingOutput(
            output_name=channel,
            source=f"/opt/ml/processing/{channel}",
            destination=f"s3://{s3_data_path}/{channel}/",
        )
        for channel in ("train", "test")
    ],
)

## 2. Training Model

In [7]:
from sagemaker.estimator import Estimator


# Built-in DeepAR forecasting container for the current region.
image_uri = sagemaker.image_uris.retrieve("forecasting-deepar", boto3.Session().region_name)
s3_model_path = f"{BUCKET}/output"

deepar_train = Estimator(
    role=role,
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path="s3://" + s3_model_path,
)

# Earlier (deeper) configuration, kept for reference:
# time_freq=D, context_length=CONTEXT_LENGTH, prediction_length=PREDICTION_LENGTH,
# embedding_dimension=49, num_cells=181, num_layers=4, likelihood=gaussian, epochs=100,
# mini_batch_size=200, learning_rate=0.000102117271092181,
# dropout_rate=0.07290734463384366, early_stopping_patience=10
#
# Current configuration. DeepAR hyperparameters are passed as strings. The specific
# values presumably came from a tuning job (TODO confirm provenance).
hyperparameters = dict(
    time_freq="D",
    context_length=str(CONTEXT_LENGTH),
    prediction_length=str(PREDICTION_LENGTH),
    embedding_dimension="21",
    num_cells="180",
    num_layers="2",
    likelihood="gaussian",
    epochs="40",
    mini_batch_size="171",
    learning_rate="0.00017010544816687748",
    dropout_rate="0.17607352219111227",
    early_stopping_patience="10",
)
deepar_train.set_hyperparameters(**hyperparameters)

In [8]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep


# Wire both processing outputs into same-named training channels; referencing
# step_process properties also makes this step run after it.
step_train = TrainingStep(
    name="StalkStockTrain",
    estimator=deepar_train,
    inputs={
        channel: TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[channel].S3Output.S3Uri,
        )
        for channel in ("train", "test")
    },
)

## 3. Predicting and Evaluating

In [9]:
%%writefile stalk-stock/evaluating.py
import numpy as np
import pandas as pd
import json
import boto3
import sagemaker
import io
import os
import datetime
import math


def series_to_obj(ts, cat=None):
    """Convert a time series into a DeepAR JSON record (as a dict).

    Parameters:
    ts -- pandas Series indexed by timestamps; NaN values become JSON null (missing)
    cat -- optional category value, stored under "cat" when not None

    Return value: dict with "start" ("YYYY-MM-DD HH:MM:SS"), "target" and optionally "cat".
    """
    import math
    # Format the timestamp explicitly. Interpolating a pandas Timestamp already yields
    # "YYYY-MM-DD HH:MM:SS", so the former f"{...} 00:00:00" produced a doubled, invalid
    # time ("YYYY-MM-DD 00:00:00 00:00:00"). pd.Timestamp also accepts date strings.
    start = pd.Timestamp(ts.index[0]).strftime("%Y-%m-%d %H:%M:%S")
    obj = {"start": start, "target": [None if math.isnan(t) else t for t in ts]}
    if cat is not None:
        obj["cat"] = cat
    return obj


def series_to_jsonline(ts, cat=None):
    """Serialize one series as a single JSON Lines record (no trailing newline)."""
    record = series_to_obj(ts, cat)
    return json.dumps(record)


class DeepARPredictor(sagemaker.predictor.Predictor):
    """Predictor for a DeepAR endpoint that accepts and returns pandas objects.

    Requests are built from pandas Series (one per time series). Responses are decoded
    into one DataFrame of quantile forecasts per input series.

    Subclasses ``sagemaker.predictor.Predictor``, the SageMaker Python SDK v2 name of the
    class formerly called ``RealTimePredictor`` (now only a deprecated alias; this script
    already relies on SDK v2 via ``sagemaker.image_uris``). With the v2 default
    serializer/deserializer, the request bytes are sent as-is and the raw response bytes
    are returned, which is what the encode/decode helpers below expect.
    """

    def set_prediction_parameters(self, freq, prediction_length):
        """Set the time frequency and prediction length parameters. This method **must** be called
        before being able to use `predict`.

        Parameters:
        freq -- string indicating the time frequency (pandas offset alias, e.g. "D")
        prediction_length -- integer, number of predicted time points

        Return value: none.
        """
        self.freq = freq
        self.prediction_length = prediction_length

    def predict(
            self,
            ts,
            cat=None,
            encoding="utf-8",
            num_samples=100,
            quantiles=("0.1", "0.5", "0.9"),
            content_type="application/json",
    ):
        """Request predictions for the time series listed in `ts`, each with the (optional)
        corresponding category listed in `cat`.

        Parameters:
        ts -- list of `pandas.Series` objects, the time series to predict
        cat -- list of integers (default: None)
        encoding -- string, encoding to use for the request (default: "utf-8")
        num_samples -- integer, number of samples to compute at prediction time (default: 100)
        quantiles -- sequence of strings specifying the quantiles to compute
                     (default: ("0.1", "0.5", "0.9"); a tuple to avoid a mutable default)
        content_type -- ContentType header sent with the request

        Return value: list of `pandas.DataFrame` objects, one per series, with one column per
        quantile, indexed by the `prediction_length` timestamps following each series.

        Raises RuntimeError if `set_prediction_parameters` has not been called.
        """
        if getattr(self, "freq", None) is None or getattr(self, "prediction_length", None) is None:
            raise RuntimeError("call set_prediction_parameters(freq, prediction_length) before predict()")
        # Forecasts start one period after each series' last observed timestamp.
        prediction_times = [x.index[-1] + pd.Timedelta(1, unit=self.freq) for x in ts]
        req = self.__encode_request(ts, cat, encoding, num_samples, quantiles)
        res = super().predict(req, initial_args={"ContentType": content_type})
        return self.__decode_response(res, prediction_times, encoding)

    def __encode_request(self, ts, cat, encoding, num_samples, quantiles):
        """Build the DeepAR JSON inference request body as encoded bytes."""
        instances = [
            series_to_obj(series, cat[k] if cat else None) for k, series in enumerate(ts)
        ]
        configuration = {
            "num_samples": num_samples,
            "output_types": ["quantiles"],
            "quantiles": list(quantiles),
        }
        http_request_data = {"instances": instances, "configuration": configuration}
        return json.dumps(http_request_data).encode(encoding)

    def __decode_response(self, response, prediction_times, encoding):
        """Turn the raw response bytes into one quantile DataFrame per requested series."""
        response_data = json.loads(response.decode(encoding))
        return [
            pd.DataFrame(
                data=response_data["predictions"][k]["quantiles"],
                index=pd.date_range(start=start, freq=self.freq, periods=self.prediction_length),
            )
            for k, start in enumerate(prediction_times)
        ]


def macd_hist_to_price(stock_prices, macd_hist):
    """Invert a forecast MACD-histogram path into implied closing prices.

    Truncates ``stock_prices`` to dates strictly before the day preceding the first
    forecast date (inclusive ``.loc`` slice up to that day). Then, for each
    (date, histogram value) in order, appends the price that ``binary_search`` finds
    reproduces that histogram value.

    Return value: the truncated history followed by one inferred price per forecast date.
    """
    cutoff = macd_hist.index[0] - datetime.timedelta(days=1)
    prices = stock_prices.loc[:cutoff]
    for when, target_hist in macd_hist.items():
        prices = binary_search(prices, when, target_hist)
    return prices


def get_macd(data, fast_span=12, slow_span=26):
    """Return the MACD line of ``data``.

    The MACD line is EMA(fast_span) - EMA(slow_span), with both EMAs computed via
    ``ewm(span=..., adjust=False)``. The signal line (EMA of the MACD) is
    intentionally not computed here; callers derive it themselves.

    Parameters:
    data -- pandas Series (or DataFrame) of prices
    fast_span -- span of the short EMA (default 12)
    slow_span -- span of the long EMA (default 26)

    Return value: object of the same type/index as ``data`` holding the MACD line.
    """
    fast = data.ewm(span=fast_span, adjust=False).mean()
    slow = data.ewm(span=slow_span, adjust=False).mean()
    return fast - slow


def binary_search(prices, dt, m, max_iter=1000, tol=1e-10):
    """Find the price at ``dt`` whose appended value yields MACD histogram ``m``.

    Bisects over candidate prices in [0, 2 * last price]. For each candidate it appends
    the candidate to ``prices`` at ``dt`` and recomputes the histogram
    (MACD - EMA9(MACD)). With adjust=False EWMs, the last histogram value is linear in
    the appended price with a positive slope, so it increases with the price and
    bisection is valid. If ``m`` lies outside the reachable range, the result converges
    to the nearer bound.

    Parameters:
    prices -- pandas Series of historical prices (last value sets the search range)
    dt -- index label for the new price
    m -- target histogram value
    max_iter -- hard cap on bisection steps (default 1000)
    tol -- absolute tolerance on the histogram value (default 1e-10)

    Return value: ``prices`` with the best candidate appended at ``dt``
    (``prices`` unchanged if max_iter <= 0).
    """
    low = 0.0
    high = prices.iloc[-1] * 2.0
    new_prices = prices
    prev_mid = None
    # Bug fix: the former `while cnt < 1000` never incremented cnt, so it looped forever
    # whenever `tol` was unreachable (float resolution) or `m` was NaN.
    for _ in range(max_iter):
        mid = low + (high - low) / 2
        if mid == prev_mid:
            # Interval has collapsed to float resolution; keep the last estimate.
            break
        prev_mid = mid
        new_prices = pd.concat([prices, pd.Series([mid], index=[dt])])
        macd = get_macd(new_prices)
        signal = macd.ewm(span=9, adjust=False).mean()
        hist_last = (macd - signal).iloc[-1]
        if math.fabs(m - hist_last) < tol:
            break
        if m < hist_last:
            high = mid
        elif m > hist_last:
            low = mid
        else:
            # Neither comparison holds: m (or the histogram) is NaN, so no progress is possible.
            break
    return new_prices


# Per-symbol state, aligned by position: filled by the S3 loading loop below.
macd_hists = []
stock_prices = []
symbol_names = []

# Must match PREDICTION_LENGTH in the notebook and in processing.py.
PREDICTION_LENGTH = 7

# Use the default AWS credential chain (inside a SageMaker Processing job that is the
# job's execution role). Never embed access keys in source: the key pair that used to be
# hard-coded here is exposed in version control and must be revoked and rotated.
boto3_session = boto3.Session(region_name=os.environ.get("AWS_REGION", "us-east-1"))
sagemaker_session = sagemaker.Session(boto_session=boto3_session)

BUCKET = "sagemaker-stock-prices"
s3 = boto3_session.resource('s3')

bucket = s3.Bucket(BUCKET)

# Load every raw CSV and derive its daily MACD histogram; the three lists stay
# index-aligned (one entry per S3 object).
for s3_object in bucket.objects.filter(Prefix="raw/"):
    print(s3_object.key)
    ticker = os.path.basename(s3_object.key).split('.')[0]

    body = s3_object.get()['Body'].read()
    close = pd.read_csv(io.BytesIO(body), index_col=0, parse_dates=True)['Close']
    stock_prices.append(close)
    symbol_names.append(ticker)

    macd_line = get_macd(close)
    signal_line = macd_line.ewm(span=9, adjust=False).mean()
    # Calendar-daily frequency to match the training data (non-trading days -> NaN).
    macd_hists.append((macd_line - signal_line).asfreq('D'))

# Locate the most recent completed training job produced by the pipeline's train step.
sm = boto3_session.client('sagemaker')
response = sm.list_training_jobs(
    MaxResults=10,
    NameContains='StalkStockTrain',
    StatusEquals='Completed',
    SortBy='CreationTime',
    SortOrder='Descending',
)
training_jobs = response["TrainingJobSummaries"]
if not training_jobs:
    raise RuntimeError("No completed training job with 'StalkStockTrain' in its name was found")
job_name = training_jobs[0]["TrainingJobName"]
print(job_name)

image_uri = sagemaker.image_uris.retrieve("forecasting-deepar", boto3_session.region_name)
endpoint_name = sagemaker_session.endpoint_from_job(
    job_name=job_name,
    initial_instance_count=1,
    instance_type="ml.m4.xlarge",
    image_uri=image_uri,
)

QUANTILES = ("0.1", "0.5", "0.9")

# The endpoint is only needed for the request itself. Delete it even if prediction
# fails, so a crash does not leave a billable instance running; deleting it before
# the (slow) price inversion below also shortens its lifetime.
try:
    predictor = DeepARPredictor(endpoint_name=endpoint_name, sagemaker_session=sagemaker_session)
    predictor.set_prediction_parameters("D", PREDICTION_LENGTH)
    list_of_df = predictor.predict(macd_hists, quantiles=list(QUANTILES), content_type="application/json")
finally:
    sagemaker_session.delete_endpoint(endpoint_name)

# Forecasts are calendar-daily; keep weekdays only, since prices exist only for those.
list_of_df = [forecast[forecast.index.dayofweek < 5] for forecast in list_of_df]

# Convert each quantile's MACD-histogram forecast back into implied prices.
predict_prices = [
    pd.DataFrame({q: macd_hist_to_price(history, forecast[q]) for q in QUANTILES})
    for history, forecast in zip(stock_prices, list_of_df)
]

# These directories must match the ProcessingOutput sources of the evaluate step.
BASE_DIR = "/opt/ml/processing"
for subdir in ("macd", "price"):
    os.makedirs(f"{BASE_DIR}/{subdir}", exist_ok=True)

for symbol, forecast, prices in zip(symbol_names, list_of_df, predict_prices):
    forecast.to_csv(f"{BASE_DIR}/macd/{symbol}.csv")
    prices.to_csv(f"{BASE_DIR}/price/{symbol}.csv")

Overwriting stalk-stock/evaluating.py


In [10]:
from sagemaker.processing import ScriptProcessor


# Evaluation runs in the same custom container as the processing step.
# instance_count stays at 1 on purpose: evaluating.py deploys (and deletes) a real-time
# endpoint from the latest training job, and parallel instances would presumably collide.
# NOTE(review): this rebinds `script_process`. step_process already captured the
# processing processor, but re-running that cell after this one would silently switch it.
script_process = ScriptProcessor(
    role=role,
    image_uri="555178539686.dkr.ecr.us-east-1.amazonaws.com/stalk-stock-processor:latest",
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-stalkstock-evaluating",
)

In [11]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep


step_evaluate = ProcessingStep(
    name="StalkStockEvaluate",
    processor=script_process,
    code="stalk-stock/evaluating.py",
    # evaluating.py writes per-symbol CSVs under /opt/ml/processing/{macd,price}.
    outputs=[
        ProcessingOutput(
            output_name=result_kind,
            source=f"/opt/ml/processing/{result_kind}",
            destination=f"s3://{s3_evaluate_result_path}/{result_kind}/",
        )
        for result_kind in ("macd", "price")
    ],
    # No pipeline inputs: the script finds the newest completed StalkStockTrain job
    # itself, so ordering after training must be declared explicitly.
    depends_on=["StalkStockTrain"],
)

In [12]:
# Sanity check: the AWS region this notebook is operating in.
boto3.session.Session().region_name

'us-east-1'

In [13]:
from sagemaker.workflow.pipeline import Pipeline


# Plain string: the original f-string had no placeholders.
pipeline_name = "StalkStockPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    # Parameters used by the steps; declaring them lets executions override them.
    parameters=[
        processing_instance_type,
        processing_instance_count,
        training_instance_type,
    ],
    steps=[step_process, step_train, step_evaluate],
)

NameError: name 'model_approval_status' is not defined

In [None]:
import json

# Inspect the generated pipeline JSON before upserting it.
pipeline_definition = json.loads(pipeline.definition())
pipeline_definition

In [None]:
# Create or update the pipeline definition in SageMaker under the notebook's role.
upsert_response = pipeline.upsert(role_arn=role)
upsert_response