# S3 as Input and Output Source

## Pre-Processing

In [None]:
import boto3
import sagemaker
from sagemaker import get_execution_role
from sagemaker.sklearn.processing import SKLearnProcessor

# Execution role of this notebook and the region of the default boto3 session.
role = get_execution_role()
region = boto3.session.Session().region_name

# Processor that runs scikit-learn scripts as SageMaker processing jobs:
# a single ml.t3.medium instance with framework version 0.20.0.
sklearn_processor = SKLearnProcessor(
    role=role,
    instance_count=1,
    instance_type="ml.t3.medium",
    framework_version="0.20.0",
)

In [None]:
import pandas as pd

# S3 location of the raw census CSV. The `{sagemaker-databucket}` segment
# appears to be a placeholder for the real bucket name.
input_data = "s3://{sagemaker-databucket}/census-income.csv"

# Load just the first ten rows and display them as a quick sanity check.
df = pd.read_csv(filepath_or_buffer=input_data, nrows=10)
df.head(10)

In [None]:
%%writefile preprocessing.py

import argparse
import os
import warnings

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder, LabelBinarizer, KBinsDiscretizer
from sklearn.preprocessing import PolynomialFeatures
from sklearn.compose import make_column_transformer

from sklearn.exceptions import DataConversionWarning

warnings.filterwarnings(action="ignore", category=DataConversionWarning)

# Names of the dataset columns this script works with: the feature columns fed
# to the column transformer below plus the `income` label column.
# Not used by any executed code in this script.
columns = [
    "age",
    "class_of_worker",
    "education",
    "major_industry_code",
    "capital_gains",
    "capital_losses",
    "dividends_from_stocks",
    "num_persons_worked_for_employer",
    "income",
]
# The two label values; not used by any executed code in this script.
class_labels = [0, 1]


def print_shape(df):
    """Print the shape of *df* and how many rows carry each ``income`` label.

    Args:
        df: DataFrame with an integer ``income`` column holding the labels
            0 (negative) and 1 (positive).

    Raises:
        ValueError: if ``income`` contains labels greater than 1, because the
            counts no longer unpack into exactly two values.
    """
    print('*****IN print_shape df')
    # minlength=2 keeps the unpack valid when the frame only contains label 0;
    # without it bincount returns a single count and the assignment fails.
    negative_examples, positive_examples = np.bincount(df["income"], minlength=2)
    print(
        "Data shape: {}, {} positive examples, {} negative examples".format(
            df.shape, positive_examples, negative_examples
        )
    )


