# Build a Scikit-learn Sentiment Analysis Model Using MLflow and Datasets Labeled by Amazon SageMaker Ground Truth

## What is Amazon SageMaker Ground Truth?

Amazon SageMaker Ground Truth is a fully managed data labeling service that makes it easy to build highly accurate training datasets for machine learning. Through the SageMaker Ground Truth console, you can create custom or built-in data labeling workflows in minutes. These workflows support a variety of use cases, including 3D point clouds, video, images, and text. In addition, Ground Truth offers automated data labeling, which uses a machine learning model to label your data.

In this notebook, we use the [Amazon Customer Reviews Dataset](https://s3.amazonaws.com/amazon-reviews-pds/readme.html) to classify whether a review's sentiment is positive, negative, or neutral. Run the demo that creates the Ground Truth labeling job before this notebook.

After labeling is complete, Ground Truth saves the resulting dataset to the S3 bucket you specified. This notebook picks up the dataset from there and builds the model with scikit-learn and MLflow.

## Sentiment Analysis on Amazon Review Dataset

First, we load the labeled dataset from S3. The file has a `.manifest` extension, but its content is JSON Lines: one JSON object per line. When the labeling job completes, SageMaker Ground Truth writes its output to this manifest file. The first cell reads the file and loads it into a PySpark DataFrame.

In [0]:
df = spark.read.json("s3://XXXX-XXX-XXX/tko/output.manifest")  # spark.read.json expects JSON Lines by default
display(df)
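
Because the manifest is JSON Lines, each line can also be parsed on its own with Python's `json` module, which is handy for inspecting a few records without Spark. The sketch below uses two made-up records; the metadata layout (a `<label attribute>-metadata` object with a `class-name` field) is an assumption based on Ground Truth's usual text-classification output, so check it against your own `output.manifest`. The helper `read_manifest` is hypothetical.

```python
import json

# Two hypothetical lines shaped like a Ground Truth text-classification output
# manifest (JSON Lines: one JSON object per line). Texts and values are made up.
manifest_text = "\n".join([
    json.dumps({
        "source": "Great pictures, easy to use.",
        "amazon-review-camera-10000": 0,
        "amazon-review-camera-10000-metadata": {"class-name": "positive", "confidence": 0.95},
    }),
    json.dumps({
        "source": "Stopped working after a week.",
        "amazon-review-camera-10000": 1,
        "amazon-review-camera-10000-metadata": {"class-name": "negative", "confidence": 0.91},
    }),
])

LABEL_ATTR = "amazon-review-camera-10000"


def read_manifest(text, label_attr):
    """Parse JSON Lines text into (review_text, label_id, class_name) tuples."""
    rows = []
    for line in text.splitlines():
        if not line.strip():
            continue  # skip blank lines
        record = json.loads(line)
        meta = record.get(f"{label_attr}-metadata", {})  # assumed metadata key
        rows.append((record["source"], record[label_attr], meta.get("class-name")))
    return rows


rows = read_manifest(manifest_text, LABEL_ATTR)
for text, label_id, class_name in rows:
    print(label_id, class_name, text)
```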

Next, we select two columns for model training: the review text (`source`) and the label (`amazon-review-camera-10000`). The sentiment labels are mapped as 0 for positive, 1 for negative, and 2 for neutral.

In [0]:
# Extract only the review text and sentiment label
# positive=0, negative=1, neutral=2
df2 = df.select("source", "amazon-review-camera-10000")  # class names: amazon-review-camera-metadata.class-name
display(df2)

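The PySpark DataFrame is converted to a pandas DataFrame, because scikit-learn needs in-memory data for both the text transformation and training. `toPandas()` collects every row to the driver, so this works only when the dataset fits in driver memory.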

In [0]:
import pandas as pd

train_set_pd = df2.toPandas()

display(train_set_pd)

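The next cell turns each review into a vector of unigram and bigram counts with `CountVectorizer(ngram_range=(1, 2))`, then reweights those counts with TF-IDF to produce a second training matrix. scikit-learn provides the `TfidfTransformer` class out of the box, so there is no need to implement TF-IDF yourself. The fitted transformers and matrices are saved to disk with joblib and SciPy so they can be reloaded later. We will compare validation results for both feature sets: bigram counts with and without the TF-IDF transformation.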

In [0]:
import os
from sklearn.feature_extraction.text import CountVectorizer, TfidfTransformer
from joblib import dump, load  # used for saving and loading sklearn objects
from scipy.sparse import save_npz, load_npz  # used for saving and loading sparse matrices

# Create the output directories (no error if they already exist)
os.makedirs('data_preprocessors', exist_ok=True)
os.makedirs('vectorized_data', exist_ok=True)

# Train data

# Bigram Counts
bigram_vectorizer = CountVectorizer(ngram_range=(1, 2))
bigram_vectorizer.fit(train_set_pd['source'].values)
dump(bigram_vectorizer, 'data_preprocessors/bigram_vectorizer.joblib')

# bigram_vectorizer = load('data_preprocessors/bigram_vectorizer.joblib')

X_train_bigram = bigram_vectorizer.transform(train_set_pd['source'].values)
save_npz('vectorized_data/X_train_bigram.npz', X_train_bigram)

# X_train_bigram = load_npz('vectorized_data/X_train_bigram.npz')


# Bigram Tf-Idf
bigram_tf_idf_transformer = TfidfTransformer()
bigram_tf_idf_transformer.fit(X_train_bigram)

dump(bigram_tf_idf_transformer, 'data_preprocessors/bigram_tf_idf_transformer.joblib')

# bigram_tf_idf_transformer = load('data_preprocessors/bigram_tf_idf_transformer.joblib')

X_train_bigram_tf_idf = bigram_tf_idf_transformer.transform(X_train_bigram)

save_npz('vectorized_data/X_train_bigram_tf_idf.npz', X_train_bigram_tf_idf)
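
To make the two feature sets concrete, here is a from-scratch sketch, in plain Python, of what `CountVectorizer(ngram_range=(1, 2))` and a default `TfidfTransformer` compute: unigram and bigram counts, then counts multiplied by a smoothed inverse document frequency, idf(t) = ln((1 + n) / (1 + df(t))) + 1, with each row scaled to unit L2 norm. It is for illustration only, under the assumption of scikit-learn's default settings (lowercasing, default token pattern, `smooth_idf=True`, `norm='l2'`); the notebook itself uses the scikit-learn classes, which also build a fixed vocabulary and return sparse matrices. The helper names are hypothetical.

```python
import math
import re
from collections import Counter

TOKEN_RE = re.compile(r"(?u)\b\w\w+\b")  # CountVectorizer's default token_pattern


def unigrams_and_bigrams(text):
    """Lowercase, tokenize, and return unigrams followed by bigrams."""
    tokens = TOKEN_RE.findall(text.lower())
    bigrams = [" ".join(pair) for pair in zip(tokens, tokens[1:])]
    return tokens + bigrams


def count_vectors(docs):
    """Per-document n-gram counts, like CountVectorizer(ngram_range=(1, 2))."""
    return [Counter(unigrams_and_bigrams(doc)) for doc in docs]


def tf_idf(counts):
    """Smoothed IDF weighting plus L2 row normalization, like TfidfTransformer's defaults."""
    n_docs = len(counts)
    doc_freq = Counter(term for c in counts for term in c)
    idf = {t: math.log((1 + n_docs) / (1 + df)) + 1 for t, df in doc_freq.items()}
    weighted = []
    for c in counts:
        row = {t: tf * idf[t] for t, tf in c.items()}
        norm = math.sqrt(sum(v * v for v in row.values()))
        weighted.append({t: v / norm for t, v in row.items()})
    return weighted


docs = ["Great camera, great price", "Bad camera"]
counts = count_vectors(docs)
weights = tf_idf(counts)
print(sorted(counts[0].items()))
print({t: round(v, 3) for t, v in sorted(weights[0].items())})
```

In this toy corpus, "camera" occurs in both reviews, so its IDF is 1 and it gets a smaller weight than "price", which occurs in only one review. Down-weighting terms that appear everywhere is exactly what separates the TF-IDF matrix from the raw bigram counts.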


## Set Up the MLflow Experiment

The MLflow experiment logs hyperparameters and metrics and saves the model as an artifact. We call `mlflow.sklearn.autolog()`, which enables all of this logging with a single line of code; subsequent `fit()` calls on scikit-learn estimators are then logged automatically.

In [0]:
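import mlflow.sklearn

# Automatically log parameters, training metrics, and the fitted model for scikit-learn fit() calls.
# (Avoid `from mlflow import spark` here: it would shadow the notebook's `spark` SparkSession.)
mlflow.sklearn.autolog()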

The following cell defines a function that trains a linear classifier with stochastic gradient descent (SGD) using scikit-learn's `SGDClassifier`. With SGD, the gradient of the loss is estimated one sample at a time, and the model is updated along the way with a decreasing step size (learning-rate schedule). By default, `SGDClassifier` uses the hinge loss, which makes it a linear support vector machine.

The last two lines call the function once for each training matrix, **X_train_bigram** and **X_train_bigram_tf_idf**. Compare the training and validation scores after the execution completes; with autologging enabled, each fit is also recorded as an MLflow run.

In [0]:
import mlflow
import numpy as np
from scipy.sparse import csr_matrix
from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import train_test_split


def train_and_show_scores(X: csr_matrix, y: np.ndarray, title: str) -> SGDClassifier:
    # Hold out 25% of the rows for validation, keeping class proportions (stratify=y).
    # A fixed random_state gives both feature sets the same split, so their scores are comparable.
    X_train, X_valid, y_train, y_valid = train_test_split(
        X, y, train_size=0.75, stratify=y, random_state=42
    )

    clf = SGDClassifier()
    clf.fit(X_train, y_train)
    train_score = clf.score(X_train, y_train)  # mean accuracy
    valid_score = clf.score(X_valid, y_valid)

    print(f'{title}\nTrain score: {round(train_score, 2)} ; Validation score: {round(valid_score, 2)}\n')

    return clf

# Extract the label values (y) for training.
y_train = train_set_pd['amazon-review-camera-10000'].values

# Call the training function for each transformed dataset: bigram counts and bigram TF-IDF
model1 = train_and_show_scores(X_train_bigram, y_train, 'Bigram Counts')
model2 = train_and_show_scores(X_train_bigram_tf_idf, y_train, 'Bigram Tf-Idf')
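
For intuition about what `SGDClassifier` does, here is a simplified, pure-Python sketch of per-sample SGD on the L2-regularized hinge loss (the loss `SGDClassifier` uses by default). It is a binary classifier with an `eta0 / sqrt(t)` step size, which corresponds to scikit-learn's `invscaling` schedule with `power_t=0.5` rather than its default `optimal` schedule; for three classes, scikit-learn combines one such binary model per class (one-vs-all). The function names, hyperparameter values, and toy data are hypothetical.

```python
import random


def sgd_hinge(X, y, epochs=20, eta0=0.5, alpha=1e-4, seed=0):
    """Binary linear classifier trained by per-sample SGD on the L2-regularized
    hinge loss. Labels must be -1 or +1. Step size decays as eta0 / sqrt(t)."""
    rng = random.Random(seed)
    w = [0.0] * len(X[0])
    b = 0.0
    t = 1
    order = list(range(len(X)))
    for _ in range(epochs):
        rng.shuffle(order)  # visit the samples in a new random order each epoch
        for i in order:
            eta = eta0 / t ** 0.5  # decreasing-strength schedule
            margin = y[i] * (sum(wj * xj for wj, xj in zip(w, X[i])) + b)
            # Gradient of alpha/2 * ||w||^2 + max(0, 1 - margin) for this one sample
            for j in range(len(w)):
                grad = alpha * w[j] - (y[i] * X[i][j] if margin < 1 else 0.0)
                w[j] -= eta * grad
            if margin < 1:
                b += eta * y[i]  # bias is not regularized
            t += 1
    return w, b


def predict(w, b, x):
    return 1 if sum(wj * xj for wj, xj in zip(w, x)) + b >= 0 else -1


# Toy, linearly separable data with two made-up features per sample
X = [[2.0, 1.0], [1.5, 2.0], [3.0, 0.5], [-1.0, -2.0], [-2.0, -1.0], [-1.5, -0.5]]
y = [1, 1, 1, -1, -1, -1]
w, b = sgd_hinge(X, y)
print([predict(w, b, x) for x in X])
```

Each update touches only one sample, which is why SGD scales to large sparse matrices such as the bigram features above.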

In [0]:
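# Register model2 (Bigram Tf-Idf): the inference cells below send TF-IDF features,
# so the registered model must be the one trained on X_train_bigram_tf_idf.
# If you choose model1 instead, send bigram-count features at inference time.
registered_model_name = "camera_review_nlp_model"
mlflow.sklearn.log_model(model2, artifact_path='model', registered_model_name=registered_model_name)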

In [0]:
client = mlflow.tracking.MlflowClient()

registered_model_name = "camera_review_nlp_model"
model_stage = 'Production'

# Capture the latest version registered by the previous cell (instead of hard-coding a number such as 30)
versions = client.search_model_versions(f"name='{registered_model_name}'")
model_version = max(int(v.version) for v in versions)

# Move that version to the Production stage before deploying it as a SageMaker endpoint
client.transition_model_version_stage(registered_model_name, version=model_version, stage=model_stage)

Once the chosen model is registered, build a container that will host it on SageMaker. The following command builds the MLflow model-serving Docker image and pushes it to Amazon ECR in your AWS account. Run it on a machine with Docker and AWS credentials configured.

`mlflow sagemaker build-and-push-container`

Then find the image URI in the Amazon ECR console; the deployment cell needs it.

Finally, call `mlflow.sagemaker.deploy()` to deploy the model as a SageMaker endpoint.

In [0]:
import mlflow.sagemaker as mfs

model_uri = f"models:/{registered_model_name}/{model_version}"
image_ecr_uri = "XXXXXX.dkr.ecr.us-east-1.amazonaws.com/mypyfunc11:1.11.0"  # image URI from the ECR console
app_name = "camera-review"  # becomes the name of the SageMaker endpoint
region = "us-east-1"

# Deploy the registered model version. The ECR image was created in the previous step;
# deploy() adds the model to that image and serves it as a SageMaker endpoint.

# Optional alternatives:
# mfs.push_image_to_ecr(image='mlflow-pyfunc')  # push the image from Python instead of the CLI
# mfs.run_local(model_uri, port=5000, image='mlflow-pyfunc', flavor=None)  # test the container locally

# Use DEPLOYMENT_MODE_CREATE to create a new endpoint:
# mfs.deploy(app_name=app_name, model_uri=model_uri, image_url=image_ecr_uri, region_name=region, mode=mfs.DEPLOYMENT_MODE_CREATE)

# Use DEPLOYMENT_MODE_REPLACE to update an existing endpoint:
mfs.deploy(app_name=app_name, model_uri=model_uri, image_url=image_ecr_uri, region_name=region, mode=mfs.DEPLOYMENT_MODE_REPLACE)
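
The deploy call depends on two identifiers: the MLflow Model Registry URI (`models:/<name>/<version>`) and the ECR image URI (`<account>.dkr.ecr.<region>.amazonaws.com/<repository>:<tag>`). The hypothetical helpers below build the first and split the second, so that, for example, the region embedded in the image URI can be compared with `region` before calling `deploy()`. They only check the string shape (tag form only, not digests), not whether the image or model version exists.

```python
import re

# Amazon ECR image URI: <account>.dkr.ecr.<region>.amazonaws.com/<repository>:<tag>
ECR_URI_RE = re.compile(
    r"^(?P<account>[^.]+)\.dkr\.ecr\.(?P<region>[a-z0-9-]+)\.amazonaws\.com/"
    r"(?P<repository>[^:]+):(?P<tag>.+)$"
)


def parse_ecr_uri(uri):
    """Split an ECR image URI into its parts; raise ValueError if it does not match."""
    match = ECR_URI_RE.match(uri)
    if match is None:
        raise ValueError(f"not an ECR image URI: {uri!r}")
    return match.groupdict()


def registry_model_uri(name, version):
    """Build an MLflow Model Registry URI of the form models:/<name>/<version>."""
    return f"models:/{name}/{version}"


image = parse_ecr_uri("XXXXXX.dkr.ecr.us-east-1.amazonaws.com/mypyfunc11:1.11.0")
print(image["region"], image["repository"], image["tag"])
print(registry_model_uri("camera_review_nlp_model", 30))
```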

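The `query_endpoint()` function calls the SageMaker Runtime `invoke_endpoint()` API to get an inference, and `map_sentiment()` converts the numeric prediction back to a sentiment name.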

In [0]:
import json
import boto3

# positive=0, negative=1, neutral=2
SENTIMENTS = {0: "positive", 1: "negative", 2: "neutral"}


def map_sentiment(prediction):
    # int() accepts ints, numeric strings, and NumPy/float scalars
    return SENTIMENTS.get(int(prediction), f"unknown ({prediction!r})")


def query_endpoint(app_name, inference_data):
    client = boto3.session.Session().client("sagemaker-runtime", region_name=region)

    response = client.invoke_endpoint(
        EndpointName=app_name,
        Body=inference_data,
        ContentType='application/json; format=pandas-split'
    )
    # The endpoint returns a JSON list with one prediction per input row
    preds = json.loads(response['Body'].read().decode("utf-8"))

    # One row was sent, so map the first (only) prediction
    print("Received prediction: {}".format(map_sentiment(preds[0])))

    return preds
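
A pitfall worth spelling out when mapping predictions to labels: in Python, `prediction == 0 or "0"` is parsed as `(prediction == 0) or "0"`, and any non-empty string is truthy, so such a condition holds for every prediction and the first branch always wins. The small sketch below demonstrates this and shows a dictionary-based alternative; `LABELS`, `buggy_is_positive`, and `label_name` are illustrative names, and the mapping follows the notebook's positive=0, negative=1, neutral=2 convention.

```python
LABELS = {0: "positive", 1: "negative", 2: "neutral"}


def buggy_is_positive(prediction):
    # Parses as (prediction == 0) or "0"; the non-empty string "0" is truthy,
    # so this returns a truthy value for ANY prediction.
    return prediction == 0 or "0"


def label_name(prediction):
    """Normalize ints, numeric strings, and float scalars with int(), then look up."""
    return LABELS.get(int(prediction), f"unknown ({prediction!r})")


print(bool(buggy_is_positive(2)))  # True, even though 2 means neutral
print([label_name(p) for p in (0, "1", 2.0)])
```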


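For testing only, we send one row of the transformed training data to the endpoint and check the result. Because the model saw this row during training, this is a smoke test of the deployment, not a measure of model accuracy.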

The following cell prints the review that will be sent for sentiment prediction, along with its expected sentiment.

In [0]:
""" Display the test data and expected sentiment before sending it to the endpoint """
index = 101

print("Expected sentiment of the test data: " + map_sentiment(train_set_pd.loc[index,'amazon-review-camera-10000']))
print("Text of the test data: " + str(train_set_pd.loc[index,'source']))


Next, we make the inference call; in our run, the endpoint returned the expected sentiment.

In [0]:
# Send the TF-IDF features of the same row, because the deployed model (model2) was trained on them
data_json = pd.DataFrame.sparse.from_spmatrix(X_train_bigram_tf_idf[index]).to_json(orient='split')

# Evaluate the input by posting it to the deployed model
# positive=0, negative=1, neutral=2
prediction1 = query_endpoint(app_name=app_name, inference_data=data_json)
print(prediction1)
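
The request body uses the pandas "split" orientation that the `application/json; format=pandas-split` content type refers to: a JSON object with `columns`, `index`, and `data` keys. The hypothetical helper below builds such a body for one row in plain Python to show its shape. Note that the TF-IDF row is written out densely, with one entry per vocabulary term, so the request grows with the fitted vocabulary; keep in mind that SageMaker real-time endpoints limit request payloads to 6 MB.

```python
import json


def pandas_split_payload(nonzero, n_features):
    """Build a one-row body in pandas 'split' orientation (the layout that
    DataFrame.to_json(orient='split') produces) from a {column: value} dict.
    Every column is listed, so the body grows with the number of features."""
    row = [0.0] * n_features
    for col, value in nonzero.items():
        row[col] = value
    return json.dumps({"columns": list(range(n_features)), "index": [0], "data": [row]})


body = pandas_split_payload({1: 0.6, 3: 0.8}, n_features=5)
print(body)
```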

Delete the endpoint after testing so that it stops incurring charges.

In [0]:
mfs.delete(app_name=app_name, region_name=region)

Reference: https://towardsdatascience.com/building-a-sentiment-classifier-using-scikit-learn-54c8e7c5d2f0