## Kaggle Titanic Dataset: Machine Learning using PySpark, MLflow ,Azure Databricks and Microsoft ML

<a href="https://www.kaggle.com/c/titanic/">Titanic Dataset on Kaggle</a>

* Data Engineering & Analysis
  - Extracting Data
  - Exploratory Analysis
  - Cleaning Data
* Data Science
  - ML Workflow
  - Training Models
  - Tuning Model Parameters
  - Model Registration
* Deployment
  - Overview of Model Serving Options
  - Serving in Batch and Streams
  - Serving via Web Service

In [0]:
from pyspark.sql.functions import mean,col,split, col, regexp_extract, when, lit
# from pyspark.ml.feature import StringIndexer
# from pyspark.ml.feature import VectorAssembler
# from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# from pyspark.ml.feature import QuantileDiscretizer


In [0]:
# spark = SparkSession \
#     .builder \
#     .appName("Spark ML example on titanic data ") \
#     .getOrCreate()

#### Reading the Training File -> Read data from CSV using PySpark

In [0]:
titanicDF = spark.read.csv("dbfs:/FileStore/tables/titanic/train.csv",header = 'True',inferSchema='True')

In [0]:
display(titanicDF)

<h3>Data Dictionary</h3>
<table style="width: 100%;">
<tbody>
<tr><th><b>Variable</b></th><th><b>Definition</b></th><th><b>Key</b></th></tr>
<tr>
<td>survival</td>
<td>Survival</td>
<td>0 = No, 1 = Yes</td>
</tr>
<tr>
<td>pclass</td>
<td>Ticket class</td>
<td>1 = 1st, 2 = 2nd, 3 = 3rd</td>
</tr>
<tr>
<td>sex</td>
<td>Sex</td>
<td></td>
</tr>
<tr>
<td>Age</td>
<td>Age in years</td>
<td></td>
</tr>
<tr>
<td>sibsp</td>
<td># of siblings / spouses aboard the Titanic</td>
<td></td>
</tr>
<tr>
<td>parch</td>
<td># of parents / children aboard the Titanic</td>
<td></td>
</tr>
<tr>
<td>ticket</td>
<td>Ticket number</td>
<td></td>
</tr>
<tr>
<td>fare</td>
<td>Passenger fare</td>
<td></td>
</tr>
<tr>
<td>cabin</td>
<td>Cabin number</td>
<td></td>
</tr>
<tr>
<td>embarked</td>
<td>Port of Embarkation</td>
<td>C = Cherbourg, Q = Queenstown, S = Southampton</td>
</tr>
</tbody>
</table>

<h3>Variable Notes</h3>
<p><b>pclass</b>: A proxy for socio-economic status (SES)<br> 1st = Upper<br> 2nd = Middle<br> 3rd = Lower<br><br> <b>age</b>: Age is fractional if less than 1. If the age is estimated, is it in the form of xx.5<br><br> <b>sibsp</b>: The dataset defines family relations in this way...<br> Sibling = brother, sister, stepbrother, stepsister<br> Spouse = husband, wife (mistresses and fiancés were ignored)<br><br> <b>parch</b>: The dataset defines family relations in this way...<br> Parent = mother, father<br> Child = daughter, son, stepdaughter, stepson<br> Some children travelled only with a nanny, therefore parch=0 for them.</p>

Alternatively, we can register the DataFrame as a SQL table and run SQL commands against it

In [0]:
titanicDF.write.mode("overwrite").format("delta").saveAsTable("titanic")

### Summary of Data

In [0]:
display(titanicDF.describe())

### Exploratory Data Analysis

In [0]:
titanicDF.select("Survived","Pclass","Embarked").show()


In [0]:
display(titanicDF.groupBy('Survived').count())

In [0]:
display(titanicDF.groupBy('Survived', 'Sex').count())

In [0]:
titanicDF.groupBy("Sex","Survived").count().show()

In [0]:
titanicDF.groupBy("Pclass","Survived").count().show()

# Cleaning Data

In [0]:
#Checking Null values
# This function use to print feature with null values and null count 
def null_value_count(df):
  null_columns_counts = []
  numRows = df.count()
  for k in df.columns:
    nullRows = df.where(col(k).isNull()).count()
    if(nullRows > 0):
      temp = k,nullRows
      null_columns_counts.append(temp)
  return(null_columns_counts)