if __name__ == "__main__":
    # Entry point of the processing script: read the census CSV from the
    # processing input directory, clean it, split it, encode the features and
    # write train/test features and labels as header-less CSV files.
    print('**** IN MAIN')
    parser = argparse.ArgumentParser()
    parser.add_argument("--train-test-split-ratio", type=float, default=0.3)
    # parse_known_args ignores any extra arguments passed to the script.
    args, _ = parser.parse_known_args()

    print("Received arguments {}".format(args))

    input_data_path = os.path.join("/opt/ml/processing/input", "census-income.csv")

    print("Reading input data from {}".format(input_data_path))
    df = pd.read_csv(input_data_path)
    # Use the fully qualified option name; the bare 'max_columns' pattern is
    # rejected by pandas as soon as it matches more than one option.
    pd.set_option('display.max_columns', None)

    df.dropna(inplace=True)
    df.drop_duplicates(inplace=True)

    # minlength=2 keeps the unpack valid when only label 0 survives cleaning.
    negative_examples, positive_examples = np.bincount(df["income"], minlength=2)
    print(
        "Data after cleaning: {}, {} positive examples, {} negative examples".format(
            df.shape, positive_examples, negative_examples
        )
    )

    # Split into training and test sets; the test fraction comes from
    # --train-test-split-ratio (default 0.3, i.e. a 70/30 split).
    split_ratio = args.train_test_split_ratio
    print("Splitting data into train and test sets with ratio {}".format(split_ratio))
    X_train, X_test, y_train, y_test = train_test_split(
        df.drop("income", axis=1), df["income"], test_size=split_ratio, random_state=0
    )

    # Encode the columns numerically: bin + one-hot the count-like columns,
    # standardise the monetary columns, one-hot the categorical columns.
    # NOTE(review): the tuples are ordered (columns, transformer), presumably to
    # match the scikit-learn release this script runs on; preprocessing_custom.py
    # uses (transformer, columns) — confirm before changing the framework version.
    preprocess = make_column_transformer(
        (
            ["age", "num_persons_worked_for_employer"],
            KBinsDiscretizer(encode="onehot-dense", n_bins=10),
        ),
        (["capital_gains", "capital_losses", "dividends_from_stocks"], StandardScaler()),
        (["education", "major_industry_code", "class_of_worker"], OneHotEncoder(sparse=False)),
    )
    print("Running preprocessing and feature engineering transformations")
    # Fit on the training split only, then apply the fitted transform to test.
    train_features = preprocess.fit_transform(X_train)
    test_features = preprocess.transform(X_test)

    print("Train data shape after preprocessing: {}".format(train_features.shape))
    print("Test data shape after preprocessing: {}".format(test_features.shape))

    train_features_output_path = os.path.join("/opt/ml/processing/train", "train_features.csv")
    train_labels_output_path = os.path.join("/opt/ml/processing/train", "train_labels.csv")

    test_features_output_path = os.path.join("/opt/ml/processing/test", "test_features.csv")
    test_labels_output_path = os.path.join("/opt/ml/processing/test", "test_labels.csv")

    # All outputs are written without header or index columns.
    print("Saving training features to {}".format(train_features_output_path))
    pd.DataFrame(train_features).to_csv(train_features_output_path, header=False, index=False)

    print("Saving test features to {}".format(test_features_output_path))
    pd.DataFrame(test_features).to_csv(test_features_output_path, header=False, index=False)

    print("Saving training labels to {}".format(train_labels_output_path))
    y_train.to_csv(train_labels_output_path, header=False, index=False)

    print("Saving test labels to {}".format(test_labels_output_path))
    y_test.to_csv(test_labels_output_path, header=False, index=False)

In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput

# Launch the processing job: the script to run, where the input is mounted,
# which container directories are exported, and the script arguments.
sklearn_processor.run(
    arguments=["--train-test-split-ratio", "0.2"],
    code="preprocessing.py",
    inputs=[
        ProcessingInput(destination="/opt/ml/processing/input", source=input_data),
    ],
    outputs=[
        ProcessingOutput(source="/opt/ml/processing/train", output_name="train_data"),
        ProcessingOutput(source="/opt/ml/processing/test", output_name="test_data"),
    ],
)

preprocessing_job_description = sklearn_processor.jobs[-1].describe()

# Pick the S3 URIs of the two named outputs out of the job description.
output_config = preprocessing_job_description["ProcessingOutputConfig"]
for output in output_config["Outputs"]:
    if output["OutputName"] == "train_data":
        preprocessed_training_data = output["S3Output"]["S3Uri"]
    elif output["OutputName"] == "test_data":
        preprocessed_test_data = output["S3Output"]["S3Uri"]

In [None]:
# preprocessing.py writes train_features.csv without a header row, so read it
# with header=None; otherwise the first data row is consumed as column names.
training_features = pd.read_csv(
    preprocessed_training_data + "/train_features.csv", header=None, nrows=10
)
print("Training features shape: {}".format(training_features.shape))
training_features.head(n=10)

## Training

In [None]:
from sagemaker.sklearn.estimator import SKLearn

# Estimator that runs train.py as a scikit-learn training job.
sklearn = SKLearn(
    entry_point="train.py",
    role=role,
    instance_type="ml.m5.xlarge",
    framework_version="0.20.0",
)

In [None]:
%%writefile train.py

