In [4]:
import boto3
import sagemaker
from sagemaker import get_execution_role
from sagemaker.sklearn.processing import SKLearnProcessor

region  = boto3.session.Session().region_name

role = get_execution_role()

sklearn_processor = SKLearnProcessor(framework_version='0.20.0',
                                    role=role,
                                    instance_type='ml.m5.large',
                                    instance_count=1)

In [13]:
import pandas as pd

# input data s3://sklearn-script/census/census-income.csv

input_data_train = 's3://sklearn-script/taxi/train.csv'
input_data_test = 's3://sklearn-script/taxi/test.csv'

output_destination = 's3://sklearn-script/taxi/'

df = pd.read_csv(input_data_train)
#df.head(n=10)


In [14]:
df.head()

Unnamed: 0,fare_amount,pickup_datetime,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count
0,4.5,2009-06-15 17:26:21+00:00,-73.844311,40.721319,-73.84161,40.712278,1
1,16.9,2010-01-05 16:52:16+00:00,-74.016048,40.711303,-73.979268,40.782004,1
2,5.7,2011-08-18 00:35:00+00:00,-73.982738,40.76127,-73.991242,40.750562,2
3,7.7,2012-04-21 04:30:42+00:00,-73.98713,40.733143,-73.991567,40.758092,1
4,5.3,2010-03-09 07:51:00+00:00,-73.968095,40.768008,-73.956655,40.783762,1


In [15]:
import argparse
import os
import warnings

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder,LabelBinarizer,KBinsDiscretizer,StandardScaler  
from sklearn.preprocessing import PolynomialFeatures
from sklearn.compose import make_column_transformer

In [23]:
%%writefile preprocessing.py

import argparse
import os
import warnings

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
from sklearn.preprocessing import OneHotEncoder
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
from scipy import sparse


from sklearn.exceptions import DataConversionWarning
warnings.filterwarnings(action='ignore', category=DataConversionWarning)


# Radius of the earth in kilometers
R = 6378

def haversine_np(lon1, lat1, lon2, lat2):
    """
    Calculate the great circle distance between two points
    on the earth (specified in decimal degrees)

    All args must be of equal length.    
    
    source: https://stackoverflow.com/a/29546836

    """
    # Convert latitude and longitude to radians
    lon1, lat1, lon2, lat2 = map(np.radians, [lon1, lat1, lon2, lat2])

    # Find the differences
    dlon = lon2 - lon1
    dlat = lat2 - lat1

    # Apply the formula 
    a = np.sin(dlat/2.0)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2.0)**2
    # Calculate the angle (in radians)
    c = 2 * np.arcsin(np.sqrt(a))
    # Convert to kilometers
    km = R * c
    
    return km

def data_transformation(df,train_flag = True):

    ohe_columns = ['hour','weekday','day','month','year']
    traget_col = ['fare_amount']
    nom_columns = ['pickup_longitude','pickup_latitude','dropoff_longitude','dropoff_latitude','distance']
    
    # drop na values
    df = df.dropna()
    if train_flag:
        # remove outliers
        df = df[df['fare_amount'].between(left=2.5,right=100)]
       
    # removing outliers in passenger count 
    df = df[(df['passenger_count'] <= 6) & (df['passenger_count'] >=1)]

    # removing outliners in lat, lag features
    df = df.loc[df['pickup_latitude'].between(40, 42)]
    df = df.loc[df['pickup_longitude'].between(-75, -72)]

    # dropoff
    df = df.loc[df['dropoff_latitude'].between(40, 42)]
    df = df.loc[df['dropoff_longitude'].between(-75, -72)]

    ## adding new features
    df['hour']    = df['pickup_datetime'].dt.hour
    df['weekday'] = df['pickup_datetime'].dt.dayofweek
    df['day']     = df['pickup_datetime'].dt.day
    df['month']   = df['pickup_datetime'].dt.month
    df['year']    = df['pickup_datetime'].dt.year

    # removing pickup_datetime feature
    df.drop('pickup_datetime',axis=1,inplace=True)

    ## adding new feature haversine_np distance
    df['distance'] = haversine_np(df['pickup_longitude'],df['pickup_latitude'],df['dropoff_longitude'],df['dropoff_latitude'])
    
    # transformation
    #One hot encoding the cataogrical features.
    ohe = OneHotEncoder(categories='auto')
    ohe_mat = ohe.fit_transform(df[ohe_columns])

    # converting nom features to csr matrix format
    df_nom = sparse.csr_matrix(df[nom_columns].values)

    # merging
    df_csr = sparse.hstack((df_nom,ohe_mat))
    
    if train_flag:
        # target variable
        y = df[traget_col].values
        
        return df_csr,y
    
    return df_csr


    