In [0]:
# Calling function
null_columns_count_list = null_value_count(titanicDF)


In [0]:
spark.createDataFrame(null_columns_count_list, ['Column_With_Null_Value', 'Null_Values_Count']).show()

In [0]:
mean_age = titanicDF.select(mean('Age')).collect()[0][0]
print(mean_age)

In [0]:
titanicDF.select("Name").show()

### Renaming Columns

In [0]:
#To replace these NaN values, we can assign them the mean age of the dataset.But the problem is, there were many people with many different ages. We #just cant assign a 4 year kid with the mean age that is 29 years.
#Using the Regex ""[A-Za-z]+)." we extract the initials from the Name. It looks for strings which lie between A-Z or a-z and followed by a .(dot).
titanicDF = titanicDF.withColumn("Initial",regexp_extract(col("Name"),"([A-Za-z]+)\.",1))


In [0]:
titanicDF.show()

In [0]:
titanicDF.select("Initial").distinct().show()


In [0]:
#There are some misspelled Initials like Mlle or Mme that stand for Miss. I will replace them with Miss and same thing for other values.
titanicDF = titanicDF.replace(['Mlle','Mme', 'Ms', 'Dr','Major','Lady','Countess','Jonkheer','Col','Rev','Capt','Sir','Don'],
               ['Miss','Miss','Miss','Mr','Mr',  'Mrs',  'Mrs',  'Other',  'Other','Other','Mr','Mr','Mr'])


In [0]:
#Let's check average age by initials
titanicDF.groupby('Initial').avg('Age').collect()

In [0]:
#Let's impute missing values in age feature based on average age of Initials
titanicDF = titanicDF.withColumn("Age",when((titanicDF["Initial"] == "Miss") & (titanicDF["Age"].isNull()), 22).otherwise(titanicDF["Age"]))
titanicDF = titanicDF.withColumn("Age",when((titanicDF["Initial"] == "Other") & (titanicDF["Age"].isNull()), 46).otherwise(titanicDF["Age"]))
titanicDF = titanicDF.withColumn("Age",when((titanicDF["Initial"] == "Master") & (titanicDF["Age"].isNull()), 5).otherwise(titanicDF["Age"]))
titanicDF = titanicDF.withColumn("Age",when((titanicDF["Initial"] == "Mr") & (titanicDF["Age"].isNull()), 33).otherwise(titanicDF["Age"]))
titanicDF = titanicDF.withColumn("Age",when((titanicDF["Initial"] == "Mrs") & (titanicDF["Age"].isNull()), 36).otherwise(titanicDF["Age"]))

In [0]:
#Check the imputation
titanicDF.select("Age").show()

In [0]:
#Embarked feature has only two missing values. Let's check values within Embarked
titanicDF.groupBy("Embarked").count().show()

In [0]:
titanicDF = titanicDF.na.fill({"Embarked" : 'S'})

In [0]:
titanicDF.groupBy("Embarked").count().show()

In [0]:
#We can drop Cabin features as it has lots of null values
titanicDF = titanicDF.drop("Cabin")

In [0]:
titanicDF.printSchema()

In [0]:
titanicDF.show(10)

In [0]:
#We can create a new feature called "Family_size" and "Alone" and analyse it. This feature is the summation of Parch(parents/children) and #SibSp(siblings/spouses). It gives us a combined data so that we can check if survival rate have anything to do with family size of the passengers
titanicDF = titanicDF.withColumn("Family_Size",col('SibSp')+col('Parch'))

In [0]:
titanicDF.show(10)

In [0]:
titanicDF.groupBy("Family_Size").count().show()


In [0]:
titanicDF = titanicDF.withColumn('Alone',lit(0))


In [0]:
titanicDF.show(50)

In [0]:
titanicDF = titanicDF.withColumn("Alone",when(titanicDF["Family_Size"] == 0, 1).otherwise(titanicDF["Alone"]))

In [0]:
titanicDF.show(50)

In [0]:
titanicDF.columns

### Convert Categorical Columns to Numerical
The [StringIndexer](https://spark.apache.org/docs/latest/ml-features.html#stringindexer) function convert a string column to an index column

In [0]:
#Lets convert Sex, Embarked & Initial columns from string to number using StringIndexer
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(titanicDF) for column in ["Sex","Embarked","Initial"]]
pipeline = Pipeline(stages=indexers)
titanicDF = pipeline.fit(titanicDF).transform(titanicDF)