import os

import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.externals import joblib

# Model-loading hook; required for batch prediction/inference.
def model_fn(model_dir):
    """Load and return the estimator stored as ``model.joblib`` in *model_dir*."""
    model_file = os.path.join(model_dir, "model.joblib")
    return joblib.load(model_file)

if __name__ == "__main__":
    # Training entry point: read the preprocessed features and labels, fit a
    # class-balanced logistic regression and save it as model.joblib.
    training_data_directory = "/opt/ml/input/data/train"
    train_features_data = os.path.join(training_data_directory, "train_features.csv")
    train_labels_data = os.path.join(training_data_directory, "train_labels.csv")
    print("Reading input data")
    # Both files are header-less CSVs.
    X_train = pd.read_csv(train_features_data, header=None)
    # Take the single label column as a 1-D vector; fit() expects 1-D targets
    # and would otherwise receive an (n, 1) frame.
    y_train = pd.read_csv(train_labels_data, header=None).iloc[:, 0]

    model = LogisticRegression(class_weight="balanced", solver="lbfgs")
    print("Training LR model")
    model.fit(X_train, y_train)
    model_output_directory = os.path.join("/opt/ml/model", "model.joblib")
    print("Saving model to {}".format(model_output_directory))
    joblib.dump(model, model_output_directory)

In [None]:
# Start the training job with the preprocessed training data as the "train" channel.
sklearn.fit({"train": preprocessed_training_data})
training_job_description = sklearn.jobs[-1].describe()
# Build the model artifact URI as <output path>/<job name>/output/model.tar.gz.
# Strip any trailing slash first so the join is correct whether or not the
# configured output path ends with "/".
model_data_s3_uri = "{}/{}/{}".format(
    training_job_description["OutputDataConfig"]["S3OutputPath"].rstrip("/"),
    training_job_description["TrainingJobName"],
    "output/model.tar.gz",
)
print('model_data_s3_uri: ', model_data_s3_uri)

## Model Evaluation

In [None]:
%%writefile evaluation.py

import json
import os
import tarfile

import pandas as pd

from sklearn.externals import joblib
from sklearn.metrics import classification_report, roc_auc_score, accuracy_score

if __name__ == "__main__":
    # Evaluation entry point: unpack the trained model, score the test set and
    # write a JSON report with the classification report, accuracy and ROC AUC.
    model_path = os.path.join("/opt/ml/processing/model", "model.tar.gz")
    print("Extracting model from path: {}".format(model_path))
    # NOTE(review): extractall trusts the archive contents; this assumes the
    # tarball is the artifact of our own training job — confirm.
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    print("Loading model")
    model = joblib.load("model.joblib")

    print("Loading test input data")
    test_features_data = os.path.join("/opt/ml/processing/test", "test_features.csv")
    test_labels_data = os.path.join("/opt/ml/processing/test", "test_labels.csv")

    # Both files are header-less CSVs; labels are taken as a 1-D vector.
    X_test = pd.read_csv(test_features_data, header=None)
    y_test = pd.read_csv(test_labels_data, header=None).iloc[:, 0]
    predictions = model.predict(X_test)
    # ROC AUC needs a ranking score, not hard 0/1 predictions: use the
    # predicted probability of the second class (label 1).
    positive_scores = model.predict_proba(X_test)[:, 1]

    print("Creating classification evaluation report")
    report_dict = classification_report(y_test, predictions, output_dict=True)
    report_dict["accuracy"] = accuracy_score(y_test, predictions)
    report_dict["roc_auc"] = roc_auc_score(y_test, positive_scores)

    print("Classification report:\n{}".format(report_dict))

    evaluation_output_path = os.path.join("/opt/ml/processing/evaluation", "evaluation.json")
    print("Saving classification report to {}".format(evaluation_output_path))

    with open(evaluation_output_path, "w") as f:
        json.dump(report_dict, f)

In [None]:
import json
from sagemaker.s3 import S3Downloader

