# Introduction

Linear models are supervised learning algorithms used for solving either classification or regression problems.

The dataset contains information collected from unicorn rides. The features are measurements like health points, magic points, temperature min, temperature max, and precipitation, while the label indicates whether too many magic points were used for each ride. The question we have decided to answer is, "Given the distance and weather (temp min, temp max, precipitation), were too many magic points consumed?"

We will use the Amazon SageMaker Python SDK to train a linear learner classifier in its simplest setting. We explain uploading data to Amazon S3, training a model, and using Lambda for serverless inference.


## Dataset

We're going to work with the unicorn ride dataset we uploaded to our raw S3 bucket. After you uploaded the file, a Lambda function added a few new features: temperature min, temperature max, and precipitation amount. The Lambda function also added a binary (zero or one) label indicating whether we think too many magic points were used. Our current calculation is: `magic_points >= distance * 50`.
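
As a rough illustration of the label rule above, the following sketch applies it to a few made-up rides. The column names `distance`, `magicpoints`, and `heavy_utilization` match those used later in this notebook, but the values are invented, and the actual Lambda function may compute the label differently.

```python
import pandas as pd

# Made-up rides for illustration only; not taken from the real dataset.
rides = pd.DataFrame({
    "distance": [1.0, 2.0, 3.0],
    "magicpoints": [40, 100, 200],
})

# The label is 1 when magic_points >= distance * 50, otherwise 0.
rides["heavy_utilization"] = (rides["magicpoints"] >= rides["distance"] * 50).astype(int)
print(rides)
```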

Provide the transformed and model bucket names found in CloudFormation. **Watch out for tabs if you copy/paste!**

In [None]:
transformed_bucket = '' # provide your transformed bucket name from CloudFormation
model_bucket = '' # provide your model bucket name from CloudFormation

In [None]:
%%time

import boto3
import io
import os
from concurrent import futures

processed_prefix = 'processed'

s3 = boto3.resource('s3')
bucket = s3.Bucket(transformed_bucket)
objects = bucket.objects.filter(Prefix=processed_prefix)

folder = '/home/ec2-user/SageMaker/'

def download(obj):
    filename = os.path.join(folder,obj.key)
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    bucket.download_file(obj.key, filename)

with futures.ThreadPoolExecutor(max_workers=10) as executor:
    future_to_obj = {executor.submit(download, obj): obj for obj in objects}

    print("All downloads submitted.")
    
    files = 0
    for future in futures.as_completed(future_to_obj):
        future.result()  # re-raise any exception from a failed download
        files += 1
    
    print("All " + str(files) + " files downloaded.")

Let's verify how many files were downloaded by counting them on the file system.

In [None]:
%%bash
ls processed | wc -l

Now that our files have downloaded to the notebook instance, let's load them into a dataframe.

We are also going to modify the format of 'statustime' to match up with ground station data collected daily.

In [None]:
import glob
import pandas as pd

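filenames = glob.glob(os.path.join(folder, "processed", "*.csv"))

# Read each CSV into its own dataframe, then concatenate them.
# (Avoid naming the variable `list`, which would shadow the Python builtin.)
frames = []
for filename in filenames:
    frames.append(pd.read_csv(filename))
frame = pd.concat(frames, axis=0, ignore_index=True)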
frame['statustime'] = pd.to_datetime(frame['statustime']).dt.to_period('D')
display(frame)
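
To see what the `statustime` conversion does in isolation, here is a small sketch with invented timestamps. Converting to a daily period drops the time of day, so every ride on the same date ends up with the same key.

```python
import pandas as pd

# Invented timestamps for illustration only.
sample = pd.DataFrame({
    "statustime": ["2019-03-01 08:15:00", "2019-03-01 17:40:00", "2019-03-02 09:05:00"],
})

# Parse the strings, then truncate each timestamp to a daily period.
sample["statustime"] = pd.to_datetime(sample["statustime"]).dt.to_period("D")
print(sample["statustime"].astype(str).tolist())
```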

Let's load our ground station weather data into a dataframe.

In [None]:
import csv

nygs_prefix = 'nygroundstationdata'

s3 = boto3.resource('s3')
bucket = s3.Bucket(transformed_bucket)
objects = bucket.objects.filter(Prefix=nygs_prefix)
for obj in objects:
    if obj.key.endswith('csv'):
        ny = pd.read_csv(obj.get()['Body'], index_col="id")
ny['year_date'] = pd.to_datetime(ny['year_date'], format='%Y%m%d').dt.to_period('D')
display(ny)

As you can see, we have weather data across the state of New York going back to 1869. Let's pivot the table so each row has a date, weather station ID, and the weather values we are interested in.

Notice there are many elements in the 'ny' dataframe. We keep only TMIN, TMAX, and PRCP, and replace missing values with zero to make our future work easier.

Having one row per date and station is the key step: it lets us join the weather data to the ride data with a single merge.

In [None]:
ny_pivot = ny.pivot_table(index=['year_date','id'], columns='element', values='data_value')
trimmed_ny_pivot = ny_pivot[['TMIN','TMAX','PRCP']].fillna(0)
display(trimmed_ny_pivot)
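
The pivot and fill steps above can be tried on a tiny example. This is a sketch with invented readings; unlike the real `ny` dataframe, `id` is an ordinary column here rather than the index, and dates are plain strings.

```python
import pandas as pd

# Invented long-format readings: one row per (date, station, element).
long_df = pd.DataFrame({
    "year_date": ["2019-03-01"] * 3 + ["2019-03-02"] * 2,
    "id": ["STN1"] * 5,
    "element": ["TMIN", "TMAX", "PRCP", "TMIN", "TMAX"],
    "data_value": [-20, 50, 8, -10, 60],
})

# One row per (date, station), one column per element.
wide = long_df.pivot_table(index=["year_date", "id"], columns="element", values="data_value")

# The second day has no PRCP reading, so it is NaN until filled with zero.
trimmed = wide[["TMIN", "TMAX", "PRCP"]].fillna(0)
print(trimmed)
```

Keep in mind that zero is also a valid temperature reading, so filling with zero is a convenience rather than a neutral choice.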

Now we just need to merge our ride data with the weather data.

In [None]:
merge_df = frame.merge(trimmed_ny_pivot, left_on=['statustime','groundstation'], right_on=['year_date','id'])
display(merge_df)
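
The merge uses pandas' default inner join, so rides without a matching weather row are dropped. The sketch below shows this on invented data; the station IDs and values are hypothetical, while the column and index names mirror those used above.

```python
import pandas as pd

# Invented ride data: one row per ride, keyed by day and ground station.
rides = pd.DataFrame({
    "statustime": pd.to_datetime(["2019-03-01", "2019-03-02", "2019-03-03"]).to_period("D"),
    "groundstation": ["STN1", "STN1", "STN2"],
    "distance": [1.5, 2.0, 3.2],
})

# Invented weather data indexed by (year_date, id), like the pivoted table.
weather = pd.DataFrame(
    {"TMIN": [-20.0, -10.0], "TMAX": [50.0, 60.0], "PRCP": [8.0, 0.0]},
    index=pd.MultiIndex.from_arrays(
        [pd.PeriodIndex(["2019-03-01", "2019-03-02"], freq="D"), ["STN1", "STN1"]],
        names=["year_date", "id"],
    ),
)

# Join ride columns against the weather index levels (inner join by default).
merged = rides.merge(weather, left_on=["statustime", "groundstation"], right_on=["year_date", "id"])
print(merged)
print(len(rides) - len(merged), "ride(s) had no matching weather row")
```

Comparing `len(frame)` with `len(merge_df)` in the same way is a quick check on how many rides were lost in the join.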

## Pre-Processing the Data
Now that we have the raw data, let's process it. 
We'll first load the data into numpy arrays, and randomly split it into train and test with a 90/10 split.

In [None]:
import numpy as np
import os

processed_subdir = "standardized"
train_features_file = os.path.join(folder, processed_subdir, "train/csv/features.csv")
train_labels_file = os.path.join(folder, processed_subdir, "train/csv/labels.csv")
test_features_file = os.path.join(folder, processed_subdir, "test/csv/features.csv")
test_labels_file = os.path.join(folder, processed_subdir, "test/csv/labels.csv")

raw = merge_df[['distance','healthpoints','magicpoints','TMIN','TMAX','PRCP','heavy_utilization']].to_numpy(dtype=np.float32)

# split into train/test with a 90/10 split
np.random.seed(0)
np.random.shuffle(raw)
train_size = int(0.9 * raw.shape[0])
train_features = raw[:train_size, :-1]
train_labels = raw[:train_size, -1]
test_features = raw[train_size:, :-1]
test_labels = raw[train_size:, -1]

print('train_features shape = ', train_features.shape)
print('train_labels shape = ', train_labels.shape)
print('test_features shape = ', test_features.shape)
print('test_labels shape = ', test_labels.shape)
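
Because the label is the last column of the same array, shuffling rows keeps each label attached to its features. The following sketch checks this on a toy array; the sizes and values are invented for illustration.

```python
import numpy as np

# Toy data: 10 rows of [i, 10 * i, i], so the label (last column) equals feature 0.
toy = np.arange(10, dtype=np.float32).reshape(-1, 1).repeat(3, axis=1)
toy[:, 1] *= 10

np.random.seed(0)
np.random.shuffle(toy)  # shuffles whole rows in place

# Same 90/10 split as above: the last column is the label, the rest are features.
train_size = int(0.9 * toy.shape[0])
train_x, train_y = toy[:train_size, :-1], toy[:train_size, -1]
test_x, test_y = toy[train_size:, :-1], toy[train_size:, -1]

print(train_x.shape, train_y.shape, test_x.shape, test_y.shape)
print("labels still aligned:", bool(np.all(train_x[:, 0] == train_y)))
```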

## Upload to Amazon S3
Because datasets are typically large and stored in Amazon S3, let's write the data to Amazon S3 in RecordIO-protobuf format. We first create an in-memory buffer holding the data, then upload it to Amazon S3. Note that the bucket and prefix should be changed for different users and different datasets.

In [None]:
import sagemaker.amazon.common as smac

train_prefix = 'train'
key = 'recordio-pb-data'

buf = io.BytesIO()
smac.write_numpy_to_dense_tensor(buf, train_features, train_labels)
buf.seek(0)

boto3.resource('s3').Bucket(transformed_bucket).Object(os.path.join(train_prefix, key)).upload_fileobj(buf)
s3_train_data = 's3://{}/{}/{}'.format(transformed_bucket, train_prefix, key)
print('uploaded training data location: {}'.format(s3_train_data))

It is also possible to provide test data. This way, the training logs include an evaluation of the model's performance. To use this capability, let's upload the test data to Amazon S3 as well.

In [None]:
test_prefix = 'test'

buf = io.BytesIO()
smac.write_numpy_to_dense_tensor(buf, test_features, test_labels)
buf.seek(0)

boto3.resource('s3').Bucket(transformed_bucket).Object(os.path.join(test_prefix, key)).upload_fileobj(buf)
s3_test_data = 's3://{}/{}/{}'.format(transformed_bucket, test_prefix, key)
print('uploaded test data location: {}'.format(s3_test_data))

## Training

Let's take a moment to explain, at a high level, how machine learning training and prediction work in Amazon SageMaker. First, we need to train a model. Given a labeled dataset and hyper-parameters that guide the training process, training outputs a model. Once training is done, we set up what is called an **endpoint**. An endpoint is a web service that, given a request containing an unlabeled data point or a mini-batch of data points, returns one or more predictions.

In Amazon SageMaker, training is done via an object called an **estimator**. When setting up the estimator, we specify the path (in Amazon S3) to the output directory where the model will be serialized, generic hyper-parameters such as the machine type to use during the training process, and algorithm-specific hyper-parameters such as the predictor type and mini-batch size. Once the estimator is initialized, we call its **fit** method with the location (in Amazon S3) of the training data to do the actual training.

Now that we are ready for training, we start with a convenience function that starts a training job.

In [None]:
import matplotlib.pyplot as plt

import sagemaker
from sagemaker import get_execution_role
from sagemaker.predictor import csv_serializer, json_deserializer
from sagemaker.amazon.amazon_estimator import get_image_uri


def trained_estimator_from_hyperparams(s3_train_data, hyperparams, output_path, s3_test_data=None):
    """
    Create an Estimator from the given hyperparams, fit it to the training data,
    and return the fitted estimator (no endpoint is deployed)
    
    """
    # set up the estimator
    linear = sagemaker.estimator.Estimator(get_image_uri(boto3.Session().region_name, "linear-learner"),
        get_execution_role(),
        train_instance_count=1,
        train_instance_type='ml.m5.2xlarge',
        output_path=output_path,
        sagemaker_session=sagemaker.Session())
    linear.set_hyperparameters(**hyperparams)
    
    # train a model. fit_input contains the locations of the train and test data
    fit_input = {'train': s3_train_data}
    if s3_test_data is not None:
        fit_input['test'] = s3_test_data
    linear.fit(fit_input)
    return linear

Now, we run the actual training job. For now, we set only a few hyper-parameters and leave the rest at their defaults.

In [None]:
import math

hyperparams = {
    'feature_dim': int(train_features.shape[1]),
    'mini_batch_size': int(0.1 * train_features.shape[0]),
    'predictor_type': 'binary_classifier' 
}

output_path = 's3://' + model_bucket
linear_estimator = trained_estimator_from_hyperparams(s3_train_data, hyperparams, output_path, 
                                                   s3_test_data=s3_test_data)

Notice that we passed a test set to the training job. When a test set is provided, the training job doesn't just produce a model but also applies it to the test set and reports the accuracy. You can view the accuracy of the model on the test set in the training logs.
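
For reference, accuracy is the fraction of test rows whose predicted label matches the true label. Here is a minimal sketch, using invented scores and an assumed decision threshold of 0.5 (the threshold the training job actually applies may differ):

```python
import numpy as np

# Invented model scores and true labels for illustration only.
scores = np.array([0.9, 0.2, 0.6, 0.4, 0.8])
labels = np.array([1, 0, 0, 0, 1])

# Turn scores into 0/1 predictions with an assumed threshold of 0.5.
predicted = (scores >= 0.5).astype(int)

# Accuracy is the share of predictions that equal the true labels.
accuracy = float(np.mean(predicted == labels))
print("accuracy =", accuracy)
```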

## Conclusion

We now have a trained model living in S3. Instead of creating a SageMaker Endpoint, we will use Lambda to make inferences against the model.

If you want to test the model using a SageMaker Endpoint before moving on, check out our documentation:
https://docs.aws.amazon.com/sagemaker/latest/dg/ex1-deploy-model.html#ex1-deploy-model-boto