## Orders data analysis with Sagemaker

This notebook demonstrates how to train and test a Linear Learner model on SageMaker.

In [None]:
import os
import boto3

import numpy as np
import pandas as pd

from sklearn.preprocessing import StandardScaler
from sklearn.metrics import r2_score, mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.linear_model import SGDRegressor
import sagemaker

import matplotlib.pyplot as plt
%matplotlib inline

import seaborn as sns

from config import *
import warnings
warnings.filterwarnings('ignore')

In [None]:
# File name used both for the local CSV written by upload_sagemaker_input
# and for the object uploaded to S3.
INPUT_FILENAME = 'train_df.csv'
# Local directory the order files are synced into and loaded from.
DATA_LOCAL_PATH = 'orders'

# S3 locations of the training CSV and of the training output.
# SAGEMAKER_S3_BUCKET, INPUT_S3_PATH and OUTPUT_S3_PATH are not defined in this
# notebook — presumably they are provided by `from config import *`.
# NOTE(review): os.path.join only produces a valid s3:// URI on POSIX hosts and
# when the path parts are relative (no leading '/').
FULL_S3_INPUT_PATH = os.path.join('s3://', SAGEMAKER_S3_BUCKET, INPUT_S3_PATH, INPUT_FILENAME)
FULL_S3_OUTPUT_PATH = os.path.join('s3://', SAGEMAKER_S3_BUCKET, OUTPUT_S3_PATH)

# Role and SDK session handed to the Estimator in
# create_sagemaker_linear_regression.
ROLE = SAGEMAKER_ROLE_ARN
SAGEMAKER_SESSION = sagemaker.Session()

# Linear Learner training image per AWS region; indexed with REGION_NAME when
# the estimator is built, so only the regions listed here are supported.
CONTAINERS = {
    'us-west-2': '174872318107.dkr.ecr.us-west-2.amazonaws.com/linear-learner:latest',
    'us-east-1': '382416733822.dkr.ecr.us-east-1.amazonaws.com/linear-learner:latest',
    'us-east-2': '404615174143.dkr.ecr.us-east-2.amazonaws.com/linear-learner:latest',
    'eu-west-1': '438346466558.dkr.ecr.eu-west-1.amazonaws.com/linear-learner:latest'
}

In [None]:
# Mirror the order files from S3 into the local data directory.
# IPython expands $DATA_S3_PATH / $DATA_LOCAL_PATH from the notebook namespace.
# DATA_S3_PATH is not defined in this notebook — presumably it comes from config.
!echo "Syncing from $DATA_S3_PATH to $DATA_LOCAL_PATH"
!aws s3 sync "$DATA_S3_PATH" "$DATA_LOCAL_PATH"

In [None]:
def load_orders_df(data_path):
    """Load every order file directly under ``data_path`` into one DataFrame.

    Each regular file is parsed as JSON Lines (one JSON object per line) and
    the pieces are concatenated. Files are read in sorted name order so the
    row order is reproducible (``os.listdir`` returns entries in arbitrary
    order), and sub-directories such as Jupyter's ``.ipynb_checkpoints`` are
    skipped instead of being handed to ``pd.read_json``.

    Args:
        data_path: Directory containing the order files.

    Returns:
        DataFrame with all orders; ``order_date`` is converted to datetime.

    Raises:
        ValueError: If ``data_path`` contains no files.
    """
    candidate_paths = (
        os.path.join(data_path, entry) for entry in sorted(os.listdir(data_path))
    )
    file_paths = [path for path in candidate_paths if os.path.isfile(path)]
    if not file_paths:
        # pd.concat would raise ValueError as well, but without naming the directory.
        raise ValueError('No order files found in {!r}'.format(data_path))

    partial_order_dfs = [pd.read_json(path, lines=True) for path in file_paths]
    orders_df = pd.concat(partial_order_dfs, copy=False)
    orders_df['order_date'] = pd.to_datetime(orders_df['order_date'])
    return orders_df


def split_train_test_df(daily_profit_df):
    """Split daily profit into random train/test features and targets.

    The frame's index is parsed as datetimes, truncated to day resolution and
    converted to numbers to form the single feature; the ``profit`` column is
    the target. 20% of the rows go to the test set, with a fixed seed.

    Returns:
        The ``train_test_split`` result: x_train, x_test, y_train, y_test.
    """
    timestamps = np.array(pd.to_datetime(daily_profit_df.index))
    whole_days = timestamps.astype('datetime64[D]')
    features = np.array(pd.to_numeric(whole_days))
    targets = daily_profit_df['profit']

    return train_test_split(features, targets, test_size=0.2, random_state=0)