# Run evaluation.py as a processing job with the model archive and the
# preprocessed test data as inputs; export the evaluation directory.
sklearn_processor.run(
    code="evaluation.py",
    inputs=[
        ProcessingInput(destination="/opt/ml/processing/model", source=model_data_s3_uri),
        ProcessingInput(destination="/opt/ml/processing/test", source=preprocessed_test_data),
    ],
    outputs=[
        ProcessingOutput(source="/opt/ml/processing/evaluation", output_name="evaluation"),
    ],
)
evaluation_job_description = sklearn_processor.jobs[-1].describe()

In [None]:
# Locate the "evaluation" output of the job and build the URI of its report.
evaluation_output_config = evaluation_job_description["ProcessingOutputConfig"]
for output in evaluation_output_config["Outputs"]:
    if output["OutputName"] != "evaluation":
        continue
    evaluation_s3_uri = "{}{}".format(output["S3Output"]["S3Uri"], "/evaluation.json")
    break

# Download the JSON report and pretty-print it with sorted keys.
evaluation_output = S3Downloader.read_file(evaluation_s3_uri)
evaluation_output_dict = json.loads(evaluation_output)
print(json.dumps(evaluation_output_dict, indent=4, sort_keys=True))

## Batch Inference

In [None]:
import sagemaker as sage
from time import gmtime, strftime

sagemaker_session = sage.Session()

# Batch transform results go under this folder of the session's default bucket.
transform_output_folder = "batch-transform-output"
output_path = f"s3://{sagemaker_session.default_bucket()}/{transform_output_folder}"
print("output_path: ", output_path)

# Transformer built from the trained estimator; CSV output assembled line by line.
transformer = sklearn.transformer(
    instance_type="ml.m4.xlarge",
    instance_count=1,
    accept="text/csv",
    assemble_with="Line",
    output_path=output_path,
)

In [None]:
data_location = "{}/{}".format(preprocessed_test_data, "test_features.csv")

print("data_location: ", data_location)
# test_features.csv is written without a header row, so read it with
# header=None; otherwise the first data row is consumed as column names.
df = pd.read_csv(data_location, header=None, nrows=5)
df.head(n=5)

In [None]:
# Run batch inference over the test features, one CSV line per record, and
# block until the job finishes. (An input_filter of "$[1:]" is left disabled.)
transformer.transform(
    data_location,
    split_type="Line",
    content_type="text/csv",
)
transformer.wait()

In [None]:
import pandas as pd

inference_output_data = "{}/{}".format(output_path, "test_features.csv.out")
# NOTE(review): the transform output is presumably header-less like its input,
# so read it with header=None to keep the first prediction as data — confirm.
df = pd.read_csv(inference_output_data, header=None, nrows=10)
df.head(n=10)

# Snowflake as Input and Output Source

## Build your custom container

In [None]:
# Install the image-build package (presumably the provider of the `sm-docker`
# command used in a later cell — confirm).
!pip install sagemaker-studio-image-build

In [None]:
!mkdir docker

In [None]:
%%writefile docker/Dockerfile


# Base image: official Python 3.8 (full variant; the slim tag suffix is left
# commented out below).
FROM python:3.8
#-slim-buster

# Pinned data-science stack used by the processing scripts.
RUN pip install pandas==1.1.5 numpy==1.16.5 scikit-learn==0.23.1 
# Snowpark client with its pandas extra.
# NOTE(review): this install is unpinned and may replace the pandas/numpy
# versions pinned above — confirm the resulting versions.
RUN pip install "snowflake-snowpark-python[pandas]"

# Unbuffered Python output is currently disabled.
#ENV PYTHONUNBUFFERED=TRUE


# Container arguments are passed to the Python interpreter.
ENTRYPOINT ["python3"]

In [None]:
# Build the image from ./docker and tag it as repository `mydockerrepo1`, tag `2`.
!sm-docker build ./docker --repository mydockerrepo1:2

In [None]:
import boto3

