#### Using MLflow in Databricks
We will use a dataset from [Kaggle](https://www.kaggle.com/datasets/aungpyaeap/fish-market/download?datasetVersionNumber=2) to predict the weights of fish with a linear regression model. With this model, we will learn how to track and register a machine learning model in Databricks.

In [0]:
# Install the mlflow library if it's not already installed on your compute. 
# %pip install mlflow

Python interpreter will be restarted.


In [0]:
import requests
import pyspark.sql.functions as F
import mlflow

In [0]:
# load data
url = "https://raw.githubusercontent.com/bridg3r/guide_template/main/Fish.csv"
r = requests.get(url)

# Split the content into lines and convert it into an RDD
rdd = spark.sparkContext.parallelize(r.text.splitlines())
df = spark.read.csv(rdd, header=True)

display(df)

Species,Weight,Length1,Length2,Length3,Height,Width
Bream,242.0,23.2,25.4,30.0,11.52,4.02
Bream,290.0,24.0,26.3,31.2,12.48,4.3056
Bream,340.0,23.9,26.5,31.1,12.3778,4.6961
Bream,363.0,26.3,29.0,33.5,12.73,4.4555
Bream,430.0,26.5,29.0,34.0,12.444,5.134
Bream,450.0,26.8,29.7,34.7,13.6024,4.9274
Bream,500.0,26.8,29.7,34.5,14.1795,5.2785
Bream,390.0,27.6,30.0,35.0,12.67,4.69
Bream,450.0,27.6,30.0,35.1,14.0049,4.8438
Bream,500.0,28.5,30.7,36.2,14.2266,4.9594


#### Preparing the Data
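To prepare the data for the model, we need to one-hot encode the 'Species' column and cast every column to a numeric type. Here we cast to integers, which truncates the decimal part of the measurements (for example, 26.3 becomes 26). Then we will combine all of our features into a single vector column.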

In [0]:
# one hot encode the species column
# define the values in the species column
distinct_species = ['Perch', 'Bream', 'Roach', 'Pike', 'Smelt', 'Parkki', 'Whitefish']

# for each species, create a column that is '1' when the row matches and '0' otherwise
for value in distinct_species:
    df = df.withColumn(f"Species_{value}", F.expr(f"CASE WHEN Species = '{value}' THEN '1' ELSE '0' END"))

# drop the original Species column
df = df.drop('Species')

# df.printSchema() shows that every column is still a string,
# so cast each column to integer (this truncates decimal values)
for column in df.columns:
    df = df.withColumn(column, F.col(column).cast("integer"))

display(df)

Weight,Length1,Length2,Length3,Height,Width,Species_Perch,Species_Bream,Species_Roach,Species_Pike,Species_Smelt,Species_Parkki,Species_Whitefish
242,23,25,30,11,4,0,1,0,0,0,0,0
290,24,26,31,12,4,0,1,0,0,0,0,0
340,23,26,31,12,4,0,1,0,0,0,0,0
363,26,29,33,12,4,0,1,0,0,0,0,0
430,26,29,34,12,5,0,1,0,0,0,0,0
450,26,29,34,13,4,0,1,0,0,0,0,0
500,26,29,34,14,5,0,1,0,0,0,0,0
390,27,30,35,12,4,0,1,0,0,0,0,0
450,27,30,35,14,4,0,1,0,0,0,0,0
500,28,30,36,14,4,0,1,0,0,0,0,0
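
The truncating cast is easy to miss, so here is a minimal plain-Python sketch of what the cell above does to a single row, without Spark. The helper name `encode_row` is hypothetical, and `int(float(...))` stands in for Spark's `cast("integer")` on the assumption, consistent with the output above, that decimal strings are truncated.

```python
# Species values, in the same order as distinct_species in the notebook.
DISTINCT_SPECIES = ['Perch', 'Bream', 'Roach', 'Pike', 'Smelt', 'Parkki', 'Whitefish']
NUMERIC_COLUMNS = ['Weight', 'Length1', 'Length2', 'Length3', 'Height', 'Width']


def encode_row(row):
    """One-hot encode 'Species' and truncate numeric strings to integers.

    Hypothetical helper for illustration only; it is not part of the notebook.
    """
    encoded = {}
    for column in NUMERIC_COLUMNS:
        # int(float(...)) drops the decimal part, e.g. "26.3" -> 26
        encoded[column] = int(float(row[column]))
    for species in DISTINCT_SPECIES:
        encoded[f"Species_{species}"] = 1 if row['Species'] == species else 0
    return encoded


# Second row of the raw data shown earlier.
raw_row = {
    'Species': 'Bream', 'Weight': '290.0', 'Length1': '24.0', 'Length2': '26.3',
    'Length3': '31.2', 'Height': '12.48', 'Width': '4.3056',
}
encoded_row = encode_row(raw_row)
print(encoded_row)
```

If the decimal precision matters for your model, casting to "double" instead of "integer" in the notebook cell would keep it.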


In [0]:
# all features should be in one vector column
from pyspark.ml.feature import VectorAssembler

# define which cols are features and what the output column should be
vectorAssembler = VectorAssembler(inputCols = ['Length1', 'Length2', 'Length3', 'Height', 'Width', 'Species_Perch', 'Species_Bream', 'Species_Roach', 'Species_Pike', 'Species_Smelt', 'Species_Parkki', 'Species_Whitefish'], outputCol = 'features')

# apply to df
vdf = vectorAssembler.transform(df)

# select the features column and the weight column
vdf = vdf.select(['features', 'Weight'])

display(vdf)

features,Weight
"Map(vectorType -> sparse, length -> 12, indices -> List(0, 1, 2, 3, 4, 6), values -> List(23.0, 25.0, 30.0, 11.0, 4.0, 1.0))",242
"Map(vectorType -> sparse, length -> 12, indices -> List(0, 1, 2, 3, 4, 6), values -> List(24.0, 26.0, 31.0, 12.0, 4.0, 1.0))",290
"Map(vectorType -> sparse, length -> 12, indices -> List(0, 1, 2, 3, 4, 6), values -> List(23.0, 26.0, 31.0, 12.0, 4.0, 1.0))",340
"Map(vectorType -> sparse, length -> 12, indices -> List(0, 1, 2, 3, 4, 6), values -> List(26.0, 29.0, 33.0, 12.0, 4.0, 1.0))",363
"Map(vectorType -> sparse, length -> 12, indices -> List(0, 1, 2, 3, 4, 6), values -> List(26.0, 29.0, 34.0, 12.0, 5.0, 1.0))",430
"Map(vectorType -> sparse, length -> 12, indices -> List(0, 1, 2, 3, 4, 6), values -> List(26.0, 29.0, 34.0, 13.0, 4.0, 1.0))",450
"Map(vectorType -> sparse, length -> 12, indices -> List(0, 1, 2, 3, 4, 6), values -> List(26.0, 29.0, 34.0, 14.0, 5.0, 1.0))",500
"Map(vectorType -> sparse, length -> 12, indices -> List(0, 1, 2, 3, 4, 6), values -> List(27.0, 30.0, 35.0, 12.0, 4.0, 1.0))",390
"Map(vectorType -> sparse, length -> 12, indices -> List(0, 1, 2, 3, 4, 6), values -> List(27.0, 30.0, 35.0, 14.0, 4.0, 1.0))",450
"Map(vectorType -> sparse, length -> 12, indices -> List(0, 1, 2, 3, 4, 6), values -> List(28.0, 30.0, 36.0, 14.0, 4.0, 1.0))",500
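
The features column is displayed as a sparse vector: only the non-zero positions (indices) and their values are stored. As a rough illustration, the plain-Python sketch below expands the first row back into a dense list and labels each position with its input column. The helper name `sparse_to_dense` is hypothetical; the column order is taken from the inputCols list above.

```python
# Same order as inputCols in the VectorAssembler cell.
FEATURE_COLUMNS = [
    'Length1', 'Length2', 'Length3', 'Height', 'Width',
    'Species_Perch', 'Species_Bream', 'Species_Roach', 'Species_Pike',
    'Species_Smelt', 'Species_Parkki', 'Species_Whitefish',
]


def sparse_to_dense(length, indices, values):
    """Expand a sparse (indices, values) pair into a dense list of floats.

    Hypothetical helper for illustration; positions not listed stay 0.0.
    """
    dense = [0.0] * length
    for index, value in zip(indices, values):
        dense[index] = value
    return dense


# First row of the output above.
dense = sparse_to_dense(12, [0, 1, 2, 3, 4, 6], [23.0, 25.0, 30.0, 11.0, 4.0, 1.0])
named = dict(zip(FEATURE_COLUMNS, dense))
print(named)
```

Index 6 maps to Species_Bream, which is why every Bream row lists 6 as its last index.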


#### MLflow Autolog for Tracking
The logging function mlflow.autolog() enables easy tracking of pyspark.ml models in Databricks. Since we are using pyspark.ml, the code in Cmd 9 connects your model to the MLflow experiments tab on the right panel.

After running Cmd 9 and Cmd 10, a green dot appears on the MLflow experiments tab. Click on the tab and look at the features that show up. As the model runs, you can expand the orange square to see the parameters that were set and expand the blue square to see the metrics that we defined (it may take a moment for the metrics to show up). This is MLflow tracking: each run records its parameters and metrics. Adjust any of the parameters and rerun the model. A new run will show up in the experiments tab, where you can see the new parameters and metrics.

If you are using a different model library such as sklearn, the logging function will be different, for example mlflow.sklearn.autolog(). [This link](https://mlflow.org/docs/latest/tracking.html#automatic-logging) shows the logging functions you'll need for different libraries.

In [0]:
mlflow.autolog()

2023/06/18 02:49:21 INFO mlflow._spark_autologging: Autologging successfully enabled for spark.
2023/06/18 02:49:21 INFO mlflow.tracking.fluent: Autologging successfully enabled for pyspark.
2023/06/18 02:49:21 INFO mlflow.tracking.fluent: Autologging successfully enabled for pyspark.ml.


#### The Model
We are using a linear regression model from pyspark.ml to predict the weights of fish.

In [0]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# split into training and testing data
train_df, test_df = vdf.randomSplit([0.7, 0.3])

# parameters
maxIter = 15
regParam = 0.2
elasticNetParam = 0.9

# model
lr = LinearRegression(featuresCol = 'features', labelCol='Weight', maxIter = maxIter, regParam = regParam, elasticNetParam = elasticNetParam)
lr_model = lr.fit(train_df)
lr_predictions = lr_model.transform(test_df)

# with pyspark.ml, metrics must be defined explicitly with an evaluator; sklearn autologging calculates them automatically
r2_metric = RegressionEvaluator(predictionCol="prediction",labelCol="Weight",metricName="r2")
r2_metric.evaluate(lr_predictions)
rmse_metric = RegressionEvaluator(predictionCol="prediction",labelCol="Weight",metricName="rmse")
rmse_metric.evaluate(lr_predictions)

2023/06/18 02:49:22 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID 'e91bafe2b9f24cfeb8e0c549248e83b5', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow
2023/06/18 02:49:49 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().
Out[6]: 95.79737536112022
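
Only the value of the last expression in a cell is displayed, so Out[6] is the RMSE; the R² value is computed but not shown. As a minimal sketch of what the two evaluators measure, the plain-Python functions below compute both metrics from labels and predictions. The function names are hypothetical, and the four sample rows are rounded from the first rows of the prediction output shown further down, so the results will not match Out[6], which covers the whole test set.

```python
import math


def rmse(labels, predictions):
    """Root mean squared error: square root of the mean squared residual."""
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


def r2(labels, predictions):
    """Coefficient of determination: 1 - (residual sum of squares / total sum of squares)."""
    mean_label = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    return 1 - ss_res / ss_tot


# Illustrative sample only: four (Weight, prediction) pairs, rounded.
labels = [500, 620, 680, 720]
predictions = [475.78, 662.14, 694.07, 719.43]
print("rmse:", rmse(labels, predictions))
print("r2:", r2(labels, predictions))
```

A lower RMSE and an R² closer to 1 both indicate predictions that sit closer to the true weights.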

In [0]:
display(lr_predictions)

features,Weight,prediction
"Map(vectorType -> sparse, length -> 12, indices -> List(0, 1, 2, 3, 4, 6), values -> List(26.0, 29.0, 34.0, 14.0, 5.0, 1.0))",500,475.77720752861535
"Map(vectorType -> sparse, length -> 12, indices -> List(0, 1, 2, 3, 4, 6), values -> List(31.0, 34.0, 39.0, 15.0, 5.0, 1.0))",620,662.1406227269835
"Map(vectorType -> sparse, length -> 12, indices -> List(0, 1, 2, 3, 4, 6), values -> List(31.0, 35.0, 40.0, 15.0, 6.0, 1.0))",680,694.0666852170376
"Map(vectorType -> sparse, length -> 12, indices -> List(0, 1, 2, 3, 4, 6), values -> List(32.0, 35.0, 40.0, 16.0, 6.0, 1.0))",720,719.4333175596445
"Map(vectorType -> sparse, length -> 12, indices -> List(0, 1, 2, 3, 4, 7), values -> List(12.0, 14.0, 16.0, 4.0, 2.0, 1.0))",40,-179.19058619988857
"Map(vectorType -> sparse, length -> 12, indices -> List(0, 1, 2, 3, 4, 7), values -> List(18.0, 19.0, 22.0, 5.0, 3.0, 1.0))",87,40.26570719979816
"Map(vectorType -> sparse, length -> 12, indices -> List(0, 1, 2, 3, 4, 7), values -> List(19.0, 21.0, 23.0, 6.0, 3.0, 1.0))",120,99.18650037238888
"Map(vectorType -> sparse, length -> 12, indices -> List(0, 1, 2, 3, 4, 7), values -> List(20.0, 22.0, 24.0, 5.0, 3.0, 1.0))",150,121.14894442151147
"Map(vectorType -> sparse, length -> 12, indices -> List(0, 1, 2, 3, 4, 7), values -> List(20.0, 22.0, 24.0, 6.0, 3.0, 1.0))",145,133.9074769136373
"Map(vectorType -> sparse, length -> 12, indices -> List(0, 1, 2, 3, 4, 7), values -> List(20.0, 22.0, 25.0, 7.0, 3.0, 1.0))",160,157.33760195731406


#### Registering Models

Follow along as we learn how to register our models. Registering your model will produce an ID that you can share with anyone. The IDs of the two best models will be sent in Slack. Then uncomment the code below, set logged_model to the model ID (a URI of the form runs:/<run_id>/model, as in the log output below), and run the code.

In [0]:
# import mlflow
# logged_model = ''

# # Load model
# loaded_model = mlflow.spark.load_model(logged_model)

# # Perform inference via model.transform(); the result is a DataFrame of predictions
# new_predictions = loaded_model.transform(test_df)

# display(new_predictions)

2023/06/18 02:51:09 INFO mlflow.spark: 'runs:/f3b6b7e7ab29494fba4049f23db707ce/model' resolved as 'dbfs:/databricks/mlflow-tracking/3381365783933738/f3b6b7e7ab29494fba4049f23db707ce/artifacts/model'
Out[8]: DataFrame[features: vector, Weight: int, prediction: double]
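
The log line above shows that the value assigned to logged_model is a URI built from a run ID, which MLflow then resolves to a storage path. A minimal sketch of that format follows; the helper name `build_model_uri` is hypothetical, and the "model" artifact path is an assumption taken from the log line rather than a fixed rule.

```python
def build_model_uri(run_id, artifact_path="model"):
    """Build a 'runs:/<run_id>/<artifact_path>' URI string.

    Hypothetical helper for illustration; it only formats a string
    and does not contact MLflow.
    """
    return f"runs:/{run_id}/{artifact_path}"


# Run ID copied from the log output above.
logged_model = build_model_uri("f3b6b7e7ab29494fba4049f23db707ce")
print(logged_model)
```

The resulting string is what gets passed to mlflow.spark.load_model() in the cell above.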