def profit_by_period(orders_df, date_period):
    """Sum the numeric order columns per day or per month.

    Args:
        orders_df: DataFrame with a datetime ``order_date`` column and a
            ``price`` column.
        date_period: ``'day'`` groups by calendar day; any other value groups
            by month (labelled with the month end).

    Returns:
        DataFrame indexed by period, with the summed ``price`` column renamed
        to ``profit``. Periods without orders are present with a sum of 0.
    """
    # pd.TimeGrouper has been removed from pandas; pd.Grouper(freq=...) is its
    # replacement. Offset objects are used instead of string aliases so the
    # monthly case does not depend on the 'M' vs 'ME' alias of the installed pandas.
    if date_period == 'day':
        frequency = pd.offsets.Day()
    else:
        frequency = pd.offsets.MonthEnd()

    grouped = orders_df.set_index('order_date').groupby(pd.Grouper(freq=frequency))
    # numeric_only=True keeps the original result on pandas >= 2, where sum()
    # no longer drops non-numeric columns by default.
    return grouped.sum(numeric_only=True).rename({'price': 'profit'}, axis=1)

In [None]:
def summarize_prices(orders_df):
    """Draw a 100-bin histogram of the ``price`` column on a new 20x10 figure."""
    plt.figure(figsize=(20, 10))
    prices = orders_df['price']
    prices.plot.hist(bins=100)
    
    
def plot_df_by_date(orders_df, date_period):
    """Plot the per-period totals returned by ``profit_by_period``.

    ``date_period`` is passed through unchanged; x tick labels are drawn
    vertically and the figure is shown immediately.
    """
    profit_by_period(orders_df, date_period).plot(figsize=(20, 10))
    plt.xticks(rotation='vertical')
    plt.show()


def show_regression_report(x, y, y_pred, scaler=None):
    """Print R2 / RMSE for the predictions and plot them against the targets.

    Args:
        x: Feature values; flattened, rounded and interpreted as day numbers
            (``unit='d'``) to build the date axis of the plot.
        y: True target values.
        y_pred: Predicted target values.
        scaler: Accepted for compatibility; not used by this function.
    """
    day_numbers = pd.Series(np.rint(x.reshape(-1)))
    dates = pd.to_datetime(day_numbers, unit='d')

    print('Test scores:')
    r2 = r2_score(y, y_pred)
    print('R2:', r2)
    rmse = np.sqrt(mean_squared_error(y, y_pred))
    print('RMSE:', rmse)

    comparison_df = pd.DataFrame(
        {'profit': y, 'predicted_profit': y_pred},
        index=dates
    )
    comparison_df.plot(figsize=(20, 10))
    plt.xticks(rotation='vertical')
    plt.show()

In [None]:
def training_data_to_dataframe(x, y):
    """Combine features and targets into a two-column frame ordered (y, x).

    ``x`` is flattened to one dimension first; ``y`` is used as given.
    """
    data = {'x': x.reshape(-1), 'y': y}
    return pd.DataFrame(data, columns=['y', 'x'])


def upload_sagemaker_input(df, sagemaker_bucket, input_s3_path, input_filename):
    """Write ``df`` to a local CSV and upload it to S3.

    The CSV is written without index or header row to ``input_filename`` in
    the current working directory, then uploaded to
    ``<input_s3_path>/<input_filename>`` in ``sagemaker_bucket``.

    Args:
        df: DataFrame to serialise.
        sagemaker_bucket: Name of the destination S3 bucket.
        input_s3_path: Key prefix inside the bucket.
        input_filename: Local file name, also used as the last key component.
    """
    # S3 keys are always '/'-separated; os.path.join would use the host OS
    # separator and build a wrong key on non-POSIX systems.
    import posixpath

    df.to_csv(input_filename, index=False, header=False)
    input_s3_key = posixpath.join(input_s3_path, input_filename)
    print('Putting input data to {}'.format(posixpath.join(sagemaker_bucket, input_s3_key)))

    s3_client = boto3.client('s3')
    s3_client.upload_file(
        Bucket=sagemaker_bucket,
        Key=input_s3_key,
        Filename=input_filename
    )
    

def create_sagemaker_linear_regression(train_instance_type, model_name, output_path, mini_batch_size=10):
    """Build a single-instance Linear Learner estimator configured for regression.

    The training image is looked up in ``CONTAINERS`` by ``REGION_NAME``; the
    role and session come from the module-level ``ROLE`` and
    ``SAGEMAKER_SESSION``.

    Args:
        train_instance_type: Instance type used for the training job.
        model_name: Accepted for compatibility; not used when building the
            estimator.
        output_path: S3 location passed to the estimator as ``output_path``.
        mini_batch_size: Value of the ``mini_batch_size`` hyperparameter.

    Returns:
        The configured ``sagemaker.estimator.Estimator`` (not yet fitted).
    """
    hyperparameters = dict(
        feature_dim=1,
        predictor_type='regressor',
        loss='squared_loss',
        wd=1e-4,
        optimizer='sgd',
        learning_rate=0.1,
        mini_batch_size=mini_batch_size,
        epochs=5
    )

    estimator = sagemaker.estimator.Estimator(
        CONTAINERS[REGION_NAME],
        ROLE,
        train_instance_count=1,
        train_instance_type=train_instance_type,
        output_path=output_path,
        sagemaker_session=SAGEMAKER_SESSION
    )
    estimator.set_hyperparameters(**hyperparameters)

    return estimator