# Resolve the account id and region at run time instead of leaving literal
# `{account_no}` / `{region}` template text in the image URI.
# NOTE(review): assumes the image built by the `sm-docker build` cell lives in
# this account's ECR registry in the session's region — confirm.
account_no = boto3.client("sts").get_caller_identity()["Account"]
processing_repository_uri = "{}.dkr.ecr.{}.amazonaws.com/mydockerrepo1:2".format(
    account_no, boto3.session.Session().region_name
)

## Preprocessing using custom container

In [None]:
from sagemaker.processing import ScriptProcessor

# Processor that runs a script with `python3` inside the custom image,
# on a single ml.t3.medium instance.
script_processor = ScriptProcessor(
    image_uri=processing_repository_uri,
    command=["python3"],
    role=role,
    instance_type="ml.t3.medium",
    instance_count=1,
)

In [None]:
%%writefile preprocessing_custom.py


import argparse
import os
import warnings

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder, LabelBinarizer, KBinsDiscretizer
from sklearn.preprocessing import PolynomialFeatures
from sklearn.compose import make_column_transformer

from sklearn.exceptions import DataConversionWarning

from snowflake.snowpark import (
    Column,
    DataFrame,
    Session,
    Window
)
from snowflake.snowpark import functions as f
from snowflake.snowpark.types import IntegerType, StringType, StructType, DateType, StructField, MapType
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, when, to_date


warnings.filterwarnings(action="ignore", category=DataConversionWarning)


# Names of the dataset columns this script works with: the feature columns fed
# to the column transformer below plus the `income` label column.
# Not used by any executed code in this script.
columns = [
    "age",
    "class_of_worker",
    "education",
    "major_industry_code",
    "capital_gains",
    "capital_losses",
    "dividends_from_stocks",
    "num_persons_worked_for_employer",
    "income",
]
# The two label values; not used by any executed code in this script.
class_labels = [0, 1]

# Snowflake connection settings handed to Session.builder.configs().
# Every value is blank here and has to be filled in before the script can connect.
# NOTE(review): includes a password field — avoid committing real credentials.
CONNECTION_PARAMETERS = dict(
    account="",
    user="",
    password="",
    role="",
    warehouse="",
    database="",
    schema="",
)


def print_shape(df):
    """Print the shape of *df* and how many rows carry each ``income`` label.

    Args:
        df: DataFrame with an integer ``income`` column holding the labels
            0 (negative) and 1 (positive).

    Raises:
        ValueError: if ``income`` contains labels greater than 1, because the
            counts no longer unpack into exactly two values.
    """
    print('*****IN print_shape df')
    # minlength=2 keeps the unpack valid when the frame only contains label 0;
    # without it bincount returns a single count and the assignment fails.
    negative_examples, positive_examples = np.bincount(df["income"], minlength=2)
    print(
        "Data shape: {}, {} positive examples, {} negative examples".format(
            df.shape, positive_examples, negative_examples
        )
    )
    

def transformation_pipeline():
    """Read the ``SAGEMAKER_TABLE`` Snowflake table into a pandas DataFrame.

    Opens a Snowpark session from ``CONNECTION_PARAMETERS``, prints the current
    warehouse/database/schema, loads the table and closes the session again.

    Returns:
        pandas.DataFrame with the full contents of ``SAGEMAKER_TABLE``.
    """
    print('INSIDE TRANSFORMSTION PIPELINE')
    session = Session.builder.configs(CONNECTION_PARAMETERS).create()
    try:
        session.sql("select current_warehouse(), current_database(), current_schema()").show()

        df_train = session.table('SAGEMAKER_TABLE')

        # Materialise the table locally before the session is closed.
        df_train_pd = df_train.to_pandas()
    finally:
        # Always release the Snowflake connection, even if the query fails.
        session.close()

    # NOTE(review): the caller indexes the frame with lowercase names such as
    # "income"; verify the table's column names arrive in that case.
    print("df_train_pd: ",df_train_pd.columns)
    return df_train_pd


