In [None]:
from pyspark.sql import SparkSession, functions as F
from pyspark.ml import Pipeline

In [None]:
# Reuse the active Spark session if one exists, otherwise start a new one
# registered under the application name 'app'.
spark = (
    SparkSession.builder
    .appName('app')
    .getOrCreate()
)

## Create SparkML Model

In [None]:
# Read the Boston housing CSV (first row is the header, column types are
# inferred) and rename the target column so ground truth is clearly
# distinguishable from the prediction column added later.
data = spark.read.csv(
    './data/boston_housing.csv',
    header=True,
    inferSchema=True,
).withColumnRenamed("medv", "medv_gt")

# Show the inferred schema, then display the row count as the cell output.
data.printSchema()
data.count()

In [None]:
# Fix the seed so the 70/30 train/test split is reproducible across kernel
# restarts; without it every "Restart & Run All" fits and scores different rows.
train, test = data.randomSplit([0.7, 0.3], seed=42)

In [None]:
from pyspark.ml.feature import VectorAssembler

# Pick the feature columns by name (everything except the renamed target)
# instead of slicing off the last column: a positional slice would leak the
# label into the features, or drop a real feature, if the CSV column order
# ever differs from "target last".
feature_columns = [column for column in data.columns if column != "medv_gt"]

# Pack the individual feature columns into the single vector column that the
# regression stage reads.
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

In [None]:
from pyspark.ml.regression import LinearRegression

# Elastic-net regularised linear regression that reads the assembled
# "features" vector, trains against "medv_gt" and writes "medv_pred".
algo = LinearRegression(
    featuresCol="features",
    labelCol="medv_gt",
    predictionCol="medv_pred",
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)

## Create and Save Model Pipeline

In [None]:
# Chain feature assembly and the regressor into one pipeline, then fit it on
# the training split.
pipeline = Pipeline(stages=[assembler, algo])
fitted_pipeline = pipeline.fit(train)

# Sanity check: preview the first few predictions on the held-out split.
(
    fitted_pipeline
    .transform(test)
    .select('medv_pred')
    .show(5)
)

In [None]:
# Persist the fitted pipeline to disk, replacing any previously saved copy.
(
    fitted_pipeline
    .write()
    .overwrite()
    .save('./data/models/boston_housing_spark_model_pipeline')
)

In [None]:
from pyspark.ml import PipelineModel

# Reload the pipeline from disk so the predictions below come from the saved
# artifact rather than from the in-memory fitted object.
loaded_model_pipeline = PipelineModel.load("./data/models/boston_housing_spark_model_pipeline")

# Score the *training* split (the original scored `test` here by mistake).
# This frame is later passed to model.build and enable_explainability, so it
# must not contain the held-out rows. The assembled "features" vector column is
# dropped, leaving the original columns plus the prediction.
predicted_train_data = loaded_model_pipeline.transform(train).drop("features")

# Score the held-out split; these rows are later sent as the inference batch.
predicted_test_data = loaded_model_pipeline.transform(test).drop("features")
predicted_test_data.select('medv_pred').show(5)

## On-board to Arthur

In [None]:
from arthurai import ArthurAI
from arthurai.common.constants import InputType, OutputType, Stage, ValueType
from numpy.random import randint

In [None]:
# No credentials are passed explicitly: the client picks them up from the
# ARTHUR_API_KEY and ARTHUR_ENDPOINT_URL environment variables.
connection = ArthurAI()

In [None]:
# `datetime` is used below to timestamp the model ID but is not imported by any
# earlier cell shown here; import it in this cell so it runs on a fresh kernel
# instead of raising NameError.
from datetime import datetime

# Initialize the model with overall metadata about its input and output types.
# The timestamp suffix gives each run of this notebook its own partner_model_id.
MODEL_METADATA = {
    "partner_model_id": f"SparkBostonHousingModel_FG-{datetime.now().strftime('%Y%m%d%H%M%S')}",
    "description": "Spark Boston Housing Model",
    "input_type": InputType.Tabular,
    "model_type": OutputType.Regression,
    "tags": ['Spark'],
    "is_batch": True
}

