In [None]:
import mlflow

# Test experiment

In [None]:
# Name the MLflow experiment for this section and make it the active one.
# `experiment_name` is reused by the results cell further down.
experiment_name = "/Shared/experiments/test-from-jl"
mlflow.set_experiment(experiment_name)

In [None]:
import time

# Record a minimal run by hand: three parameters and one metric.
with mlflow.start_run(run_name="test_run") as run:
    mlflow.log_param("x", 2)
    # NOTE(review): the pause presumably exists so the run has a visible duration — confirm.
    time.sleep(2)
    mlflow.log_param("a", 2)
    mlflow.log_param("s", "value")
    mlflow.log_metric("mse", 2.34)

    # Keep the run id; the results cell below passes it to get_runs().
    run_id = run.info.run_id

print(run_id)

**Results for this run**

In [None]:
# NOTE(review): `dbbrowser` is never imported or assigned in this notebook — presumably it is
# injected by the hosting environment; confirm before running on a fresh kernel.
mb = dbbrowser.experiments(experiment_name)
# Fetch only the run whose id was captured above, then render it.
runs = mb.get_runs(run_id)
mb.display(runs)

**Results for all runs**

In [None]:
# get_runs() is called without a run id here; per the heading above this lists all runs.
# Relies on `mb` from the previous results cell.
runs = mb.get_runs()
mb.display(runs)

In [None]:
# End of the first example.
# NOTE(review): `dbutils` is not defined in this notebook, and notebook.exit presumably stops
# execution here, so "Run All" would not reach the sections below — confirm this is intended.
dbutils.notebook.exit("End of 1st example")

# Keras experiment

In [None]:
# Switch the active MLflow experiment to the one used by the Keras example.
# This rebinds `experiment_name`, which the results cell of this section reads.
experiment_name = "/Shared/experiments/keras-from-jl"
mlflow.set_experiment(experiment_name)

In [None]:
import numpy as np
import tensorflow.keras as keras
from tensorflow.keras.datasets import reuters
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Activation
from tensorflow.keras.preprocessing.text import Tokenizer

# Enable MLflow autologging for TensorFlow/Keras. This import and the autolog() call are the
# only additions the training code needs for metrics and parameters to be logged to MLflow
# automatically.
import mlflow.tensorflow

mlflow.tensorflow.autolog()

# Settings for the Reuters example: vocabulary size, mini-batch size, training epochs.
max_words = 1000
batch_size = 32
epochs = 5

print('Loading data...')
(x_train, y_train), (x_test, y_test) = reuters.load_data(
    num_words=max_words, test_split=0.2
)

print(len(x_train), 'train sequences')
print(len(x_test), 'test sequences')

# Labels are integer class ids, so the class count is the largest id plus one.
num_classes = np.max(y_train) + 1
print(num_classes, 'classes')

print('Vectorizing sequence data...')
tokenizer = Tokenizer(num_words=max_words)
# Turn both splits into binary bag-of-words matrices with the same tokenizer.
x_train, x_test = (
    tokenizer.sequences_to_matrix(sequences, mode='binary')
    for sequences in (x_train, x_test)
)
print('x_train shape:', x_train.shape)
print('x_test shape:', x_test.shape)

print('Convert class vector to binary class matrix (for use with categorical_crossentropy)')
# One-hot encode the labels of both splits.
y_train, y_test = (
    keras.utils.to_categorical(labels, num_classes)
    for labels in (y_train, y_test)
)
print('y_train shape:', y_train.shape)
print('y_test shape:', y_test.shape)

print('Building model...')
# Dense(512) -> ReLU -> Dropout(0.5) -> Dense(num_classes) -> softmax,
# declared as a layer list instead of successive add() calls.
model = Sequential([
    Dense(512, input_shape=(max_words,)),
    Activation('relu'),
    Dropout(0.5),
    Dense(num_classes),
    Activation('softmax'),
])

model.compile(
    loss='categorical_crossentropy',
    optimizer='adam',
    metrics=['accuracy'],
)

# Train inside an explicit MLflow run. autolog() (enabled earlier in this cell) takes care of
# the parameters and metrics produced by fit(); the held-out test results are logged by hand
# below so they end up on the same run instead of only being printed.
with mlflow.start_run(run_name="keras_run") as run:
    history = model.fit(x_train, y_train,
                        batch_size=batch_size,
                        epochs=epochs,
                        verbose=1,
                        validation_split=0.1)

    # Keep the run id; the results cell below passes it to get_runs().
    run_id = run.info.run_id
    print(run_id)

    # Evaluate while the run is still active so the test metrics can be attached to it.
    score = model.evaluate(x_test, y_test,
                           batch_size=batch_size, verbose=1)
    # With compile(metrics=['accuracy']) evaluate() returns [loss, accuracy].
    # float() guards against numpy scalar types being passed to log_metric.
    mlflow.log_metric('test_loss', float(score[0]))
    mlflow.log_metric('test_accuracy', float(score[1]))

print('Test score:', score[0])
print('Test accuracy:', score[1])

