In [None]:
%load_ext autoreload
%autoreload 2

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

import ads
import os 
from ads.common.model_export_util import prepare_generic_model

# Build the Spark session used throughout this notebook
# (getOrCreate() hands back an already-active session when there is one).
spark = (
    SparkSession.builder
    .appName('Titanic')
    .getOrCreate()
)

# Read a csv from object storage

Load the CSV file into a Spark DataFrame.

In [None]:
# No schema inference is requested here; the columns that are needed as
# numbers are cast explicitly in the data-preparation section.
df = (
    spark.read
    .format("csv")
    .option('header', 'true')
    .load("oci://hosted-ds-datasets@bigdatadatasciencelarge/titanic/titanic.csv")
)

In [None]:
df.show(5)

In [None]:
# Column types: 
df.dtypes

## Data preparation and feature engineering

In [None]:
# Keep the target (Survived) and four predictors. Every column except the
# categorical 'Sex' is cast to float; 'Sex' is passed through unchanged.
dataset = df.select(*[
    col(name) if name == 'Sex' else col(name).cast('float')
    for name in ('Survived', 'Pclass', 'Sex', 'Age', 'Fare')
])

dataset.show()

In [None]:
# Index categorical columns with StringIndexer: the string column 'Sex'
# becomes the numeric column 'Gender'. handleInvalid='keep' routes invalid
# values to an extra index instead of raising.
dataset = StringIndexer(inputCol='Sex',
                        outputCol='Gender',
                        handleInvalid='keep').fit(dataset).transform(dataset)

# Drop the raw string column now that it has been indexed.
# ('Embarked' was never selected into `dataset`, so dropping it was a no-op
# and has been removed.)
dataset = dataset.drop('Sex')

dataset.show()

# Build a simple Pipeline Object

In [None]:
# Input features:
required_features = ['Pclass',
                     'Age',
                     'Fare',
                     'Gender']

# Fixed seed so that the train/test split and the random forest are
# reproducible from one run of the notebook to the next.
SEED = 42

# 80/20 data split
(training_data, test_data) = dataset.randomSplit([0.8, 0.2], seed=SEED)

# The assembler packs the feature columns into the single vector column
# that the classifier reads.
assembler = VectorAssembler(inputCols=required_features, outputCol='features')
rf = RandomForestClassifier(labelCol='Survived',
                            featuresCol='features',
                            maxDepth=5,
                            seed=SEED)

pipeline = Pipeline(stages=[assembler, rf])

In [None]:
pipeline_model = pipeline.fit(training_data)

In [None]:
# Score the held-out split rather than the full dataset: `dataset` contains
# the rows the model was fitted on, so predictions on it look better than
# they really are.
results = pipeline_model.transform(test_data)
results.limit(10).show()

# Saving the Model Object to disk

In [None]:
artifact_dir = "./model-artifact"

# exist_ok=True makes the directory creation idempotent and avoids the
# check-then-create race of os.path.exists() followed by os.makedirs().
os.makedirs(artifact_dir, exist_ok=True)

# Persist the fitted pipeline inside the artifact directory, replacing any
# previous copy.
pipeline_model.write().overwrite().save(f"{artifact_dir}/my-model")
# Alternative: save to Object Storage instead of local disk.
#pipeline_model.write().overwrite().save("oci://<my-bucket>@<my-namespace>/my-model")

# Preparing a model artifact with ADS

In [None]:
# Prepare the model artifact in artifact_dir. The score.py it contains is
# overwritten in the next section with a Spark-aware version.
ads_artifact = prepare_generic_model(
    artifact_dir,
    force_overwrite=True,
    function_artifacts=False,
    data_science_env=True,
)

## Overwriting score.py to support spark models 

The script below prints a message at each step so that its progress shows up in the logs.

In [None]:
%%writefile {artifact_dir}/score.py

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.sql.functions import col


# Module-level Spark session: created when score.py is imported and used by
# predict() to turn the incoming payload into a DataFrame.
sc = (
    SparkSession.builder
    .appName('Titanic2')
    .getOrCreate()
)