model = connection.model(**MODEL_METADATA)

In [None]:
# Tell Arthur which ground-truth column each prediction column pairs with,
# then build the model from the scored training frame converted to pandas.
pred_to_ground_truth_map = {
    "medv_pred": "medv_gt",
}
model.build(
    predicted_train_data.toPandas(),
    pred_to_ground_truth_map,
)

In [None]:
# chas and rad are categorical; print the categories inferred for each so they
# can be checked before the model is saved.
for categorical_name in ('chas', 'rad'):
    print(model.get_attribute(categorical_name).categories)

In [None]:
# Save the model, then write the returned ID to a local text file so it can be
# read back later.
model_id = model.save()

with open("fullguide_model_id.txt", "w") as id_file:
    id_file.write(model_id)

In [None]:
# You can fetch a model by ID. For example, to pull the last-created model:
# with open("fullguide_model_id.txt", "r") as f:
#     model_id = f.read()
# arthur_model = connection.get_model(model_id)

### Enabling Explainability

In [None]:
# When using a Spark model, be sure to allocate at least 2 CPUs to the model
# server. This can scale as you change the configuration of the Spark session
# in your entrypoint script.
import os

# The project directory is the notebook's current working directory; the
# prediction function is imported from the 'entrypoint' module.
model.enable_explainability(
    df=predicted_train_data.toPandas(),
    project_directory=os.path.abspath(""),
    user_predict_function_import_path='entrypoint',
    streaming_explainability_enabled=False,
    requirements_file='requirements.txt',
)

## Send an inference batch:

In [None]:
import uuid
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Ground truth is matched to inferences through an external ID, so every row
# gets a timestamp and a "partner_inference_id" before the frame is split into
# an inference frame and a ground-truth frame.
# NOTE(review): the ID is a random double cast to string, which is not
# guaranteed to be unique per row - confirm this is acceptable.
predicted_test_data = predicted_test_data.withColumn(
    'inference_timestamp', F.current_timestamp()
).withColumn(
    "partner_inference_id", F.rand().cast(StringType())
)
predicted_test_data.show(5)

In [None]:
# Build the inference frame: the model's pipeline-input attributes plus the
# prediction, the external ID and the inference timestamp.
pipeline_input_attr_names = [
    input_attr.name
    for input_attr in model.get_attributes(Stage.ModelPipelineInput)
]
columns_to_select = [
    *pipeline_input_attr_names,
    'medv_pred',
    'partner_inference_id',
    'inference_timestamp',
]
batch_inferences = predicted_test_data.select(columns_to_select)

In [None]:
# Build the ground-truth frame: the true target and the external ID, stamped
# with the time the ground truth is recorded.
columns_to_select = ['medv_gt', 'partner_inference_id']
ground_truth_batch = (
    predicted_test_data
    .select(columns_to_select)
    .withColumn('ground_truth_timestamp', F.current_timestamp())
)

In [None]:
# Write the inference and ground-truth frames as parquet into separate
# directories, replacing any output left by a previous run.
(
    batch_inferences.write
    .mode('overwrite')
    .parquet("./data/batch_inference_files/batch_inferences.parquet")
)
(
    ground_truth_batch.write
    .mode('overwrite')
    .parquet("./data/batch_ground_truth_files/ground_truth.parquet")
)

In [None]:
# Upload the contents of the inference directory to Arthur as batch "batch1".
model.send_bulk_inferences(
    directory_path='./data/batch_inference_files/',
    batch_id="batch1",
)

In [None]:
# Upload the contents of the ground-truth directory to Arthur.
model.send_bulk_ground_truths(
    directory_path='./data/batch_ground_truth_files/',
)