<img src="https://github.com/Microsoft/sqlworkshops/blob/master/graphics/solutions-microsoft-logo-small.png?raw=true" alt="Microsoft">
<br>

# **SQL Server 2019 big data cluster Tutorial**
## **07 - Using Spark For Machine Learning**

In this tutorial you will learn how to train a Spark ML model in a SQL Server big data cluster and export it as an MLeap bundle.

Wide World Importers (WWI) has refrigerated trucks to deliver temperature-sensitive products. These are high-profit, high-expense items. In the past, there have been failures in the cooling systems, and the primary culprit has been the deep-cycle batteries used in the system.

WWI began replacing the batteries every three months as a preventative measure, but this is expensive. Recently, the taxes on recycling batteries have increased dramatically. The CEO has asked the Data Science team to investigate a Predictive Maintenance system that tells the maintenance staff more accurately how long a battery will last, rather than relying on a flat three-month cycle.

The trucks have sensors that transmit data to a file location, and the trips are also logged. In this Jupyter Notebook, you'll create, train, and store a machine learning model using Spark ML, then export it as an MLeap bundle so that it can be deployed to multiple hosts.

> Switch your kernel to PySpark and run `print("hello")` (or any other statement) to start the Spark context.
>
> If it outputs an error like this:
> ```
> The code failed because of a fatal error:
>   Error sending http request and maximum retry encountered..
> ```
> switch to another kernel, switch back to PySpark, and run the cell again.


In [None]:
print("hello")

In [None]:
## train a PySpark model and export it as an MLeap bundle
import os
import numpy as np

# parse command line arguments
import argparse
parser = argparse.ArgumentParser(description = 'train pyspark model and export mleap bundle')
parser.add_argument('hdfs_path', nargs='?', default = "/spark_ml", type = str)
parser.add_argument('model_name_export', nargs='?', default = "battery_life_pipeline.zip", type = str)
args = parser.parse_args()

hdfs_path = args.hdfs_path
model_name_export = args.model_name_export
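The parser above declares two *optional* positional arguments (`nargs='?'`). Each one falls back to its default when it is omitted, so the same code works as a script with or without arguments. The minimal stdlib sketch below shows that behavior. It passes an explicit list to `parse_args` instead of letting it read `sys.argv`, which is only a choice made here so the example behaves the same everywhere. The path `/user/me/spark_ml` and the file name `my_pipeline.zip` are hypothetical.

```python
import argparse

def build_parser():
    # Same two optional positional arguments and defaults as the cell above
    parser = argparse.ArgumentParser(description='train pyspark model and export mleap bundle')
    parser.add_argument('hdfs_path', nargs='?', default="/spark_ml", type=str)
    parser.add_argument('model_name_export', nargs='?', default="battery_life_pipeline.zip", type=str)
    return parser

parser = build_parser()

# No arguments: both positionals fall back to their defaults
defaults = parser.parse_args([])
print(defaults.hdfs_path, defaults.model_name_export)  # /spark_ml battery_life_pipeline.zip

# Explicit arguments, as when run as a script (hypothetical values)
custom = parser.parse_args(["/user/me/spark_ml", "my_pipeline.zip"])
print(custom.hdfs_path, custom.model_name_export)  # /user/me/spark_ml my_pipeline.zip

# Only the first argument: the second keeps its default
partial = parser.parse_args(["/user/me/spark_ml"])
print(partial.model_name_export)  # battery_life_pipeline.zip
```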

In [None]:
# read the data into a Spark DataFrame.
cwd = os.getcwd()
filename = "sensor-data.csv"

## NOTE: reading text file from local file path seems flaky!
#import urllib.request
#url = "https://amldockerdatasets.azureedge.net/" + filename
#local_filename, headers = urllib.request.urlretrieve(url, filename)
#datafile = "file://" + os.path.join(cwd, filename)

data_all = spark.read\
    .options(
        header='true', 
        inferSchema='true', 
        ignoreLeadingWhiteSpace='true', 
        ignoreTrailingWhiteSpace='true')\
    .csv(filename) #.load(datafile) for local file

print("Number of rows: {}, number of columns: {}".format(data_all.count(), len(data_all.columns)))

In [None]:
data_all.printSchema() 

The cells above read the sensor data, which the trucks transmit to a file location, into a Spark DataFrame and print its schema.

After examining the data, the Data Science team selects the columns they believe are most predictive of battery life.

In [None]:
# choose feature columns and the label column for training.
label = data_all.columns[0]
xvars = data_all.columns[2:7]
xvars.extend(data_all.columns[9:73])

print("label: {}, features: {}".format(label, xvars))

# build a new list so that appending the label does not also add it to the feature list
select_cols = xvars + [label]
data = data_all.select(select_cols)
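`DataFrame.columns` returns a plain Python list, so the feature selection is ordinary list slicing. The sketch below uses a hypothetical list of 80 generic column names in place of `data_all.columns`. It shows how many features the slices pick up. It also shows why the label should be added to a *new* list: plain assignment only creates a second name for the same list, so an `append` through that name would leak the label into the features.

```python
# Hypothetical stand-in for data_all.columns: 80 generic column names
columns = ["col{}".format(i) for i in range(80)]

label = columns[0]
xvars = columns[2:7]            # slicing returns a new list: col2..col6
xvars.extend(columns[9:73])     # extend() mutates xvars in place: adds col9..col72
print(len(xvars))               # 69

# Safe: concatenation builds a new list and leaves xvars untouched
select_cols = xvars + [label]
print(len(select_cols), label in xvars)  # 70 False

# Pitfall: assignment aliases the same list object, so appending
# through the alias also changes the original feature list
features = list(xvars)
alias = features
alias.append(label)
print(label in features)        # True: the label would become a feature
```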

In [None]:
## split data into train and test.

train, test = data.randomSplit([0.75, 0.25], seed=123)

print("train ({}, {})".format(train.count(), len(train.columns)))
print("test ({}, {})".format(test.count(), len(test.columns)))

train_data_path = os.path.join(hdfs_path, "sensor-data-train")
test_data_path = os.path.join(hdfs_path, "sensor-data-test")

# write the train and test sets to intermediate storage, then read them back to verify the round trip
train.write.mode('overwrite').orc(train_data_path)
test.write.mode('overwrite').orc(test_data_path)

print("train and test datasets saved to {} and {}".format(train_data_path, test_data_path))

train_read = spark.read.orc(train_data_path)
test_read = spark.read.orc(test_data_path)

assert train_read.schema == train.schema and train_read.count() == train.count() 
assert test_read.schema == test.schema and test_read.count() == test.count()
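`randomSplit([0.75, 0.25], seed=123)` does not promise an exact 75/25 row count. Each row is assigned at random, so the printed train and test sizes only approximate that ratio, and the weights are normalized if they do not sum to 1. The hypothetical `weighted_split` function below is a pure-Python sketch of that idea. It normalizes the weights into cumulative bounds and compares one uniform draw per row against them. It is not Spark's implementation: Spark seeds each partition and uses its own random generator, so this sketch will not reproduce the same rows.

```python
import random
from itertools import accumulate

def weighted_split(rows, weights, seed):
    """Assign each row to one split by comparing a uniform draw with the
    cumulative normalized weights (the idea behind DataFrame.randomSplit)."""
    total = float(sum(weights))
    bounds = list(accumulate(w / total for w in weights))  # e.g. [0.75, 1.0]
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()  # uniform in [0.0, 1.0)
        for i, upper in enumerate(bounds):
            if x < upper:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against float rounding in the last bound
    return splits

train_rows, test_rows = weighted_split(range(10000), [0.75, 0.25], seed=123)
print(len(train_rows), len(test_rows))  # close to 7500 and 2500, not exact
```

Reusing the same seed gives the same split, which is why the notebook fixes `seed=123`.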


The lead Data Scientist believes that a standard regression algorithm will make the best predictions.

In [None]:
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Index the categorical columns: StringIndexer maps each distinct value to a numeric index
str_indexer_1 = StringIndexer(inputCol="Region", outputCol="Reg")
str_indexer_2 = StringIndexer(inputCol="Manufacture_Year", outputCol="Manu_Year")

# Add the indexed columns to the feature list
xvars.append("Reg")
xvars.append("Manu_Year")

# Drop the original columns, which the indexed columns replace
xvars.remove("Region")
xvars.remove("Manufacture_Year")

# Assemble the feature columns into a single vector column named "features";
# handleInvalid="skip" drops rows that contain null or NaN feature values
assembler = VectorAssembler(inputCols=xvars, outputCol="features", handleInvalid="skip")

# Create a Gradient Boosted Tree regression model; it reads the "features" column by default,
# and labelCol points it at the label column chosen earlier
gbt = GBTRegressor(labelCol=label)

# Put together the pipeline
stages = []
stages.append(str_indexer_1)
stages.append(str_indexer_2)
stages.append(assembler)
stages.append(gbt)

pipe = Pipeline(stages=stages)
print("Pipeline created")

# Train the model.
model = pipe.fit(train_read)
print("Model Trained")
print("Model is ", model)
print("Model Stages", model.stages)
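The two `StringIndexer` stages turn `Region` and `Manufacture_Year` into numeric indices. With the default ordering (`stringOrderType="frequencyDesc"`), the most frequent value gets index `0.0`. Numeric input such as a year is cast to a string first. The pure-Python sketch below mimics that mapping with a hypothetical `fit_string_indexer` function and made-up region values. Breaking frequency ties alphabetically is an assumption that matches recent Spark versions and may differ in older ones.

```python
from collections import Counter

def fit_string_indexer(values):
    """Map each distinct value to a float index: most frequent first,
    ties broken alphabetically (assumed to mirror frequencyDesc ordering)."""
    counts = Counter(str(v) for v in values)  # numeric input is indexed as strings
    ordered = sorted(counts, key=lambda s: (-counts[s], s))
    return {s: float(i) for i, s in enumerate(ordered)}

# Hypothetical region values; the real data may use different labels
regions = ["West", "East", "West", "North", "East", "West"]
mapping = fit_string_indexer(regions)
print(mapping)                        # {'West': 0.0, 'East': 1.0, 'North': 2.0}
print([mapping[r] for r in regions])  # [0.0, 1.0, 0.0, 2.0, 1.0, 0.0]
```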

In [None]:
# Make predictions for every row of the test set
pred = model.transform(test_read)

With the model trained, evaluate it against the labeled test data.

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol=label)
print("Root mean squared error (RMSE) on test data = %g" % evaluator.evaluate(pred))
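`RegressionEvaluator` reports RMSE by default (`metricName="rmse"`). RMSE is the square root of the mean of the squared differences between label and prediction, sqrt((1/n) Σ (yᵢ − ŷᵢ)²), and it is expressed in the same units as the label. A small pure-Python version follows. It uses made-up numbers for illustration only, not values from the sensor data.

```python
import math

def rmse(labels, predictions):
    """Root mean squared error between two equal-length sequences."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("need two non-empty sequences of equal length")
    squared = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared) / len(squared))

# Every prediction is off by 3, so the RMSE is exactly 3
print(rmse([10.0, 20.0, 30.0, 40.0], [13.0, 17.0, 33.0, 37.0]))  # 3.0
```

Because errors are squared before averaging, a few large misses raise RMSE more than many small ones.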

Calculate the average predicted value.

In [None]:
pred.select("prediction").agg({"prediction": "avg"}).show()

In [None]:
## save and load the model with ML persistence
# https://spark.apache.org/docs/latest/ml-pipeline.html#ml-persistence-saving-and-loading-pipelines

##NOTE: by default the model is saved to and loaded from hdfs
model_name = "battery_life.mml"
model_fs = os.path.join(hdfs_path, model_name)

model.write().overwrite().save(model_fs)
print("saved model to {}".format(model_fs))

# load the model file (from hdfs)
print("load pyspark model from hdfs")
model_loaded = PipelineModel.load(model_fs)
assert str(model_loaded) == str(model)

print("loaded model from {}".format(model_fs))
print("Model is ", model_loaded)
print("Model stages", model_loaded.stages)

In [None]:
## export and import model with mleap

import mleap.pyspark
from mleap.pyspark.spark_support import SimpleSparkSerializer

# serialize the model to a local zip file in JSON format
model_name_path = cwd
model_file = os.path.join("/root", model_name_export)

# remove an old model file, if needed.
if os.path.isfile(model_file):
    os.remove(model_file)

model_file_path = "jar:file:{}".format(model_file)
model.serializeToBundle(model_file_path, model.transform(train))

## import mleap model
model_deserialized = PipelineModel.deserializeFromBundle(model_file_path)
assert str(model_deserialized) == str(model)

print("The deserialized model is ", model_deserialized)
print("The deserialized model stages are", model_deserialized.stages)
print("The deserialized model is in", model_file_path)
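The export cell builds the bundle location in two steps: it deletes any bundle left over from an earlier run, then prefixes the absolute local path with `jar:file:`. The hypothetical helper `prepare_bundle_uri` below isolates those steps in plain Python. Two points are assumptions, not documented behavior: that MLeap needs an absolute path in this URI, and that the old file is deleted because serialization will not overwrite it. A temporary directory stands in for `/root`.

```python
import os
import tempfile

def prepare_bundle_uri(local_path):
    """Hypothetical helper mirroring the export cell: remove a stale
    bundle file, then return a jar:file: URI for its absolute path."""
    abs_path = os.path.abspath(local_path)
    if os.path.isfile(abs_path):
        os.remove(abs_path)  # the notebook deletes an old bundle before re-serializing
    return "jar:file:{}".format(abs_path)

with tempfile.TemporaryDirectory() as tmp:
    bundle = os.path.join(tmp, "battery_life_pipeline.zip")
    open(bundle, "w").close()      # simulate a bundle left by an earlier run
    uri = prepare_bundle_uri(bundle)
    print(uri)                     # jar:file:<temporary dir>/battery_life_pipeline.zip
    stale_removed = not os.path.exists(bundle)
    print(stale_removed)           # True
```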

## **Next Steps: Continue with deploying the model to SQL Server big data cluster**
Now you're ready to open the next Jupyter Notebook in this tutorial series, `bdc_tutorial_08.ipynb`, to learn how to export a Spark machine learning model and deploy it to SQL Server big data cluster.