if __name__=='__main__':
    

    input_data_path_train = os.path.join('/opt/ml/processing/input_train', 'train.csv')
    input_data_path_test = os.path.join('/opt/ml/processing/input_test', 'test.csv')
    
    train = pd.read_csv(input_data_path_train,parse_dates=['pickup_datetime'])
    test = pd.read_csv(input_data_path_test,parse_dates=['pickup_datetime'])
    
    train_features,y_train = data_transformation(train,train_flag=True)
    test_features = data_transformation(test,train_flag=False)
    
    print('Train data shape after preprocessing: {}'.format(train_features.shape))
    print('Test data shape after preprocessing: {}'.format(test_features.shape))
    
    train_features_output_path = os.path.join('/opt/ml/processing/train', 'train_features')
    train_labels_output_path = os.path.join('/opt/ml/processing/train', 'train_labels')
    
    test_features_output_path = os.path.join('/opt/ml/processing/test', 'test_features')
    
    
    print('Saving training features to {}'.format(train_features_output_path))
    sparse.save_npz(train_features_output_path,train_features,)
    
    print('Saving test features to {}'.format(test_features_output_path))
    sparse.save_npz(test_features_output_path,test_features)
    
    print('Saving training labels to {}'.format(train_labels_output_path))
    np.save(train_labels_output_path,y_train)
    

Overwriting preprocessing.py


In [24]:
from sagemaker.processing import ProcessingInput, ProcessingOutput

sklearn_processor.run(code='preprocessing.py',
                      inputs=[ProcessingInput(
                        source=input_data_train,
                        destination='/opt/ml/processing/input_train'),
                              ProcessingInput(
                                  source=input_data_test,
                                  destination='/opt/ml/processing/input_test')],
                      outputs=[ProcessingOutput(output_name='train_data',
                                                destination = output_destination,
                                                source='/opt/ml/processing/train'),
                               ProcessingOutput(output_name='test_data',
                                                destination = output_destination,
                                                source='/opt/ml/processing/test')]
                     )