### Dropping Columns

In [0]:
titanicDF = titanicDF.drop("PassengerId","Name","Ticket","Cabin","Embarked","Sex","Initial")

### Remove Null Values

In [0]:
# Drop all rows containing any null or NaN values

titanicCleanDF = titanicCleanDF.na.drop()

# Saving Our Work
Let's register the new cleaned DataFrame as a Table

In [0]:
titanicCleanDF.write.mode("overwrite").format("delta").saveAsTable("titanic_clean")

In [0]:
#Let's put all features into vector

# Machine Learning

**MLlib is Spark’s machine learning (ML) library. Its goal is to make practical machine learning scalable and easy.**

**At a high level, it provides tools such as:**
* ML Algorithms: common learning algorithms such as classification, regression, clustering, and collaborative filtering
* Featurization: feature extraction, transformation, dimensionality reduction, and selection
* Pipelines: tools for constructing, evaluating, and tuning ML Pipelines
* Persistence: saving and load algorithms, models, and Pipelines
* Utilities: linear algebra, statistics, data handling, etc.

[Spark MLlib Guide](https://spark.apache.org/docs/latest/ml-guide.html)

In [0]:
feature = VectorAssembler(inputCols=titanicDF.columns[1:],outputCol="features")
feature_vector= feature.transform(titanicDF)

In [0]:
display(feature_vector)

In [0]:
trainDF, testDF = feature_vector.randomSplit([0.8, 0.2],seed = 11)

## Here is the list of few Classification Algorithms from Spark ML

### LogisticRegression

In [0]:
#LogisticRegression
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="Survived", featuresCol="features")
#Training algo
lrModel = lr.fit(trainDF)
lr_prediction = lrModel.transform(testDF)
#lr_prediction.select("prediction", "Survived", "features").show()
evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")
#Evaluating the accuracy of LogisticRegression.
lr_accuracy = evaluator.evaluate(lr_prediction)
print("Accuracy of LogisticRegression is = %g"% (lr_accuracy))
print("Test Error of LogisticRegression = %g " % (1.0 - lr_accuracy))

### DecisionTreeClassifier

In [0]:
#DecisionTreeClassifier
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol="Survived", featuresCol="features")
dt_model = dt.fit(trainDF)
dt_prediction = dt_model.transform(testDF)
#dt_prediction.select("prediction", "Survived", "features").show()
dt_accuracy = evaluator.evaluate(dt_prediction)
print("Accuracy of DecisionTreeClassifier is = %g"% (dt_accuracy))
print("Test Error of DecisionTreeClassifier = %g " % (1.0 - dt_accuracy))

### RandomForestClassifier

In [0]:
#RandomForestClassifier
from pyspark.ml.classification import RandomForestClassifier
rf = DecisionTreeClassifier(labelCol="Survived", featuresCol="features")
rf_model = rf.fit(trainDF)
rf_prediction = rf_model.transform(testDF)
#rf_prediction.select("prediction", "Survived", "features").show()
rf_accuracy = evaluator.evaluate(rf_prediction)
print("Accuracy of RandomForestClassifier is = %g"% (rf_accuracy))
print("Test Error of RandomForestClassifier  = %g " % (1.0 - rf_accuracy))

### Gradient-boosted tree classifier

In [0]:
#Gradient-boosted tree classifier
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(labelCol="Survived", featuresCol="features",maxIter=10)
gbt_model = gbt.fit(trainDF)
gbt_prediction = gbt_model.transform(testDF)
#gbt_prediction.select("prediction", "Survived", "features").show()
gbt_accuracy = evaluator.evaluate(gbt_prediction)
print("Accuracy of Gradient-boosted tree classifie is = %g"% (gbt_accuracy))
print("Test Error of Gradient-boosted tree classifie %g"% (1.0 - gbt_accuracy))

### NaiveBayes

In [0]:
#NaiveBayes
from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes(labelCol="Survived", featuresCol="features")
nb_model = nb.fit(trainDF)
nb_prediction = nb_model.transform(testDF)
#nb_prediction.select("prediction", "Survived", "features").show()
nb_accuracy = evaluator.evaluate(nb_prediction)
print("Accuracy of NaiveBayes is  = %g"% (nb_accuracy))
print("Test Error of NaiveBayes  = %g " % (1.0 - nb_accuracy))

