In [None]:
import os
import sys
# Add /mnt/code to the module search path so Python modules stored there can be imported by later cells.
sys.path.append("/mnt/code")

In [None]:
# this code is generated by the Domino Code Assist toolbar button
import domino_code_assist as dca
# NOTE(review): presumably initializes Domino Code Assist for this kernel session — confirm against the DCA docs.
dca.init()

## Spark and MLflow

This is an example of using Spark with MLflow. It currently *(up to 5.7) only works for on-demand Spark*.

**Starting with 5.8, it will work for external Spark as well.**

In [None]:
pip install pyspark

In [None]:
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql.session import SparkSession

In [None]:
import os
import mlflow
from mlflow.entities import Param,Metric,RunTag

In [None]:
import time

# Experiment and model names are derived from the Domino user/project
# environment variables; a missing variable raises KeyError immediately.
experiment_name = '-'.join(['SPARK', os.environ['DOMINO_STARTING_USERNAME'], os.environ['DOMINO_PROJECT_NAME']])
model_name = '-'.join(['SPARK', os.environ['DOMINO_PROJECT_NAME']])
print("experiment_name:", experiment_name)
# Make this experiment the active one for every run started below.
mlflow.set_experiment(experiment_name)

# Low-level client, used later for batch logging.
client = mlflow.tracking.MlflowClient()
experiment_id = client.get_experiment_by_name(experiment_name).experiment_id
print("experiment_id:", experiment_id)

# `now` is used as the timestamp of MLflow metrics, which MLflow expects in
# milliseconds since the Unix epoch (not seconds).
now = round(time.time() * 1000)
# Show the timestamp in human-readable form (previously computed and discarded).
print("timestamp (UTC):", time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(now / 1000)))
# Tags shared by every run started in this notebook.
tags = {}

# HELLO WORLD

In [None]:
def run(alpha, run_origin, tags=None):
    """Log a minimal "hello world" MLflow run.

    Starts a run named ``run_origin`` and logs one parameter, one metric, one
    tag and a small text artifact through the fluent API, then logs a batch of
    parameters, metrics and tags through the notebook-level ``client``.

    Relies on the notebook-level names ``mlflow``, ``client``, ``Param``,
    ``Metric`` and ``RunTag``.

    Args:
        alpha: Value logged as the ``alpha`` parameter.
        run_origin: Used both as the run name and as the ``run_origin`` tag.
        tags: Optional dict of tags passed to ``mlflow.start_run``. ``None``
            (the default) means no extra tags.
    """
    import time

    # Copy so the caller's dict is never shared with or mutated by the run,
    # and so no mutable default argument is needed.
    start_tags = dict(tags) if tags else {}
    # MLflow metric timestamps are milliseconds since the Unix epoch.
    timestamp_ms = int(time.time() * 1000)

    with mlflow.start_run(run_name=run_origin, tags=start_tags) as active_run:
        # run_id is the current name of the deprecated run_uuid attribute.
        run_id = active_run.info.run_id
        print("runId:", run_id)
        print("artifact_uri:", mlflow.get_artifact_uri())
        print("alpha:", alpha)
        print("run_origin:", run_origin)

        # Fluent API: single param / metric / tag.
        mlflow.log_param("alpha", alpha)
        mlflow.log_metric("rmse", 0.789)
        mlflow.set_tag("run_origin", run_origin)

        # Write a small file and upload it as a run artifact.
        with open("info.txt", "w") as f:
            f.write("Hi artifact")
        mlflow.log_artifact("info.txt")

        # Client API: log several entities in one call.
        batch_params = [Param("p1", "0.1"), Param("p2", "0.2")]
        batch_metrics = [Metric("m1", 0.1, timestamp_ms, 0), Metric("m2", 0.2, timestamp_ms, 0)]
        batch_tags = [RunTag("t1", "hi1"), RunTag("t2", "hi2")]
        client.log_batch(run_id, metrics=batch_metrics, params=batch_params, tags=batch_tags)

In [None]:

# Log the hello-world run, labelling it as started from JupyterLab.
run(alpha="0.1", run_origin="JupyterLab", tags=tags)


# MLflow for Spark

In [None]:
import mlflow.spark
import os
import shutil
import tempfile

# Note: On environments like Databricks with pre-created SparkSessions,
# ensure the org.mlflow:mlflow-spark:1.11.0 is attached as a library to
# your cluster
spark = (SparkSession.builder
            .config("spark.jars.packages", "org.mlflow:mlflow-spark:1.11.0")
            .master("local[*]")
            .getOrCreate())

# Scratch directory for the dummy CSV data; it is removed in the `finally`
# clause below so repeated runs do not accumulate temp directories.
tempdir = tempfile.mkdtemp()
try:
    data_path = os.path.join(tempdir, "my-data-path")

    # Create and persist some dummy data
    df = spark.createDataFrame([
            (4, "spark i j k"),
            (5, "l m n"),
            (6, "spark hadoop spark"),
            (7, "apache hadoop")], ["id", "text"])
    df.write.csv(data_path, header=True)

    # Enable Spark datasource autologging.
    mlflow.spark.autolog()
    loaded_df = spark.read.csv(data_path, header=True, inferSchema=True)

    # Call toPandas() to trigger a read of the Spark datasource. Datasource info
    # (path and format) is logged to the current active run, or the
    # next-created MLflow run if no run is currently active.
    # The `with` block ends the run on exit, so no separate mlflow.end_run()
    # call is needed afterwards.
    with mlflow.start_run(run_name='spark.autolog sample', tags=tags) as active_run:
        pandas_df = loaded_df.toPandas()