if __name__ == "__main__":
    # Entry point of the custom-container processing script: load the table
    # from Snowflake, clean it, split it, encode the features and write
    # train/test features and labels as header-less CSV files.
    print('**** IN MAIN')
    parser = argparse.ArgumentParser()
    parser.add_argument("--train-test-split-ratio", type=float, default=0.3)
    # parse_known_args ignores any extra arguments passed to the script.
    args, _ = parser.parse_known_args()

    print("Received arguments {}".format(args))

    df = transformation_pipeline()
    # Use the fully qualified option name; the bare 'max_columns' pattern is
    # rejected by pandas as soon as it matches more than one option.
    pd.set_option('display.max_columns', None)
    print('******read snowflake table df', df.head(n=5))

    df.dropna(inplace=True)
    df.drop_duplicates(inplace=True)

    # minlength=2 keeps the unpack valid when only label 0 survives cleaning.
    negative_examples, positive_examples = np.bincount(df["income"], minlength=2)
    print(
        "Data after cleaning: {}, {} positive examples, {} negative examples".format(
            df.shape, positive_examples, negative_examples
        )
    )

    # The test fraction comes from --train-test-split-ratio (default 0.3).
    split_ratio = args.train_test_split_ratio
    print("Splitting data into train and test sets with ratio {}".format(split_ratio))
    X_train, X_test, y_train, y_test = train_test_split(
        df.drop("income", axis=1), df["income"], test_size=split_ratio, random_state=0
    )

    # Bin + one-hot the count-like columns, standardise the monetary columns,
    # one-hot the categorical columns. Tuples are (transformer, columns).
    preprocess = make_column_transformer(
        (
            KBinsDiscretizer(encode="onehot-dense", n_bins=10),
            ["age", "num_persons_worked_for_employer"],
        ),
        (StandardScaler(), ["capital_gains", "capital_losses", "dividends_from_stocks"]),
        (OneHotEncoder(sparse=False), ["education", "major_industry_code", "class_of_worker"]),
    )
    print("Running preprocessing and feature engineering transformations")
    # Fit on the training split only, then apply the fitted transform to test.
    train_features = preprocess.fit_transform(X_train)
    test_features = preprocess.transform(X_test)

    print("Train data shape after preprocessing: {}".format(train_features.shape))
    print("Test data shape after preprocessing: {}".format(test_features.shape))

    train_features_output_path = os.path.join("/opt/ml/processing/train", "train_features.csv")
    train_labels_output_path = os.path.join("/opt/ml/processing/train", "train_labels.csv")

    test_features_output_path = os.path.join("/opt/ml/processing/test", "test_features.csv")
    test_labels_output_path = os.path.join("/opt/ml/processing/test", "test_labels.csv")

    # All outputs are written without header or index columns.
    print("Saving training features to {}".format(train_features_output_path))
    pd.DataFrame(train_features).to_csv(train_features_output_path, header=False, index=False)

    print("Saving test features to {}".format(test_features_output_path))
    pd.DataFrame(test_features).to_csv(test_features_output_path, header=False, index=False)

    print("Saving training labels to {}".format(train_labels_output_path))
    y_train.to_csv(train_labels_output_path, header=False, index=False)

    print("Saving test labels to {}".format(test_labels_output_path))
    y_test.to_csv(test_labels_output_path, header=False, index=False)


In [None]:
# Run the Snowflake-backed preprocessing script in the custom container. It
# takes no S3 inputs; the train and test directories are exported as outputs.
script_processor.run(
    arguments=["--train-test-split-ratio", "0.2"],
    code="preprocessing_custom.py",
    inputs=[],
    outputs=[
        ProcessingOutput(source="/opt/ml/processing/train", output_name="train_data"),
        ProcessingOutput(source="/opt/ml/processing/test", output_name="test_data"),
    ],
)
script_processor_job_description = script_processor.jobs[-1].describe()
print('$$$$$$$$ outside script processor')