### Support Vector Machine

In [0]:
#Support Vector Machine
from pyspark.ml.classification import LinearSVC
svm = LinearSVC(labelCol="Survived", featuresCol="features")
svm_model = svm.fit(trainDF)
svm_prediction = svm_model.transform(testDF)
#svm_prediction.select("prediction", "Survived", "features").show()
svm_accuracy = evaluator.evaluate(svm_prediction)
print("Accuracy of Support Vector Machine is = %g"% (svm_accuracy))
print("Test Error of Support Vector Machine = %g " % (1.0 - svm_accuracy))

### Tracking Experiments and Registering Models using [MLflow](https://mlflow.org/)

**MLflow Tracking** is...<br><br>

* a logging API specific for machine learning 
* agnostic to libraries and environments that do the training
* organized around the concept of **runs**, which are executions of data science code
* runs are aggregated into **experiments** where many runs can be a part of a given experiment 
* An MLflow server can host many experiments

Each run can record the following information:<br><br>

- **Parameters:** Key-value pairs of input parameters such as the number of trees in a random forest model
- **Metrics:** Evaluation metrics such as RMSE or Area Under the ROC Curve
- **Artifacts:** Arbitrary output files in any format.  This can include images, pickled models, and data files
- **Source:** The code that originally ran the experiment

Experiments can be tracked using libraries in Python, R, and Java as well as by using the CLI and REST calls.

The **MLflow Model Registry** allows you to... <br><br>

* Discover registered models, experiment runs, and associated code with a registered model
* Transition models to deployment stages
* Deploy different versions of a registered model in different stages
* Archive older models for posterity and provenance
* Peruse model activities and annotations throughout model’s lifecycle
* Control granular access and permission for model registrations, transitions or modifications


See [MLflow Guide](https://docs.microsoft.com/en-us/azure/databricks/applications/mlflow/)
<div><img src="https://databricks.com/wp-content/uploads/2020/04/databricks-adds-access-control-to-mlflow-model-registry_01.jpg" style="height: 450px; margin: 20px"/></div>

In [0]:
import mlflow.spark
#DecisionTreeClassifier
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol="Survived", featuresCol="features")
dt_model = dt.fit(trainDF)
dt_prediction = dt_model.transform(testDF)
dt_prediction.select("prediction", "Survived", "features").show()

mlflow.start_run(run_name="decison_tree")
# Set decision tree `maxDepth` parameter to 2, logging with MLflow
maxDepth = 2
mlflow.log_param("maxDepth", maxDepth)

mlflow.spark.log_model(dt_model,"model")
print(dt_model.toDebugString)

dt_accuracy = evaluator.evaluate(dt_prediction)
mlflow.log_metric("accuracy", dt_accuracy)
print("Accuracy on the test set for the decision tree model: {}".format(dt_accuracy))
mlflow.end_run()

# Model Selection

Building machine learning solutions involves testing a number of different models.  This lesson explores tuning hyperparameters and cross-validation in order to select the optimal model as well as saving models and predictions.

### Tasks:
* Use MLflow to manage the model lifecycle
* Define hyperparameters and motivate their role in machine learning
* Tune hyperparameters using grid search
* Validate model performance using cross-validation
* Register a trained model in MLflow

-sandbox
### Hyperparameter Tuning

Hyperparameter tuning is the process of of choosing the optimal hyperparameters for a machine learning algorithm.  Each algorithm has different hyperparameters to tune.  You can explore these hyperparameters by using the `.explainParams()` method on a model.

**Grid search** is the process of exhaustively trying every combination of hyperparameters.  It takes all of the values we want to test and combines them in every possible way so that we test them using cross-validation.

In [0]:
trainDF, testDF = titanicDF.randomSplit([0.8, 0.2], seed=10)

assembler = VectorAssembler(inputCols=titanicDF.columns[1:], outputCol="features")

dtc = DecisionTreeClassifier(featuresCol="features", labelCol="Survived")