finally:
    # Always clean up, even if one of the steps above raised.
    shutil.rmtree(tempdir, ignore_errors=True)
    spark.stop()

# PySpark Hyperparameter Tuning - Cross-Validation


In [None]:
# Import the autologging submodule explicitly instead of relying on it being
# reachable as an attribute of the top-level `mlflow` package.
import mlflow.pyspark.ml
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

spark = (SparkSession.builder
            .config("spark.jars.packages", "org.mlflow:mlflow-spark:1.11.0")
            .master("local[*]")
            .getOrCreate())

# try/finally guarantees the Spark session is stopped even if training fails.
try:
    mlflow.pyspark.ml.autolog(log_models=True, disable=False, exclusive=False, disable_for_unsupported_versions=False, silent=False, log_post_training_metrics=True)

    # Prepare training documents, which are labeled.
    training = spark.createDataFrame([
        (0, "a b c d e spark", 1.0),
        (1, "b d", 0.0),
        (2, "spark f g h", 1.0),
        (3, "hadoop mapreduce", 0.0),
        (4, "b spark who", 1.0),
        (5, "g d a y", 0.0),
        (6, "spark fly", 1.0),
        (7, "was mapreduce", 0.0),
        (8, "e spark program", 1.0),
        (9, "a e c l", 0.0),
        (10, "spark compile", 1.0),
        (11, "hadoop software", 0.0)
    ], ["id", "text", "label"])

    # The context manager ends the run on exit, including when an exception is
    # raised, so no run is left active for later cells.
    with mlflow.start_run(run_name='cross-validation', tags=tags):
        # Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
        tokenizer = Tokenizer(inputCol="text", outputCol="words")
        hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
        lr = LogisticRegression(maxIter=10)
        pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

        # We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
        # This will allow us to jointly choose parameters for all Pipeline stages.
        # A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
        # We use a ParamGridBuilder to construct a grid of parameters to search over.
        # With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
        # this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
        paramGrid = ParamGridBuilder() \
            .addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
            .addGrid(lr.regParam, [0.1, 0.01]) \
            .build()

        crossval = CrossValidator(estimator=pipeline,
                                  estimatorParamMaps=paramGrid,
                                  evaluator=BinaryClassificationEvaluator(),
                                  numFolds=2)  # use 3+ folds in practice

        # Run cross-validation, and choose the best set of parameters.
        cvModel = crossval.fit(training)

        # Prepare test documents, which are unlabeled.
        test = spark.createDataFrame([
            (4, "spark i j k"),
            (5, "l m n"),
            (6, "mapreduce spark"),
            (7, "apache hadoop")
        ], ["id", "text"])

        # Make predictions on test documents. cvModel uses the best model found (lrModel).
        prediction = cvModel.transform(test)
        selected = prediction.select("id", "text", "probability", "prediction")
        for row in selected.collect():
            print(row)
finally:
    spark.stop()

# PySpark Hyperparameter Tuning - Train-Validation Split


In [None]:
# Import the autologging submodule explicitly instead of relying on it being
# reachable as an attribute of the top-level `mlflow` package.
import mlflow.pyspark.ml
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

spark = (SparkSession.builder
            .config("spark.jars.packages", "org.mlflow:mlflow-spark:1.11.0")
            .master("local[*]")
            .getOrCreate())

# try/finally guarantees the Spark session is stopped even if training fails.
try:
    mlflow.pyspark.ml.autolog(log_models=True, disable=False, exclusive=False, disable_for_unsupported_versions=False, silent=False, log_post_training_metrics=True)

    # The context manager ends the run on exit, including when an exception is
    # raised, so no run is left active for later cells.
    with mlflow.start_run(run_name='train-validation split', tags=tags):
        # Prepare training and test data.
        data = spark.read.format("libsvm")\
            .load("/mnt/code/data/spark/data/mllib/sample_linear_regression_data.txt")
        train, test = data.randomSplit([0.9, 0.1], seed=12345)

        lr = LinearRegression(maxIter=10)

        # We use a ParamGridBuilder to construct a grid of parameters to search over.
        # TrainValidationSplit will try all combinations of values and determine best model using
        # the evaluator.
        paramGrid = ParamGridBuilder()\
            .addGrid(lr.regParam, [0.1, 0.01]) \
            .addGrid(lr.fitIntercept, [False, True])\
            .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
            .build()

        # In this case the estimator is simply the linear regression.
        # A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
        tvs = TrainValidationSplit(estimator=lr,
                                   estimatorParamMaps=paramGrid,
                                   evaluator=RegressionEvaluator(),
                                   # 80% of the data will be used for training, 20% for validation.
                                   trainRatio=0.8)

        # Run TrainValidationSplit, and choose the best set of parameters.
        model = tvs.fit(train)

        # Make predictions on test data. model is the model with combination of parameters
        # that performed best.
        model.transform(test)\
            .select("features", "label", "prediction")\
            .show()
finally:
    spark.stop()

In [None]:
# Safety net: end any MLflow run that is still active, e.g. because an earlier
# cell raised an exception before its run was closed.
mlflow.end_run()