def predict_with_sagemaker(sagemaker_predictor, x):
    """Run ``x`` through the predictor and return the scores as a NumPy array.

    The response is expected to be a mapping whose ``'predictions'`` entry is
    a sequence of mappings, each carrying a ``'score'`` value.
    """
    response = sagemaker_predictor.predict(x)
    scores = []
    for entry in response['predictions']:
        scores.append(entry['score'])
    return np.array(scores)

In [None]:
# Read every synced order file under DATA_LOCAL_PATH into a single DataFrame.
orders_df = load_orders_df(DATA_LOCAL_PATH)

## Data exploration

Let's explore orders using visual analysis and select an algorithm to predict future sales profit.

### Orders price distribution

In [None]:
# Histogram of individual order prices (100 bins).
summarize_prices(orders_df)

### Profit by month

In [None]:
# Totals grouped by month.
plot_df_by_date(orders_df, 'month')

### Profit by day

In [None]:
# Totals grouped by day.
plot_df_by_date(orders_df, 'day')

## Data preprocessing

We need to convert the data into the proper format and split it into training and test sets.

In [None]:
# One row per day; the summed `price` column is exposed as `profit`.
daily_profit_df = profit_by_period(orders_df, 'day')

In [None]:
# Random 80/20 split with a fixed seed: x is the date as a day number,
# y is the daily profit.
x_train, x_test, y_train, y_test = split_train_test_df(daily_profit_df)

## Upload data to S3

The code below will upload training data to S3.

In [None]:
# Two columns ordered (y, x) — presumably because the training CSV is expected
# to carry the label in its first column; confirm against the algorithm docs.
train_df = training_data_to_dataframe(x_train, y_train)

In [None]:
# Writes train_df to INPUT_FILENAME locally (no header, no index) and uploads
# it to SAGEMAKER_S3_BUCKET under INPUT_S3_PATH.
upload_sagemaker_input(train_df, SAGEMAKER_S3_BUCKET, INPUT_S3_PATH, INPUT_FILENAME)

## Fit Sagemaker's Linear Learner
Model training requires specifying where the data is located and what type of instance we want to use.

In [None]:
# Build the (not yet fitted) estimator; mini_batch_size keeps its default of 10.
# NOTE: model_name is accepted by the helper but not used when the estimator
# is constructed.
sagemaker_linear_regression = create_sagemaker_linear_regression(
    train_instance_type=SAGEMAKER_TRAINING_INSTANCE_TYPE,
    model_name=MODEL_NAME,
    output_path=FULL_S3_OUTPUT_PATH
)

In [None]:
# Train on the CSV at FULL_S3_INPUT_PATH, supplied as the 'train' channel.
# logs=False presumably keeps the training job's log stream out of the
# notebook output — confirm against the SDK version in use.
sagemaker_linear_regression.fit(
    {
        'train': sagemaker.s3_input(
            FULL_S3_INPUT_PATH,
            content_type='text/csv'
        )
    },
    logs=False
)

## Deploy model to an endpoint

After training, we use the fitted estimator to build and deploy a model. This creates a SageMaker endpoint that can be used to perform inference.

In [None]:
# Create a model named MODEL_NAME from the trained estimator and deploy it on
# one hosting instance behind the endpoint ENDPOINT_NAME; the result of
# deploy() is used below as the predictor.
linear_predictor = sagemaker_linear_regression.create_model(name=MODEL_NAME).deploy(
    initial_instance_count=1,
    instance_type=SAGEMAKER_HOSTING_INSTANCE_TYPE,
    endpoint_name=ENDPOINT_NAME
)

In [None]:
# Send requests as CSV and parse the endpoint's responses as JSON;
# predict_with_sagemaker relies on the parsed 'predictions' / 'score' fields.
linear_predictor.content_type = 'text/csv'
linear_predictor.serializer = sagemaker.predictor.csv_serializer
linear_predictor.deserializer = sagemaker.predictor.json_deserializer

## Validate results
Using the deployed endpoint we can check accuracy of our model on test data.

In [None]:
# reshape(-1, 1): one row per sample with a single feature column
# (the model is trained with feature_dim=1).
predictions = predict_with_sagemaker(linear_predictor, x_test.reshape(-1, 1))

In [None]:
# Print R2 / RMSE on the held-out test set and plot actual vs. predicted profit.
show_regression_report(x_test, y_test, predictions)