# Pick the S3 URIs of the two named outputs out of the job description.
output_config = script_processor_job_description["ProcessingOutputConfig"]
for output in output_config["Outputs"]:
    if output["OutputName"] == "train_data":
        preprocessed_training_data_custom = output["S3Output"]["S3Uri"]
        print('$$$$$$$$ preprocessed_training_data_custom: ', preprocessed_training_data_custom)
    elif output["OutputName"] == "test_data":
        preprocessed_test_data_custom = output["S3Output"]["S3Uri"]
        print('$$$$$$$$ preprocessed_test_data_custom: ', preprocessed_test_data_custom)
        

In [None]:
# train_features.csv is written without a header row, so read it with
# header=None; otherwise the first data row is consumed as column names.
training_custom_features = pd.read_csv(
    preprocessed_training_data_custom + "/train_features.csv", header=None, nrows=10
)
# Report the shape of the frame just loaded (not the earlier S3-based one).
print("Training features shape: {}".format(training_custom_features.shape))
training_custom_features.head(n=10)

## Model Training

In [None]:
from sagemaker.sklearn.estimator import SKLearn

# Estimator that runs train.py as a scikit-learn training job.
sklearn = SKLearn(
    entry_point="train.py",
    role=role,
    instance_type="ml.m5.xlarge",
    framework_version="0.20.0",
)

In [None]:
# Using previous train.py

In [None]:
# Train on the Snowflake-derived training data as the "train" channel.
sklearn.fit({"train": preprocessed_training_data_custom})
training_job_description = sklearn.jobs[-1].describe()
# Build the model artifact URI as <output path>/<job name>/output/model.tar.gz.
# Strip any trailing slash first so the join is correct whether or not the
# configured output path ends with "/".
model_data_s3_uri = "{}/{}/{}".format(
    training_job_description["OutputDataConfig"]["S3OutputPath"].rstrip("/"),
    training_job_description["TrainingJobName"],
    "output/model.tar.gz",
)
print('model_data_s3_uri: ', model_data_s3_uri)

## Model Evaluation

In [2]:
# Using previous Evaluation.py

In [None]:
import json
from sagemaker.s3 import S3Downloader

# Run evaluation.py as a processing job with the model archive and the
# Snowflake-derived test data as inputs; export the evaluation directory.
sklearn_processor.run(
    code="evaluation.py",
    inputs=[
        ProcessingInput(destination="/opt/ml/processing/model", source=model_data_s3_uri),
        ProcessingInput(
            destination="/opt/ml/processing/test", source=preprocessed_test_data_custom
        ),
    ],
    outputs=[
        ProcessingOutput(source="/opt/ml/processing/evaluation", output_name="evaluation"),
    ],
)
evaluation_job_description = sklearn_processor.jobs[-1].describe()

In [None]:
# Locate the "evaluation" output of the job and build the URI of its report.
evaluation_output_config = evaluation_job_description["ProcessingOutputConfig"]
for output in evaluation_output_config["Outputs"]:
    if output["OutputName"] != "evaluation":
        continue
    evaluation_s3_uri = "{}{}".format(output["S3Output"]["S3Uri"], "/evaluation.json")
    break

# Download the JSON report and pretty-print it with sorted keys.
evaluation_output = S3Downloader.read_file(evaluation_s3_uri)
evaluation_output_dict = json.loads(evaluation_output)
print(json.dumps(evaluation_output_dict, indent=4, sort_keys=True))

## Batch Inference using Batch Transform

In [None]:
import sagemaker as sage
from time import gmtime, strftime

sagemaker_session = sage.Session()

transform_output_folder = "batch-transform-output"
# Results are written to the Snowflake stage bucket rather than the session's
# default bucket; `{snowflake-stage-area-bucket}` appears to be a placeholder.
output_path = "s3://{snowflake-stage-area-bucket}/batch-transform-output"
print("output_path: ", output_path)