-sandbox
`ParamGridBuilder()` allows us to string together all of the different possible hyperparameters we would like to test.  In this case, we can test the maximum number of iterations, whether we want to use an intercept with the y axis, and whether we want to standardize our features.

In [0]:
from pyspark.ml.tuning import ParamGridBuilder

paramGrid = (ParamGridBuilder()
  .addGrid(dtc.maxDepth, [2, 3, 4, 5, 6])
  .addGrid(dtc.maxBins,  [16, 32, 48, 64])
  .build()
)

-sandbox
### Cross-Validation

There are a number of different ways of conducting cross-validation, allowing us to trade off between computational expense and model performance.  An exhaustive approach to cross-validation would include every possible split of the training set.  More commonly, _k_-fold cross-validation is used where the training dataset is divided into _k_ smaller sets, or folds.  A model is then trained on _k_-1 folds of the training data and the last fold is used to evaluate its performance.

Create a `MulticlassClassificationEvaluator()` to evaluate our grid search experiments and a `CrossValidator()` to build our models.

In [0]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="Survived", metricName="accuracy")

cv = CrossValidator(
  estimator = dtc,                  # Estimator (individual model or pipeline)
  estimatorParamMaps = paramGrid,   # Grid of parameters to try (grid search)
  evaluator = evaluator,            # Evaluator
  numFolds = 5,                     # Set k to 5
  seed = 10                         # Seed to sure our results are the same if ran again
)

-sandbox
Add `VectorAssembler()` and `CrossValidator()` to a `Pipeline()` and fit it to the training dataset.

In [0]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages = [assembler, cv])

cvModel = pipeline.fit(trainDF)

You can then access the best model using the `.bestModel` attribute.

In [0]:
bestModel = cvModel.stages[-1].bestModel
print(bestModel)

# get the best value for maxDepth parameter
bestDepth = bestModel.getOrDefault("maxDepth")
bestBins = bestModel.getOrDefault("maxBins")

Build final model using the entire training dataset and evaluate its performance using the test set

Log parameters, metrics, and the model iteself in MLflow

In [0]:
import mlflow.spark

with mlflow.start_run(run_name="final_model") as run:
  runID = run.info.run_uuid
  
  # train model
  dtc = DecisionTreeClassifier(featuresCol="features", labelCol="Survived", maxDepth=bestDepth, maxBins=bestBins)
  pipeline = Pipeline(stages = [assembler, dtc])
  finalModel = pipeline.fit(trainDF)
  
  # log parameters and model
  mlflow.log_param("maxDepth", bestDepth)
  mlflow.log_param("maxBins", bestBins)
  mlflow.spark.log_model(finalModel, "model")
  
  # generate and log metrics
  testPredictionDF = finalModel.transform(testDF)
  accuracy = evaluator.evaluate(testPredictionDF)
  mlflow.log_metric("accuracy", accuracy)
  print("Accuracy on the test set for the decision tree model: {}".format(accuracy))

### Register Model

#### Create a new registered model using the API

The following cells use the `mlflow.register_model()` function to create a new registered model whose name begins with the string `Titanic-Model`. This also creates a new model version (e.g., `Version 1` of `Titanic-Model`).

In [0]:
%sql set spark.databricks.userInfoFunctions.enabled = true; select current_user();

In [0]:
userName=dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags().apply('user')
userName="durga"

In [0]:
import time

modelName = "Titanic-Model__" + userName

artifactPath = "model"
modelURI = "runs:/{run_id}/{artifact_path}".format(run_id=runID, artifact_path=artifactPath)

modelDetails = mlflow.register_model(model_uri=modelURI, name=modelName)
time.sleep(5)

### Perform a model stage transition

The MLflow Model Registry defines several model stages: **None**, **Staging**, **Production**, and **Archived**. Each stage has a unique meaning. For example, **Staging** is meant for model testing, while **Production** is for models that have completed the testing or review processes and have been deployed to applications.

In [0]:
from mlflow.tracking.client import MlflowClient
client = MlflowClient()

client.transition_model_version_stage(
  name = modelDetails.name,
  version = modelDetails.version,
  stage='Production',
)

The MLflow Model Registry allows multiple model versions to share the same stage. When referencing a model by stage, the Model Registry will use the latest model version (the model version with the largest version ID). The `MlflowClient.get_latest_versions()` function fetches the latest model version for a given stage or set of stages. The following cell uses this function to print the latest version of the power forecasting model that is in the `Production` stage.