"""
   Inference script. This script is used for prediction by scoring server when schema is known.
"""


def load_model():
    """
    Loads model from the serialized format

    The saved pipeline is expected in a ``my-model`` directory located next to
    this script. Resolving the location from the script's own path means the
    same file works when it is imported locally from the artifact directory
    and when it runs from the deployed artifact, without editing a hard-coded
    path in between.

    Returns
    -------
    model:  a model instance on which predict API can be invoked
    """
    # Local import so this function does not rely on a module-level `import os`.
    import os

    model_dir = os.path.dirname(os.path.realpath(__file__))
    model_path = os.path.join(model_dir, 'my-model')

    print('loading model')
    pm2 = PipelineModel.load(model_path)
    print('done reading model')

    print(pm2.__class__)
    return pm2


def predict(data, model=load_model()) -> dict:
    """
    Returns prediction given the model and data to predict

    Parameters
    ----------
    data: JSON string containing the record(s) to score. Each record must
        provide the fields Pclass, Gender, Age and Fare; they are cast to the
        numeric types used below before scoring.
    model: Model instance returned by load_model API. The default value is
        evaluated once, when this module is imported, so the model is not
        reloaded on every call.

    Returns
    -------
    predictions: Output from scoring server
        Format: { 'prediction': string representation of the list of
                  values in the model's 'prediction' column }

    """
    print('before reading data')
    payload_df = sc.read.json(sc.sparkContext.parallelize([data]))
    # show() and printSchema() write to stdout themselves and return None;
    # wrapping them in print() only added a stray "None" line to the logs.
    payload_df.show()
    payload_df.printSchema()
    print('after reading data')
    features_df = payload_df.select(col('Pclass').cast('float'),
                                    col('Gender').cast('double'),
                                    col('Age').cast('float'),
                                    col('Fare').cast('float')
                                    )
    print('after selecting subset of columns and typecasting')
    results = model.transform(features_df)
    print('after prediction')
    results.show()
    res = results.toPandas()['prediction']
    print("prediction = ", res)
    return { 'prediction': str(list(res)) }

# Testing the model locally 

In [None]:
import sys 
sys.path.insert(0, artifact_dir)

from score import load_model, predict

In [None]:
_ = load_model()

In [None]:
# Build a sample payload: the first row of the test split, serialised as a
# JSON-lines string.
test = test_data.toPandas()
testjson = test.head(1).to_json(orient='records', lines=True)

In [None]:
testjson

In [None]:
# Use predict()'s default model (loaded once when score.py was imported)
# instead of passing `_`, which IPython also uses for the last cell output
# and is therefore fragile to depend on.
res = predict(testjson)
res

In [None]:
# Saving the artifact to the Catalog 

In [None]:
# The project and compartment OCIDs are taken from environment variables
# rather than being hard-coded in the notebook.
ads_artifact.save(
    display_name="my-spark-model",
    description="pipeline object",
    project_id=os.environ['PROJECT_OCID'],
    compartment_id=os.environ['NB_SESSION_COMPARTMENT_OCID'],
)

# Deploy the model through the console 

# Invoke the model 

In [None]:
import requests
import oci
from oci.signer import Signer

# User principal auth: 
#config = oci.config.from_file("~/.oci/config") # replace with the location of your oci config file
#auth = Signer(
#  tenancy=config['tenancy'],
#  user=config['user'],
#  fingerprint=config['fingerprint'],
#  private_key_file_location=config['key_file'],
#  pass_phrase=config['pass_phrase'])

# Resource principal auth:
auth = oci.auth.signers.get_resource_principals_signer()

# Replace with your own model deployment endpoint before running this cell:
endpoint = ''

# Send the sample payload and show the decoded JSON response.
requests.post(
    endpoint,
    json=testjson,
    auth=auth,
).json()

In [None]:
%%time 
requests.post(endpoint, json=testjson, auth=auth).json()