# Transformer built from the trained estimator; CSV output assembled line by line.
transformer = sklearn.transformer(
    instance_type="ml.m5.xlarge",
    instance_count=1,
    accept="text/csv",
    assemble_with="Line",
    output_path=output_path,
)

In [None]:
data_location = "{}/{}".format(preprocessed_test_data_custom, "test_features.csv")
print("data_location: ", data_location)
# test_features.csv is written without a header row, so read it with
# header=None; otherwise the first data row is consumed as column names.
df = pd.read_csv(data_location, header=None, nrows=5)
df.head(n=5)

In [None]:
# Run batch inference over the test features, one CSV line per record, and
# block until the job finishes. (An input_filter of "$[1:]" is left disabled.)
transformer.transform(
    data_location,
    split_type="Line",
    content_type="text/csv",
)
transformer.wait()

In [None]:
import pandas as pd

inference_output_data = "{}/{}".format(output_path, "test_features.csv.out")
# NOTE(review): the transform output is presumably header-less like its input,
# so read it with header=None to keep the first prediction as data — confirm.
df = pd.read_csv(inference_output_data, header=None, nrows=10)
df.head(n=10)

## Inference Output Write to Snowflake

In [4]:
%%writefile write_output.py

import argparse
import os
import warnings
import datetime
import pandas as pd
import numpy as np
import io
import joblib

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder, LabelBinarizer, KBinsDiscretizer
from sklearn.preprocessing import PolynomialFeatures
from sklearn.compose import make_column_transformer
from sklearn.exceptions import DataConversionWarning
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer

from snowflake.snowpark import (
    Column,
    DataFrame,
    Session,
    Window
)
from snowflake.snowpark import functions as f
from snowflake.snowpark.types import IntegerType, StringType, StructType, DateType, StructField, MapType
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, when, to_date


warnings.filterwarnings(action="ignore", category=DataConversionWarning)



# Snowflake connection settings handed to Session.builder.configs().
# Every value is blank here and has to be filled in before the script can connect.
# NOTE(review): includes a password field — avoid committing real credentials.
CONNECTION_PARAMETERS = dict(
    account="",
    user="",
    password="",
    role="",
    warehouse="",
    database="",
    schema="",
)
    
if __name__ == "__main__":
    # Entry point: connect to Snowflake, make sure the `inference_output`
    # table exists and copy the batch-transform predictions into it from S3.
    parser = argparse.ArgumentParser()
    # No arguments are defined; parse_known_args ignores anything passed in.
    args, _ = parser.parse_known_args()

    print("Received arguments {}".format(args))

    snowflake_conn_session = Session.builder.configs(CONNECTION_PARAMETERS).create()
    try:
        snowflake_conn_session.sql("select current_warehouse(), current_database(), current_schema(), current_role()").show()

        # "if not exists" lets the script be re-run without failing on an
        # already-created table.
        table_query = """create table if not exists inference_output (
    predictions varchar
    )"""

        snowflake_conn_session.sql(table_query).show()

        # NOTE(review): the AWS key id and secret are blank placeholders embedded
        # in the SQL text — avoid committing real credentials here.
        copy_data = """copy into inference_output
    from 's3://{snowflake-stage-area-bucket}/batch-transform-output/test_features.csv.out'
    credentials = (aws_key_id = '' aws_secret_key = '')
    file_format = (format_name='public.csv_file_format');"""

        snowflake_conn_session.sql(copy_data).show()
    finally:
        # Always release the Snowflake connection, even if a statement fails.
        snowflake_conn_session.close()

Overwriting write_output.py


In [None]:
# Run write_output.py in the custom container, with the batch-transform
# result file supplied as the job's input.
script_processor.run(
    code="write_output.py",
    inputs=[
        ProcessingInput(
            destination="/opt/ml/processing/input",
            source="s3://{snowflake-stage-area-bucket}/batch-transform-output/test_features.csv.out",
        )
    ],
)