In [0]:
latestVersionInfo = client.get_latest_versions(modelName, stages=["Production"])
latestVersion = latestVersionInfo[0].version
print("The latest production version of the model '%s' is '%s'." % (modelName, latestVersion))

# Model Serving

When we move to talking about actually operationalizing the machine learning models we've built so far, this is where the discussion becomes tricky. Many organizations have yet to reach this step, as it can become quite complex to get here. Depending on the use-case at hand, there are several options for deploying a model and using it to make predictions on new data.

### Agenda:
* Review model serving options
* Load a registered production model
* Perform batch scoring
* Perform stream scoring
* Discuss serving model as a web service

By using Databricks to create your models, you can then choose your serving layer. Whether that's **batch** (where you score data on a regular interval), **streaming** (scoring non-stop data), or via a **web service** (where you make "random" calls to be scored), you can achieve the first 2 options using Databricks directly (or, for more complex pipelines, using scheduling via [Azure Data Factory](https://docs.microsoft.com/en-us/azure/data-factory/solution-template-databricks-notebook)), while the latter can easily be covered by integrating Databricks with [Azure Machine Learning Service](https://docs.microsoft.com/en-us/azure/machine-learning/service/how-to-use-mlflow), for an easy way to deploy to an auto-scalable, containerized API.

See **Azure Reference Architecture** below:
![](https://raw.githubusercontent.com/ddgope/Kaggle-Titanic-Dataset-using-MLFlow-PySpark-Azure-Databricks/master/img/azure_reference_architecture.PNG)

## Load versions of the registered model

In [0]:
from mlflow.tracking.client import MlflowClient
client = MlflowClient()

modelName = "Titanic-Model__" + userName
latestVersionInfo = client.get_latest_versions(modelName, stages=["Production"])
latestVersion = latestVersionInfo[0].version

print("The latest production version of the model '%s' is '%s'." % (modelName, latestVersion))

The following cell uses the `mlflow.spark.load_model()` API to load the latest version of production stage model.

In [0]:
import mlflow.spark

modelURI = latestVersionInfo[0].source
modelPipeline = mlflow.spark.load_model(modelURI)

print("Loading registered model version from URI: '{model_uri}'".format(model_uri=modelURI))

## Predication - Using Bacth Scoring

In [0]:
# Make predictions in batch
predictions = modelPipeline.transform(titanicDF)
display(predictions)

## Stream Scoring
Another option of scoring would be through real-time streams. Imagine a scenario where you have sensor data from different machinery coming in, and, in a predictive maintenance situation, you'd like to be notified on the first occurence of a sensor being out of bounds, to minimise repair times and costs. Due to the throughput of that type of information, a Spark Streaming job is ideal to score data on-the-go, and showcase any potential anomalies.

While in our scenario we don't have frequent updates of information, we can still leverage Spark Streaming to score our batch dataset. We only have to read it as a stream!

In [0]:
# Read data as a stream

streamDF = spark.readStream.table("titanic_clean")

In [0]:
# Make real-time predictions on streaming data

scoredStream = modelPipeline.transform(streamDF)

display(scoredStream)

In [0]:
# Stop streaming jobs
for s in spark.streams.active:
    s.stop()

## Serving Model as a Web Service

#### [Track model metrics and deploy ML models with MLflow and Azure Machine Learning](https://docs.microsoft.com/en-us/azure/machine-learning/how-to-use-mlflow#deploy-mlflow-models-as-a-web-service)
* Track and log experiment metrics and artifacts in your Azure Machine Learning workspace. If you already use MLflow Tracking for your experiments, the workspace provides a centralized, secure, and scalable location to store training metrics and models.
* Deploy your MLflow experiments as an Azure Machine Learning web service. By deploying as a web service, you can apply the Azure Machine Learning monitoring and data drift detection functionalities to your production models.
* Azure deployment infrastructure options:
  * Azure Container Instance - suitable choice for a quick dev-test deployment
  * Azure Kubernetes Service - suitable for scalable production deployments

![Azure ML Arch](https://raw.githubusercontent.com/ddgope/Kaggle-Titanic-Dataset-using-MLFlow-PySpark-Azure-Databricks/master/img/azure_ml_arch.JPG)