#### For better formatting and visualization, please check out the project notebook at:<br>
[Databricks notebook link](https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/6227236730275197/3490305292750161/5214049462728440/latest.html)

### Inference pipeline deployment (to AWS SageMaker)

- General description:
  Partially following examples from the Databricks documentation ([notebook1](https://docs.databricks.com/applications/mlflow/tracking-examples.html#train-a-pyspark-model-and-save-in-mleap-format) and [notebook2](https://docs.databricks.com/applications/mlflow/mleap-model-deployment-on-sagemaker.html)) and the AWS SageMaker examples ([notebook3](https://github.com/awslabs/amazon-sagemaker-examples/blob/master/advanced_functionality/inference_pipeline_sparkml_blazingtext_dbpedia/inference_pipeline_sparkml_blazingtext_dbpedia.ipynb) and [notebook4](https://github.com/awslabs/amazon-sagemaker-examples/blob/master/advanced_functionality/inference_pipeline_sparkml_xgboost_car_evaluation/inference_pipeline_sparkml_xgboost_car_evaluation.ipynb)), this project demonstrates the main steps for deploying a trained inference pipeline to AWS SageMaker:
  - process features and train models (in particular, a text classification model) using `Spark`
  - log/save the trained model and construct an inference pipeline (pre-processing, prediction, post-processing) in `mleap`-compatible format using `mlflow`
  - deploy it to AWS SageMaker to serve real-time prediction requests.
  
- Data description:
  - The model-training section uses the [20 Newsgroups dataset](http://kdd.ics.uci.edu/databases/20newsgroups/20newsgroups.html), which consists of articles from 20 Usenet newsgroups (see the link above for details). In short, it contains 20,000 messages taken from 20 newsgroups; the newsgroups serve as 'topics' or 'labels' (such as `rec.autos`, `sci.electronics`, `talk.politics.guns`, etc.). Each observation consists of two columns: `text` and `topic`.
  - Note: on Databricks, this dataset is **pre-loaded** in parquet format under `file:/dbfs/databricks-datasets/news20.binary/data-001/training`; otherwise, the data can be downloaded from [this link](http://kdd.ics.uci.edu/databases/20newsgroups/20_newsgroups.tar.gz) (17.3 MB compressed; 61.6 MB uncompressed).

- Project outline:
  - PART 0: set up environment (Databricks, AWS, Docker, etc.)
  - PART I: Feature engineering, model training and logging
  - PART II: Deploy to AWS SageMaker (from scratch using `boto3` and concise implementation using `mlflow`)
      
- Frameworks / languages / libraries / other:
  - Spark APIs (DataFrame-based ML API `spark.ml`)
  - Python
  - PySpark, mleap, mlflow, boto3, sagemaker
  - Basic Linux knowledge and shell scripting

#### PS: all code was tested on Databricks Runtime 6.4 ML (includes Apache Spark 2.4.5, Scala 2.11)

### PART 0: Set up environment (Databricks, AWS, Docker, etc.)

#### Databricks
- Launch a Python 3 cluster with Databricks Runtime 6.4 ML (DBR64) and:
   - instance type: m4.large or any other instance type (ignore this for Databricks Community Edition)
   - ~~spark config: `spark.databricks.mlflow.trackMLlib.enabled true` # helps to automatically track parameters when using `pyspark.ml.tuning.CrossValidator()`~~<br> no longer necessary
   - additional libraries installed on the cluster:
     - PyPI:
       - `boto3==1.9.215`
       - `mleap==0.8.1` (as of Mar. 2020, the current release is 0.15.0)
       - `mlflow[extra]`, see the [MLflow doc](https://www.mlflow.org/docs/latest/tutorials-and-examples/tutorial.html#what-you-ll-need)
       - `sagemaker==1.42.4`
     - Maven:
       - `ml.combust.mleap:mleap-spark_2.11:0.13.0`<br>As of Mar. 2020, '...SageMaker SparkML Serving is powered by MLeap 0.13.0 and it is tested with Spark major version - 2.3' [ref](https://github.com/aws/sagemaker-sparkml-serving-container)
       - `ml.dmlc:xgboost4j-spark:0.90` (required even if no XGBoost algorithm is used)

#### AWS       
- AWS CLI version 2 installation and configuration, for details see [official doc](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2-mac.html)
- Configure an AWS IAM role with full access to AWS SageMaker, for details see [Databricks Guide](https://docs.databricks.com/administration-guide/cloud-configurations/aws/sagemaker.html)

#### Docker
- After the AWS CLI is configured, authorize ***Docker*** to access AWS ECR; for details, see the [official doc](https://docs.aws.amazon.com/AmazonECR/latest/userguide/Registries.html)<br>
`$ aws ecr get-login-password | docker login --username AWS --password-stdin [aws_account_id].dkr.ecr.[us-west-2].amazonaws.com`
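If the AWS CLI is not at hand (for example, from inside a notebook), the same credentials can be fetched with boto3. A minimal sketch, assuming boto3's `ecr.get_authorization_token()` response, whose `authorizationToken` is a base64-encoded `AWS:<password>` string; the helper names are hypothetical, not part of any library.

```python
import base64


def ecr_registry_host(account_id: str, region: str) -> str:
    """Hypothetical helper: ECR registry hostname used by `docker login` (same pattern as above)."""
    return f"{account_id}.dkr.ecr.{region}.amazonaws.com"


def decode_ecr_token(authorization_token: str) -> tuple:
    """Hypothetical helper: split a base64 'user:password' ECR token into (username, password)."""
    decoded = base64.b64decode(authorization_token).decode("utf-8")
    username, password = decoded.split(":", 1)  # the password itself may contain ':'
    return username, password


def fetch_docker_credentials(region: str):
    """Hypothetical helper: ask ECR for a login (requires boto3 and configured AWS credentials)."""
    import boto3  # imported lazily so the pure helpers above work without boto3

    ecr = boto3.client("ecr", region_name=region)
    auth = ecr.get_authorization_token()["authorizationData"][0]
    username, password = decode_ecr_token(auth["authorizationToken"])
    return username, password, auth["proxyEndpoint"]
```

The username and password returned by `fetch_docker_credentials` correspond to what `docker login --username AWS --password-stdin` receives in the CLI command above; the token is short-lived, so it has to be refreshed periodically.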

#### Boto3 (optional; but required if using Community Version of Databricks)
Before moving on: if the following code is not run on Databricks, or if you are using Databricks Community Edition, boto3 needs its own AWS configuration; for details, see the [Doc](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html#guide-configuration).

- Configure AWS on Databricks Community Edition or in a non-Databricks environment (run this only ONCE); for details, see [how to set an environment variable](https://forums.databricks.com/questions/11116/how-to-set-an-environment-variable.html)

>`dbutils.fs.put("dbfs:/databricks/init/init.bash", """#!/bin/bash`
`sudo echo export AWS_ACCESS_KEY_ID='xxxxxxxxxxxxxxxx' >> /databricks/spark/conf/spark-env.sh`<br>
`sudo echo export AWS_SECRET_ACCESS_KEY='xxxxxxxxxxxxxxx' >> /databricks/spark/conf/spark-env.sh`<br>
`""", True)`

Replace the `xxxxxxxx` placeholders with your *`AWS_ACCESS_KEY_ID`* and *`AWS_SECRET_ACCESS_KEY`*, respectively.

###### **Alternatively, set the environment variables directly (after `import os`):**
>`os.environ['AWS_ACCESS_KEY_ID'] = 'xxxxxxxxxxxxxxxx'`<br>
`os.environ['AWS_SECRET_ACCESS_KEY'] = 'xxxxxxxxxxxxxxxx'`

###### **Check if set successfully:**
>`%sh less /databricks/spark/conf/spark-env.sh`
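The same check can be done from a Python cell, which also confirms that the variables reached the Python process. A minimal sketch; `missing_aws_env_vars` and `boto3_can_find_credentials` are hypothetical helpers, and the second relies on `boto3.Session().get_credentials()` returning `None` when no credentials are found anywhere in boto3's lookup chain.

```python
import os

REQUIRED_AWS_VARS = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")


def missing_aws_env_vars(env=None):
    """Hypothetical helper: required AWS variables that are unset or empty (defaults to os.environ)."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_AWS_VARS if not env.get(name)]


def boto3_can_find_credentials() -> bool:
    """Hypothetical helper: True if boto3 resolves credentials from any source (env, files, role)."""
    import boto3  # lazy import: only needed for this check

    return boto3.Session().get_credentials() is not None


missing = missing_aws_env_vars()
if missing:
    print(f"Not set: {', '.join(missing)}")
else:
    print("AWS credential environment variables are set")
```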

#### Import necessary libraries

In [5]:
# feature processing and modeling
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, IndexToString, Tokenizer, HashingTF
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# model logging
import mlflow         # 1.6.0
import mlflow.mleap
import mlflow.spark
import mleap.pyspark
from mleap.pyspark.spark_support import SimpleSparkSerializer

# model deployment
import boto3          # 1.9.162
import time
import sagemaker      # 1.50.17
import mlflow.sagemaker as mfs
import json

# os related
import os
import shutil

### PART I: Feature engineering, model training and logging

#### load training data

In [8]:
# ============================================================
# Load training data
df = spark.read.parquet("/databricks-datasets/news20.binary/data-001/training").select("text", "topic")
train, test_with_label = df.randomSplit([0.8, 0.2], 42)
train.cache().count()
test_with_label.cache().count()

# ============================================================
# to get a sense of what the data look like:
# sample = train.head(1)[0]
# print(f'text: \t{sample[0]} \n\ntopic: \t{sample[1]}')

# if in Databricks, simply do 
display(train.head(1))

text,topic
"cs.utexas.edu!geraldo.cc.utexas.edu!portal.austin.ibm.com!awdprime.austin.ibm.com!karner Subject: Re: Islamic marriage? From: karner@austin.ibm.com (F. Karner) <1993Apr2.103237.4627@Cadence.COM> Organization: IBM Advanced Workstation Division Originator: frank@karner.austin.ibm.com Lines: 50 In article <1993Apr2.103237.4627@Cadence.COM>, mas@Cadence.COM (Masud Khan) writes: > In article karner@austin.ibm.com (F. Karner) writes: > > > >Okay. So you want me to name names? There are obviously no official > >records of these pseudo-marriages because they are performed for > >convenience. What happens typically is that the woman is willing to move > >in with her lover without any scruples or legal contracts to speak of. > >The man is merely utilizing a loophole by entering into a temporary > >religious ""marriage"" contract in order to have sex. Nobody complains, > >nobody cares, nobody needs to know. > > > >Perhaps you should alert your imam. It could be that this practice is > >far more widespread than you may think. Or maybe it takes 4 muslim men > >to witness the penetration to decide if the practice exists! > >-- > > > > Again you astound me with the level of ignorance you display, Muslims > are NOT allowed to enter temporary marriages, got that? There is > no evidence for it it an outlawed practise so get your facts > straight buddy. Give me references for it or just tell everyone you > were lying. It is not a widespread as you may think (fantasise) in > fact contrary to your fantasies it is not practised at all amongst > Muslims. First of all, I'm not your buddy! Second, read what I wrote. I'm not talking about what muslims are ALLOWED to do, merely what *SOME* practice. They consider themselves as muslim as you, so don't retort with the old and tired ""they MUST NOT BE TRUE MUSLIMS"" bullshit. If I gave you the names what will you do with this information? Is a fatwa going to be leashed out against the perpetrators? 
Do you honestly think that someone who did it would voluntarily come forward and confess? With the kind of extremism shown by your co-religionaries? Fat chance. At any rate, there can be no conclusive ""proof"" by the very nature of the act. Perhaps people that indulge in this practice agree with you in theory, but hope that Allah will forgive them in the end. I think it's rather arrogant of you to pretend to speak for all muslims in this regard. Also, kind of silly. Are you insinuating that because the Koranic law forbids it, there are no criminals in muslim countries? This is as far as I care to go on this subject. The weakness of your arguments are for all netters to see. Over and out... -- DISCLAIMER: The opinions expressed in this posting are mine solely and do not represent my employer in any way. F. A. Karner AIX Technical Support | karner@austin.vnet.ibm.com",alt.atheism


#### Pre-processing and post-processing

##### Important note: 
It is crucial to ensure **consistency** in data processing between the feature-processing + model-training phase and the model-prediction phase. In particular, if the target variable needs to be pre-processed (e.g., applying `pyspark.ml.feature.StringIndexer` to a non-numeric target variable in *multi-class classification*), this step should be ***left out of the pipeline***. Otherwise, the deployed pipeline still expects the target column, so a 'dummy' column has to be added to the payload when querying the endpoint.

An [AWS blog](https://aws.amazon.com/blogs/machine-learning/ensure-consistency-in-data-processing-code-between-training-and-inference-in-amazon-sagemaker/) also mentioned a similar situation.

##### Target variable and post-processing pipeline
Note that I applied `StringIndexer` to the target variable and want to 'reverse' this step in post-processing with `IndexToString`. However, since the reversal is applied to the predicted column rather than to the original indexed column (as in feature engineering), `IndexToString` cannot infer the label mapping from column metadata. If the label-index mapping is not specified explicitly, the following error is raised: <br>`"java.lang.ClassCastException: org.apache.spark.ml.attribute.UnresolvedAttribute$ cannot be cast to org.apache.spark.ml.attribute.NominalAttribute"`.

To fix this, note that the default `stringOrderType` of `StringIndexer` is `'frequencyDesc'` (the alternatives are `'frequencyAsc'`, `'alphabetDesc'` and `'alphabetAsc'`). The label mapping can therefore be extracted either from the fitted model or reconstructed according to `stringOrderType`, and then passed to `IndexToString`.
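To make the mapping logic concrete without a cluster, here is a pure-Python sketch of what `StringIndexer` (`frequencyDesc`, `handleInvalid='keep'`) and `IndexToString` do. It assumes that unseen labels receive the extra index `len(labels)` under `handleInvalid='keep'`, which is why `'other'` is appended to the mapping in the next cell; it also breaks frequency ties alphabetically, so check your Spark version's documentation for how ties are actually ordered. All function and variable names are hypothetical.

```python
from collections import Counter


def frequency_desc_labels(values):
    """Hypothetical helper: order labels like stringOrderType='frequencyDesc' (most frequent -> 0).

    Assumption: ties are broken alphabetically.
    """
    counts = Counter(values)
    return [label for label, _ in sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))]


def index_labels(values, labels):
    """Hypothetical helper: map labels to float indices; unseen labels get len(labels) ('keep')."""
    lookup = {label: float(i) for i, label in enumerate(labels)}
    return [lookup.get(v, float(len(labels))) for v in values]


def index_to_string(predictions, mapping):
    """Hypothetical helper: reverse the indexing, like IndexToString(labels=mapping)."""
    return [mapping[int(p)] for p in predictions]


toy_topics = ["sci.space", "rec.autos", "sci.space", "alt.atheism", "rec.autos", "sci.space"]
toy_labels = frequency_desc_labels(toy_topics)  # ['sci.space', 'rec.autos', 'alt.atheism']
toy_mapping = toy_labels + ["other"]            # extra slot for the 'keep' bucket
toy_indices = index_labels(["rec.autos", "talk.politics.guns"], toy_labels)  # [1.0, 3.0]
print(index_to_string(toy_indices, toy_mapping))  # ['rec.autos', 'other']
```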

In [12]:
# ============================================================
# process the target variable 
labelIndexer = StringIndexer(inputCol="topic", outputCol="label", handleInvalid="keep")              # stringOrderType=="frequencyDesc"
indexed = labelIndexer.fit(train)
train = indexed.transform(train)

# ============================================================
# Explicitly pass in label mapping to IndexToString
# 1) OPT 1, extract the label-index mapping from the fitted model
label_mapping = indexed.labels

# 2) OPT 2, assuming stringOrderType=="frequencyDesc":
# label_mapping = [row.topic for row in train.select('topic').groupby('topic').count().sort('count', ascending=False).collect()]

# ============================================================

label_mapping += ['other']         # deal with unseen labels in the test data, in case we specify handleInvalid=='keep' in StringIndexer
postProcessor = IndexToString(inputCol="prediction", outputCol="orignal_topic", labels = label_mapping)  

# ============================================================
# Define preprocessing pipeline, construct a Pipeline object using the defined components, note to exclude target variable transformer

tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="features")
dt = DecisionTreeClassifier()

pipeline = Pipeline(stages=[tokenizer, hashingTF, dt])

# ============================================================
# Train and Fine-tune the model, for simplicity purposes, only tune the 'numFeatures' parameter in HashingTF() vectorizer
paramGrid = (ParamGridBuilder()
             .addGrid(hashingTF.numFeatures, [50])
             .build()
            )
cv = CrossValidator(estimator=pipeline, 
                    evaluator=MulticlassClassificationEvaluator(), 
                    estimatorParamMaps=paramGrid
                   )
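`HashingTF` never builds a vocabulary: each token is hashed into one of `numFeatures` buckets, so a grid value of 50 means many distinct words share a bucket. The pure-Python sketch below approximates the `Tokenizer` + `HashingTF` stages (lowercase, split on whitespace, count per bucket). Spark's `HashingTF` uses MurmurHash3; this sketch substitutes `zlib.crc32` only as a stable stand-in, so bucket indices will not match Spark's. Function names are hypothetical.

```python
import zlib
from collections import Counter


def tokenize(text):
    """Hypothetical helper: lowercase and split on whitespace, roughly like pyspark's Tokenizer."""
    return text.lower().split()


def hashing_tf(tokens, num_features):
    """Hypothetical helper: term-frequency vector of length num_features via the hashing trick.

    zlib.crc32 is a stand-in hash; Spark's HashingTF uses MurmurHash3, so indices differ.
    """
    vector = [0.0] * num_features
    for token, count in Counter(tokens).items():
        bucket = zlib.crc32(token.encode("utf-8")) % num_features
        vector[bucket] += count  # colliding tokens simply add up in the same bucket
    return vector


tokens = tokenize("The quick brown fox jumps over the lazy dog")
vec = hashing_tf(tokens, num_features=50)
print(len(vec), sum(vec))  # 50 9.0
print(sum(1 for v in vec if v > 0), "non-empty buckets")
```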

#### model training (including Cross-Validation) and logging

In [14]:
# ============================================================
# use Mlflow to log the model with optimal parameters into Mleap-compatible format

with mlflow.start_run() as run:
  cvmodel = cv.fit(train)
  
  # extract the optimal Pipeline model to be logged
  fitted_model = cvmodel.bestModel
  artifact_path = "model_dbr64"
  mlflow.mleap.log_model(spark_model=fitted_model
                         , sample_input=test_with_label.drop('topic')
                         , artifact_path=artifact_path
                        ) 
  # extract experiment_id and run_id
  # for other attributes, see: https://www.mlflow.org/docs/latest/python_api/mlflow.entities.html#mlflow.entities.RunInfo
  experiment_id = run.info.experiment_id    # record the experiment_id for later deployment
  run_id = run.info.run_id                  # record the run_id for later deployment

print(f'experiment_id is {experiment_id}')
print(f'run_id is {run_id}')

### PART II: Deploy to AWS SageMaker

##### Test locally (local machine, or AWS EC2)

- Set up proper AWS authorization and install the Databricks CLI, then:
  - `pip install mlflow==1.6.0` locally
  - install Docker locally and start the Docker service
  - build a Docker image locally and push it to AWS ECR:
    - `$ mlflow sagemaker build-and-push-container`
  - copy the logged model to a local directory:
    - use the Databricks CLI: `$ databricks fs cp -r </databricks/path/to/logged/test_model/dbr63> </local/path/to/test_model_dbr64/>`
    - on Community Edition the CLI is not available; instead:
      - `%sh`<br>
        `tar -zcvf ../../dbfs/FileStore/d.tar.gz ../../dbfs/databricks/mlflow/[path]/[to]/artifacts/model_dbr64/`<br>
        to find the path to the model artifacts, click 'Runs' in the top right corner and inspect the saved runs
      - the file can then be downloaded in a web browser from `https://community.cloud.databricks.com/files/d.tar.gz?o=xxxxxxxxxx`, where xxxxxxxxxx is the number after `?o=` in your Databricks URL; for details, see the [official doc](https://docs.databricks.com/data/filestore.html)
  - run the local test:
    - `$ mlflow sagemaker run-local -m </local/path/to/test_model_dbr64/>`
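On Community Edition, the `%sh tar` step can also be done from a Python cell with the standard-library `tarfile` module. A minimal sketch; `archive_model_dir` is a hypothetical helper, and the paths in the usage comment are the same placeholders as above.

```python
import os
import tarfile


def archive_model_dir(model_dir, output_path):
    """Hypothetical helper: pack a logged MLflow model directory into a .tar.gz.

    The directory's own name (e.g. model_dbr64) becomes the top-level entry of the archive.
    """
    model_dir = os.path.normpath(model_dir)  # drop a trailing slash so basename() is not empty
    with tarfile.open(output_path, "w:gz") as tar:
        tar.add(model_dir, arcname=os.path.basename(model_dir))
    return output_path


# Usage (placeholder path; copy the real one from the run's artifacts):
# archive_model_dir("/dbfs/databricks/mlflow/[path]/[to]/artifacts/model_dbr64/",
#                   "/dbfs/FileStore/d.tar.gz")
```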

##### Deploy remotely (to SageMaker)

***Deploy***: from here, there are two ways to deploy the model:
- 1) use the `boto3` API to deploy the model from scratch (create the model, endpoint configuration, and endpoint in SageMaker);
- 2) use `mlflow.sagemaker` for an easier deployment (for the `mlflow.sagemaker.deploy()` source code, see [here](https://github.com/mlflow/mlflow/blob/58be9c01b587344b953965492e4d6f8aa5476482/mlflow/sagemaker/__init__.py#L151)).

The main difference is that `boto3` gives fine-grained control over the deployment. For instance, if there is a post-processing pipeline in addition to the pre-processing pipeline and the model, `boto3` can create a single SageMaker 'model' that chains all three pieces together. If there are only pre-processing and a model, it is much easier to use `mlflow.sagemaker`, which does all the dirty work under the hood.
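The `boto3` route boils down to three calls that must reference each other by name: `create_model` (listing the containers in execution order), `create_endpoint_config` (pointing at that model), and `create_endpoint` (pointing at that config). The sketch below only assembles their keyword arguments as plain dicts, so the name wiring can be checked before anything is sent to SageMaker; `build_pipeline_requests` and the example values are hypothetical.

```python
import json


def build_pipeline_requests(app_name, image, model_data_urls, schemas, role_arn,
                            instance_type="ml.m5.large"):
    """Hypothetical helper: kwargs for create_model / create_endpoint_config / create_endpoint.

    Containers run in the order given, each with its own SparkML schema.
    """
    if len(model_data_urls) != len(schemas):
        raise ValueError("need exactly one schema per container")
    containers = [
        {
            "Image": image,
            "ModelDataUrl": url,
            "Environment": {"SAGEMAKER_SPARKML_SCHEMA": json.dumps(schema)},
        }
        for url, schema in zip(model_data_urls, schemas)
    ]
    model_kwargs = {"ModelName": app_name, "Containers": containers, "ExecutionRoleArn": role_arn}
    config_kwargs = {
        "EndpointConfigName": app_name,
        "ProductionVariants": [{
            "VariantName": "DefaultVariant",
            "ModelName": app_name,  # must match the model created by create_model
            "InitialInstanceCount": 1,
            "InstanceType": instance_type,
        }],
    }
    endpoint_kwargs = {"EndpointName": app_name, "EndpointConfigName": app_name}
    return model_kwargs, config_kwargs, endpoint_kwargs


# With a boto3 SageMaker client `sgm`, the calls would then be:
# sgm.create_model(**model_kwargs)
# sgm.create_endpoint_config(**config_kwargs)
# sgm.create_endpoint(**endpoint_kwargs)
```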

In [20]:
experiment_id = '3490305292750161'
run_id = 'f327eb341ebb45f9ade72f8bb7b92232'
artifact_path = "model_dbr64"
bucket_name = 'databricks-mlflow-sagemaker'
job_prefix = 'mlflow-deploy'

In [21]:
# ============================================================
# define and extract info required by deployment

region = "us-west-2" # region of AWS account

boto_session = boto3.Session(region_name=region)
sess = sagemaker.Session(boto_session=boto_session)
                         
sagemaker_session = sess.boto_session.client('sagemaker')
aws_id = boto3.client('sts', region_name=region).get_caller_identity()['Account'] 
# account id of the AWS account associated with Databricks account OR AWS authentication(see setup in PART0); otherwise, substitute with AWS account where the model will be deployed under

arn = "arn:aws:iam::" + aws_id + ":role/sagemaker_full_access"   # change 'sagemaker_full_access' according to your own AWS IAM role set-up

model_uri = "/dbfs/databricks/mlflow/" + experiment_id + "/" + run_id +"/artifacts/" + artifact_path # this is databricks specific, use local path if needed
image_url = aws_id + ".dkr.ecr." + region + ".amazonaws.com/mlflow-pyfunc:1.6.0" 
# change the image tag (':1.6.0' part) according to the mlflow version used

os.environ["SAGEMAKER_DEPLOY_IMG_URL"] = image_url

###### Deploy from scratch using `boto3`

In [23]:
# ============================================================
# Save model (including pre-processor)
# model artifacts logged by mlflow:
mlflow_model_path = f'/dbfs/databricks/mlflow/{experiment_id}/{run_id}/artifacts/model_dbr64/'

import tarfile
import zipfile
# since mlflow logged the model during training, the .tar.gz file can be created directly from the logged model artifacts
with tarfile.open("/databricks/driver/model.tar.gz", "w:gz") as tar:
    tar.add(f'{mlflow_model_path}mleap/model/bundle.json', arcname='bundle.json')
    tar.add(f'{mlflow_model_path}mleap/model/root', arcname='root')

# ============================================================
# Save post-processor
# the post-processor (a SparkML transformer) has to be serialized to an MLeap bundle and repacked
SimpleSparkSerializer().serializeToBundle(postProcessor, "jar:file:/databricks/driver/postprocess.zip", postProcessor.transform(fitted_model.transform(train)))

with zipfile.ZipFile("/databricks/driver/postprocess.zip") as zf:
    zf.extractall("/databricks/driver/postprocess")

# Writing back the content as a .tar.gz file, since SageMaker only accepts .tar.gz file
with tarfile.open("/databricks/driver/postprocess.tar.gz", "w:gz") as tar:
    tar.add("/databricks/driver/postprocess/bundle.json", arcname='bundle.json')
    tar.add("/databricks/driver/postprocess/root", arcname='root')

# ============================================================
# upload model and post-processor to s3 
s3 = boto3.resource('s3')
bucket_name = 'databricks-mlflow-sagemaker'
job_prefix = 'mlflow-deploy'

file_name_1 = job_prefix + '/' + 'model.tar.gz'
s3.Bucket(bucket_name).upload_file('/databricks/driver/model.tar.gz', file_name_1)
file_name_2 = job_prefix + '/' + 'postprocess.tar.gz'
s3.Bucket(bucket_name).upload_file('/databricks/driver/postprocess.tar.gz', file_name_2)

# ============================================================
# clean up
# (alternatively, skip this: `/databricks/driver/` is purged every time the cluster restarts, so it can serve as a tmp folder; inspect it with `%sh ls /databricks/driver/`)

# os.remove('/databricks/driver/model.zip')
os.remove('/databricks/driver/postprocess.zip')
os.remove('/databricks/driver/model.tar.gz')
os.remove('/databricks/driver/postprocess.tar.gz')
# shutil.rmtree('/databricks/driver/model')
shutil.rmtree('/databricks/driver/postprocess')
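The SparkML serving container expects each archive to contain `bundle.json` and the `root/` directory at the top level, which is why `arcname` is set explicitly above. A small sanity check, as a sketch (`check_mleap_tarball` is a hypothetical helper); to use it in this notebook, it has to run before the clean-up lines delete the `.tar.gz` files.

```python
import tarfile


def check_mleap_tarball(path):
    """Hypothetical helper: raise ValueError unless bundle.json and root/ sit at the top level."""
    with tarfile.open(path, "r:gz") as tar:
        names = tar.getnames()
    top_level = {name.split("/", 1)[0] for name in names}
    missing = {"bundle.json", "root"} - top_level
    if missing:
        raise ValueError(f"{path} is missing top-level entries: {sorted(missing)}")
    return sorted(top_level)


# e.g. check_mleap_tarball("/databricks/driver/model.tar.gz") before uploading to S3
```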

In [24]:
# ============================================================
# define input and output schema for both model (including pre-processing) and post-processing,
# for detailed format for the schema, see https://github.com/aws/sagemaker-sparkml-serving-container

import json
model_schema = {"input":[{"type":"string","name":"text"}]
                ,"output":{"type":"double","name":"prediction"}}
model_schema_json = json.dumps(model_schema)

post_processor_schema = {"input": [{"type": "double", "name": "prediction"}]
                         , "output": {"type": "string", "name": "orignal_topic"}}
post_processor_schema_json = json.dumps(post_processor_schema)
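In a SageMaker inference pipeline, the output of the first container becomes the input of the second, so the model's single output column has to line up with the post-processor's input in number and type (the input *name* must also match the post-processing transformer's `inputCol`, here `prediction`). A minimal consistency check over the two schemas defined above; `check_schema_chain` is a hypothetical helper that only compares column count and type.

```python
import json


def check_schema_chain(upstream, downstream):
    """Hypothetical helper: can upstream's single output feed downstream's inputs?

    Accepts dicts or JSON strings in the SAGEMAKER_SPARKML_SCHEMA format; compares count and type only.
    """
    if isinstance(upstream, str):
        upstream = json.loads(upstream)
    if isinstance(downstream, str):
        downstream = json.loads(downstream)
    out = upstream["output"]
    inputs = downstream["input"]
    if len(inputs) != 1:
        raise ValueError(f"downstream expects {len(inputs)} inputs, upstream emits 1 column")
    if inputs[0]["type"] != out["type"]:
        raise ValueError(f"type mismatch: {out['type']} -> {inputs[0]['type']}")
    return True


# e.g. check_schema_chain(model_schema_json, post_processor_schema_json)
```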

In [25]:
from botocore.exceptions import ClientError
sgm = boto3.client('sagemaker', region_name=region)

timestamp_prefix = time.strftime("%Y%m%d-%H%M", time.gmtime())
app_name = "news20-boto3-"+timestamp_prefix

# all SparkML serving container image locations are published at: https://github.com/aws/sagemaker-sparkml-serving-container
sparkml_images = {
    'us-west-1': '746614075791.dkr.ecr.us-west-1.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'us-west-2': '246618743249.dkr.ecr.us-west-2.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'us-east-1': '683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'us-east-2': '257758044811.dkr.ecr.us-east-2.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'ap-northeast-1': '354813040037.dkr.ecr.ap-northeast-1.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'ap-northeast-2': '366743142698.dkr.ecr.ap-northeast-2.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'ap-southeast-1': '121021644041.dkr.ecr.ap-southeast-1.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'ap-southeast-2': '783357654285.dkr.ecr.ap-southeast-2.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'ap-south-1': '720646828776.dkr.ecr.ap-south-1.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'eu-west-1': '141502667606.dkr.ecr.eu-west-1.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'eu-west-2': '764974769150.dkr.ecr.eu-west-2.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'eu-central-1': '492215442770.dkr.ecr.eu-central-1.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'ca-central-1': '341280168497.dkr.ecr.ca-central-1.amazonaws.com/sagemaker-sparkml-serving:2.2',
    'us-gov-west-1': '414596584902.dkr.ecr.us-gov-west-1.amazonaws.com/sagemaker-sparkml-serving:2.2'
}

# ============================================================
# Create SageMaker models (separate models in separate containers)
try:
    sparkml_image = sparkml_images[region]

    response = sgm.create_model(
        ModelName=app_name,
        Containers=[
            {
                'Image': sparkml_image
                , 'ModelDataUrl': f's3://{bucket_name}/{job_prefix}/model.tar.gz'
                , 'Environment': {
                    'SAGEMAKER_SPARKML_SCHEMA': model_schema_json
                }
            },
            {
                'Image': sparkml_image
                , 'ModelDataUrl': f's3://{bucket_name}/{job_prefix}/postprocess.tar.gz'
                , 'Environment': {
                    'SAGEMAKER_SPARKML_SCHEMA': post_processor_schema_json
                }

            }
        ],
        ExecutionRoleArn=arn
    )

    print(f'{response}\n')
    
except ClientError:
    print('Model already exists, continuing...')

# ============================================================
# Create SageMaker endpoint configuration
try:
    response = sgm.create_endpoint_config(
        EndpointConfigName=app_name,
        ProductionVariants=[
            {
                'VariantName': 'DefaultVariant',
                'ModelName': app_name,
                'InitialInstanceCount': 1,
                'InstanceType': 'ml.m5.large',
            },
        ],
    )
    print(f'{response}\n')

except ClientError:
    print('Endpoint config already exists, continuing...')

# ============================================================
# Create SageMaker endpoint
try:
    response = sgm.create_endpoint(
        EndpointName=app_name,
        EndpointConfigName=app_name,
    )
#     # if there is need to update model or configuration or anything:
#     # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.update_endpoint
#     response = sgm.update_endpoint(
#         EndpointName='test-mlflow',
#         EndpointConfigName='news20-csv',
#         RetainAllVariantProperties=True,
#     )
    print(f'{response}\n')

except ClientError:
    print("Endpoint already exists, continuing...")
    
# ============================================================
# Monitor the status until completed
while True:
    endpoint_status = sgm.describe_endpoint(EndpointName=app_name)['EndpointStatus']
    print(endpoint_status)
    if endpoint_status in ('OutOfService','InService','Failed'):
        break
    time.sleep(60)
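The `while True` loop above has no upper bound. A sketch of the same polling with a timeout, written against any zero-argument status function so it can be reused for both deployments; `wait_for_status` is a hypothetical helper. boto3 also provides a built-in waiter for this case, `sgm.get_waiter('endpoint_in_service').wait(EndpointName=app_name)`.

```python
import time


def wait_for_status(get_status, done=("InService", "OutOfService", "Failed"),
                    poll_seconds=60, timeout_seconds=3600, sleep=time.sleep):
    """Hypothetical helper: poll get_status() until a terminal status or the timeout is reached."""
    waited = 0
    while True:
        status = get_status()
        print(status)
        if status in done:
            return status
        if waited >= timeout_seconds:
            raise TimeoutError(f"still {status} after {waited} seconds")
        sleep(poll_seconds)
        waited += poll_seconds


# With the boto3 client above:
# wait_for_status(lambda: sgm.describe_endpoint(EndpointName=app_name)['EndpointStatus'])
```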

###### Concise deployment using `mlflow`

In [27]:
# mlflow integrated sagemaker deployment, API: https://www.mlflow.org/docs/latest/python_api/mlflow.sagemaker.html#mlflow.sagemaker.deploy
# this only applies when there is a single model; for model composition and pre/post-processing, see the future work at the end

timestamp_prefix = time.strftime("%Y%m%d-%H%M", time.gmtime())
app_name = "news20-mlflow-"+timestamp_prefix

mfs.deploy(app_name=app_name
           , model_uri=model_uri
           , region_name=region
           , mode="create"
           , flavor='mleap'
           , execution_role_arn=arn
           , image_url=image_url
           , instance_type='ml.m5.large'
          )
while True:
    endpoint_status = sgm.describe_endpoint(EndpointName=app_name)['EndpointStatus']
    print(endpoint_status)
    if endpoint_status in ('OutOfService','InService','Failed'):
        break
    time.sleep(60)

In [28]:
# ============================================================
# check the Endpoint status

def check_status(app_name):
  sage_client = boto3.client('sagemaker', region_name=region)
  endpoint_description = sage_client.describe_endpoint(EndpointName=app_name)
  endpoint_status = endpoint_description["EndpointStatus"]
  return endpoint_status

print(f"Application status is: {check_status(app_name)}")

##### Query the model

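In [30]:
# run this after the next cell, which defines `input_json_records`;
# DataFrame.to_json() already returns a JSON string, so print it directly instead of re-encoding it with json.dumps
print(input_json_records)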

In [31]:
# ============================================================
# prepare the querying data (aka the 'payload')

# Enable Arrow-based columnar data transfers for faster [Spark DF --> Pandas DF] transformation
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "true")

# Note: the mlflow-deployed MLeap model only processes JSON-serialized Pandas DataFrames with the 'split' orientation,
# while the boto3-deployed SparkML pipeline expects the 'text/csv' ContentType (no header, no index column)

query_df = test_with_label.drop('topic').sample(False, 0.002, 41).limit(1).toPandas()
input_json_records = query_df.to_json(orient='records')
input_json_split = query_df.to_json(orient='split')
input_csv = query_df.to_csv(header=False, index=False)
input_csv
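For reference, the two payload shapes can be reproduced without pandas. A sketch assuming a single `text` column: the `split` orientation is an object with `columns`, `index` and `data` keys, and the CSV body is one row per line with neither header nor index column, with fields quoted by standard CSV rules. Helper names are hypothetical.

```python
import csv
import io
import json


def to_split_json(rows, columns):
    """Hypothetical helper: same structure as pandas DataFrame.to_json(orient='split')."""
    return json.dumps({"columns": columns, "index": list(range(len(rows))), "data": rows})


def to_headerless_csv(rows):
    """Hypothetical helper: one CSV line per row, no header or index; commas/quotes get quoted."""
    buffer = io.StringIO()
    csv.writer(buffer, lineterminator="\n").writerows(rows)
    return buffer.getvalue()


rows = [['Subject: Re: "Islamic marriage?", lines 50']]
print(to_split_json(rows, ["text"]))
print(to_headerless_csv(rows))
```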

In [32]:
# ============================================================
# query the model for boto3
def query_endpoint_boto3(app_name, input):
  client = boto3.session.Session().client("sagemaker-runtime", region)
  
  response = client.invoke_endpoint(
      EndpointName=app_name
      , Body=input
      , ContentType='text/csv'
    # or pass the schema together with the data:
    # Body='{"schema": {"input": [{"name": "text", "type": "string"}], "output": {"type": "double", "name": "prediction"}}, "data": ["<text payload>"]}',
    # ContentType='application/json'
  )
  # the response body is a stream and can only be read once
  preds = response['Body'].read().decode("ascii")
  print(f"Our result for this payload is: {preds}")
  return preds

print("Sending batch prediction request with input csv format...")
# ============================================================
# Evaluate the input by posting it to the deployed model
prediction = query_endpoint_boto3(app_name=app_name, input=input_csv)


In [33]:
# ============================================================
# query the model for mlflow
def query_endpoint_mlflow(app_name, input):
  client = boto3.session.Session().client("sagemaker-runtime", region)
  
  response = client.invoke_endpoint(
      EndpointName=app_name
      , Body=input
      , ContentType='application/json'
  )
  preds = response['Body'].read().decode("ascii")
  print(f"Received response: {preds}")
  return preds

print("Sending batch prediction request with input dataframe json...")

# ============================================================
# Evaluate the input by posting it to the deployed model

prediction = query_endpoint_mlflow(app_name=app_name, input=input_json_split)
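Because the `mlflow` deployment contains only the pre-processing stages and the classifier (no `IndexToString` post-processor), its response carries the numeric `prediction` index rather than the topic name. Assuming the body is a JSON list of numbers (check the response printed above), the index can be mapped back with the `label_mapping` list built in PART I. `decode_predictions` is a hypothetical helper.

```python
import json


def decode_predictions(body, mapping):
    """Hypothetical helper: turn a JSON list of predicted indices (e.g. '[3.0]') into topic strings."""
    predictions = json.loads(body)
    return [mapping[int(p)] for p in predictions]


# e.g. decode_predictions(prediction, label_mapping) in this notebook
print(decode_predictions("[1.0, 0.0]", ["rec.autos", "sci.space", "other"]))  # ['sci.space', 'rec.autos']
```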

##### clean up

In [35]:
# Environment cleanup

# for boto3:
def clean_up(app_name):
  client = boto3.session.Session().client("sagemaker", region)
  print('Deleting SageMaker endpoint...')
  print(client.delete_endpoint(EndpointName=app_name
                                 )
       )

  print('Deleting SageMaker endpoint config...')
  print(client.delete_endpoint_config(EndpointConfigName=app_name
                                        )
       )

  print('Deleting SageMaker model...')
  print(client.delete_model(ModelName=app_name
                              )
       )
clean_up(app_name)   # pass the boto3 deployment's app_name here; for the mlflow deployment, use mfs.delete() below

# # for mlflow
# def clean_up(app_name):
#   mfs.delete(app_name=app_name, 
#              region_name=region, 
#              archive=False # this will clean associated Models, Endpoints and Endpoint Configs
#             )
# clean_up(app_name)

# to check if the clean_up is successful:
def get_active_endpoints(app_name):
  sage_client = boto3.client('sagemaker', region_name=region)
  app_endpoints = sage_client.list_endpoints(NameContains=app_name)["Endpoints"]
  return list(filter(lambda en : en == app_name, [str(endpoint["EndpointName"]) for endpoint in app_endpoints]))

time.sleep(5)
print("The following endpoints exist for the `{an}` application: {eps}".format(an=app_name, eps=get_active_endpoints(app_name)))

#### Future work / TODO list:
- substitute the Spark model with an AWS SageMaker built-in model (add pre-processing and training algorithms to separate containers)
>The MLflow Docker container for serving on SageMaker runs a Flask server internally that accepts JSON-formatted or CSV-formatted Pandas DataFrames. That's how the current content types and serving input schemas are enforced. Regarding model composition and pre/postprocessing, you can use custom pyfunc APIs to package multiple models together along with custom logic: https://mlflow.org/docs/latest/models.html#custom-python-models
- deploy pipeline for batch inference