In [None]:
# Show the Keras run in the experiment browser.
# NOTE(review): `dbbrowser` is never imported or assigned in this notebook — presumably it is
# injected by the hosting environment; confirm.
mb = dbbrowser.experiments(experiment_name)
runs = mb.get_runs(run_id)
mb.display(runs)

In [None]:
# End of the second example.
# NOTE(review): presumably stops notebook execution at this point — confirm this is intended.
dbutils.notebook.exit("End of 2nd example")

# Spark MLlib with auto logging

In [None]:
# Switch the active MLflow experiment to the one used by the Spark MLlib example.
# This rebinds `experiment_name`, which the results cell of this section reads.
experiment_name = "/Shared/experiments/mllib-from-jl"
mlflow.set_experiment(experiment_name)

In [None]:
# Read the MNIST digit train/test files (libsvm format) and mark both DataFrames as cached.
# cache() returns the DataFrame it is called on, so it can be chained onto the load.
training = (
    spark.read.format("libsvm")
    .load("/databricks-datasets/mnist-digits/data-001/mnist-digits-train.txt")
    .cache()
)
test = (
    spark.read.format("libsvm")
    .load("/databricks-datasets/mnist-digits/data-001/mnist-digits-test.txt")
    .cache()
)

print("There are {} training images and {} test images.".format(training.count(), test.count()))

In [None]:
# Import the ML classification, indexer, and pipeline classes 
from pyspark.ml.classification import DecisionTreeClassifier, DecisionTreeClassificationModel
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

In [None]:
# StringIndexer: read the input column "label" (digits) and encode it as categorical
# indices in the new column "indexedLabel".
indexer = StringIndexer(inputCol="label", outputCol="indexedLabel")

# DecisionTreeClassifier: learn to predict "indexedLabel". No features column is passed,
# so the estimator's default features column ("features") is used.
dtc = DecisionTreeClassifier(labelCol="indexedLabel")

# Chain indexer + dtc into a single ML Pipeline so both are fitted together.
pipeline = Pipeline(stages=[indexer, dtc])

In [None]:
# Turn on the Databricks MLlib tracking flag for this Spark session.
# NOTE(review): presumably this is what makes the tuning runs below get logged to MLflow
# automatically (see the section title) — confirm against the runtime documentation.
spark.conf.set("spark.databricks.mlflow.trackMLlib.enabled", "true")

In [None]:
# Define the evaluation metric used for model selection: "weightedPrecision", i.e. per-class
# precision averaged with class-frequency weights.
# NOTE(review): this is not identical to 0-1 accuracy (Spark exposes that as metricName="accuracy").
# The name is kept as is because the run filter further down reads "M.avg_weightedPrecision",
# which is presumably derived from this metric name — confirm before changing it.
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", metricName="weightedPrecision")


In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [None]:
# Hyperparameter grid for the decision tree: 4 maxDepth values x 3 maxBins values.
grid = (
    ParamGridBuilder()
    .addGrid(dtc.maxDepth, [2, 4, 6, 8])
    .addGrid(dtc.maxBins, [2, 4, 8])
    .build()
)


In [None]:
# 3-fold cross-validation of the whole pipeline over every combination in `grid`,
# scored with `evaluator`.
cv = CrossValidator(
    estimator=pipeline,
    evaluator=evaluator,
    estimatorParamMaps=grid,
    numFolds=3,
)

In [None]:
import mlflow
import mlflow.mleap

with mlflow.start_run() as run:
    # Run the cross-validated grid search on the training set.
    cvModel = cv.fit(training)

    # User-defined tag attached to this run.
    mlflow.set_tag('owner_team', 'UX Data Science')

    # Score the fitted model on the test set and log the result as an additional metric.
    test_predictions = cvModel.transform(test)
    test_metric = evaluator.evaluate(test_predictions)
    mlflow.log_metric('test_' + evaluator.getMetricName(), test_metric)

    # Log the best model found by the search via mleap.
    mlflow.mleap.log_model(
        spark_model=cvModel.bestModel,
        sample_input=test,
        artifact_path='best-model',
    )

    # Keep the run id; the results cell below passes it to get_runs().
    run_id = run.info.run_id

print(run_id)

In [None]:
# Show the results for the MLlib run in the experiment browser.
# NOTE(review): `dbbrowser` is never imported or assigned in this notebook — presumably it is
# injected by the hosting environment; confirm.
mb = dbbrowser.experiments(experiment_name)
runs = mb.get_runs(run_id)
mb.display(runs)

In [None]:
# Keep only the entries whose "M.avg_weightedPrecision" value is above 0.6.
# NOTE(review): assumes `runs` supports pandas-style boolean-mask indexing and that 0.6 is a
# deliberate cut-off — confirm both.
best_runs = runs[runs["M.avg_weightedPrecision"] > 0.6]
best_runs

In [None]:
# Pass the filtered runs to the experiment browser object for comparison.
mb.compare(best_runs)

In [None]:
# End of the third and last example.
# NOTE(review): presumably ends notebook execution with this message — confirm.
dbutils.notebook.exit("End of 3rd example")