Job Name:  sagemaker-scikit-learn-2020-01-04-11-58-46-501
Inputs:  [{'InputName': 'input-1', 'S3Input': {'S3Uri': 's3://sklearn-script/taxi/train.csv', 'LocalPath': '/opt/ml/processing/input_train', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'input-2', 'S3Input': {'S3Uri': 's3://sklearn-script/taxi/test.csv', 'LocalPath': '/opt/ml/processing/input_test', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'S3Input': {'S3Uri': 's3://sagemaker-ap-south-1-166087373671/sagemaker-scikit-learn-2020-01-04-11-58-46-501/input/code/preprocessing.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'train_data', 'S3Output': {'S3Uri': 's3://sklearn-script/taxi/',

In [25]:
preprocessing_job_description = sklearn_processor.jobs[-1].describe()


In [26]:
preprocessing_job_description

{'ProcessingInputs': [{'InputName': 'input-1',
   'S3Input': {'S3Uri': 's3://sklearn-script/taxi/train.csv',
    'LocalPath': '/opt/ml/processing/input_train',
    'S3DataType': 'S3Prefix',
    'S3InputMode': 'File',
    'S3DataDistributionType': 'FullyReplicated',
    'S3CompressionType': 'None'}},
  {'InputName': 'input-2',
   'S3Input': {'S3Uri': 's3://sklearn-script/taxi/test.csv',
    'LocalPath': '/opt/ml/processing/input_test',
    'S3DataType': 'S3Prefix',
    'S3InputMode': 'File',
    'S3DataDistributionType': 'FullyReplicated',
    'S3CompressionType': 'None'}},
  {'InputName': 'code',
   'S3Input': {'S3Uri': 's3://sagemaker-ap-south-1-166087373671/sagemaker-scikit-learn-2020-01-04-11-58-46-501/input/code/preprocessing.py',
    'LocalPath': '/opt/ml/processing/input/code',
    'S3DataType': 'S3Prefix',
    'S3InputMode': 'File',
    'S3DataDistributionType': 'FullyReplicated',
    'S3CompressionType': 'None'}}],
 'ProcessingOutputConfig': {'Outputs': [{'OutputName': 'train_d

In [1]:
#output_config = preprocessing_job_description['ProcessingOutputConfig']
#output_config

In [2]:
''' 
for output in output_config['Outputs']:
    if output['OutputName'] == 'train_data':
        preprocessed_training_data = output['S3Output']['S3Uri']
    if output['OutputName'] == 'test_data':
        preprocessed_test_data = output['S3Output']['S3Uri']

'''

" \nfor output in output_config['Outputs']:\n    if output['OutputName'] == 'train_data':\n        preprocessed_training_data = output['S3Output']['S3Uri']\n    if output['OutputName'] == 'test_data':\n        preprocessed_test_data = output['S3Output']['S3Uri']\n\n"

## Training using the pre-processed data# 

In [5]:

from sagemaker.sklearn.estimator import SKLearn

sklearn = SKLearn(
    entry_point='train.py',
    train_instance_type="ml.m5.large",
    role=role)



In [41]:
preprocessed_training_data = 's3://sklearn-script/taxi/'

In [53]:
%%writefile train.py

import os 
from sklearn.linear_model import LinearRegression
from scipy import sparse
import boto3
import numpy as np
from io import BytesIO
from sklearn.externals import joblib

if __name__=="__main__":
    
    training_data_directory = '/opt/ml/input/data/train'
    
    train_features_data = os.path.join(training_data_directory, 'train_features.csv.npz')
    
    train_labels_data = os.path.join(training_data_directory, 'train_labels.csv.npy')
    
    s3 = boto3.client('s3')
    
    print(f'training data directory :{training_data_directory}')
    print(f'training features path :{train_features_data}')
    print(f'training labels path :{train_labels_data}')
    
    print('Reading input data')
    
    
    X_train = sparse.load_npz(train_features_data)
    y_train = np.load(train_labels_data)
    
    
    model = LinearRegression()
    model.fit(X_train,y_train)

    print('Training LR model')
    model_output_directory = os.path.join('/opt/ml/model', "model.joblib")
    
    print('Saving model to {}'.format(model_output_directory))
    joblib.dump(model, model_output_directory)
    
    
    

Overwriting train.py


In [54]:

sklearn.fit({'train': preprocessed_training_data})

training_job_description = sklearn.jobs[-1].describe()

model_data_s3_uri = '{}{}/{}'.format(
    training_job_description['OutputDataConfig']['S3OutputPath'],
    training_job_description['TrainingJobName'],
    'output/model.tar.gz')

2020-01-04 16:16:35 Starting - Starting the training job...
2020-01-04 16:16:38 Starting - Launching requested ML instances......
2020-01-04 16:18:01 Starting - Preparing the instances for training...
2020-01-04 16:18:33 Downloading - Downloading input data...
2020-01-04 16:19:06 Training - Downloading the training image..[34m2020-01-04 16:19:19,791 sagemaker-containers INFO     Imported framework sagemaker_sklearn_container.training[0m
[34m2020-01-04 16:19:19,794 sagemaker-containers INFO     No GPUs detected (normal if no gpus installed)[0m
[34m2020-01-04 16:19:19,803 sagemaker_sklearn_container.training INFO     Invoking user training script.[0m
[34m2020-01-04 16:19:20,026 sagemaker-containers INFO     Module train does not provide a setup.py. [0m
[34mGenerating setup.py[0m
[34m2020-01-04 16:19:20,026 sagemaker-containers INFO     Generating setup.cfg[0m
[34m2020-01-04 16:19:20,026 sagemaker-containers INFO     Generating MANIFEST.in[0m
[34m2020-01-04 16:19:20,026 sag

In [31]:
from sklearn.linear_model import LinearRegression
from scipy import sparse
import numpy as np
from io import BytesIO

s3 = boto3.client('s3')

obj = s3.get_object(Bucket='sklearn-script', Key='sklearn-script/taxi/train_features.csv.npz')

X_train = sparse.load_npz(BytesIO(obj['Body'].read()))

obj = s3.get_object(Bucket='sklearn-script', Key='taxi/train_labels.csv.npy')

y_train = np.load(BytesIO(obj['Body'].read()))

model = LinearRegression()
model.fit(X_train,y_train)


LinearRegression(copy_X=True, fit_intercept=True, n_jobs=None,
         normalize=False)

In [32]:
y_pred = model.predict(X_train)

In [33]:
y_pred

array([[12.63810099],
       [19.84125233],
       [ 5.41045209],
       ...,
       [12.99612292],
       [ 8.42002382],
       [13.24840725]])

In [38]:
import numpy as np
from s3fs.core import S3FileSystem
s3 = S3FileSystem()

key = 'taxi/train_features.csv.npz'
bucket = 'sklearn-script'

df = sparse.load_npz(s3.open('{}/{}'.format(bucket, key)))

In [39]:
df.shape

(9747, 86)

In [47]:
s3 = boto3.client('s3')

obj = s3.get_object(Key='s3://sklearn-script/taxi/train_features.csv.npz')

X_train = sparse.load_npz(BytesIO(obj['Body'].read()))

ParamValidationError: Parameter validation failed:
Missing required parameter in input: "Bucket"