In [6]:
import os
import sys

# Make modules under /mnt/code importable from this notebook.
# The membership check keeps the cell idempotent: re-running it does not
# append duplicate entries to sys.path.
PROJECT_ROOT = "/mnt/code"
if PROJECT_ROOT not in sys.path:
    sys.path.append(PROJECT_ROOT)

## Spark and MLflow

This notebook is an example of using Spark with MLflow. Up to version 5.7, it *only works with on-demand Spark*.

**Starting with version 5.8, it will also work with external Spark.**

In [7]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m6.4 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting py4j==0.10.9.7 (from pyspark)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m46.4 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488496 sha256=85da07e8a39ab48b267f0b70541bb048e184b5a3de18de354f7df6910ad419be
  Stored in directory: /home/ubuntu/.cache/pip/wheels/92/09/11/aa01d01a7f005fda8a66ad71d2be7f8aa341bddafb27eee3c7
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.7 pyspark-3.5.1
Note:

In [8]:
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql.session import SparkSession

In [9]:
import os
import mlflow
from mlflow.entities import Param,Metric,RunTag

In [10]:
import time

# Experiment and model names are built from environment variables.
username = os.environ["DOMINO_STARTING_USERNAME"]
project_name = os.environ["DOMINO_PROJECT_NAME"]
experiment_name = f"SPARK-{username}-{project_name}"
model_name = f"SPARK-{project_name}"
print("experiment_name:", experiment_name)

# Creates the experiment if it does not exist yet, and makes it the active one.
mlflow.set_experiment(experiment_name)

client = mlflow.tracking.MlflowClient()
experiment_id = client.get_experiment_by_name(experiment_name).experiment_id
print("experiment_id:", experiment_id)

# Timestamp reused by later cells for MLflow `Metric` entries.
# MLflow metric timestamps are milliseconds since the epoch; a value in
# seconds would place those metrics in January 1970.
now = int(time.time() * 1000)

# Extra tags attached to every run started in this notebook (none by default).
tags = {}

2024/08/07 16:46:45 INFO mlflow.tracking.fluent: Experiment with name 'SPARK-integration-test-Experiment-Manager-Demos' does not exist. Creating a new experiment.


experiment_name: SPARK-integration-test-Experiment-Manager-Demos
experiment_id: 23


# HELLO WORLD

In [11]:
def run(alpha, run_origin, tags=None):
    """Log a minimal "hello world" MLflow run and print its identifiers.

    Inside one run, this logs the ``alpha`` param, an ``rmse`` metric, a
    ``run_origin`` tag and a small ``info.txt`` artifact. It then sends a batch
    of two params, two metrics and two tags through the module-level
    ``client`` (an ``MlflowClient``).

    Args:
        alpha: Value logged as the ``alpha`` param.
        run_origin: Used both as the run name and as the ``run_origin`` tag.
        tags: Optional dict of tags attached when the run is created.
            ``None`` (the default) means no extra tags. This avoids the shared
            mutable default that ``{}`` would create.
    """
    import os
    import tempfile
    import time

    # MLflow Metric timestamps are milliseconds since the epoch.
    timestamp_ms = int(time.time() * 1000)

    with mlflow.start_run(run_name=run_origin, tags=tags) as active_run:
        run_id = active_run.info.run_id  # `run_uuid` is the deprecated alias
        print("runId:", run_id)
        print("artifact_uri:", mlflow.get_artifact_uri())
        print("alpha:", alpha)
        print("run_origin:", run_origin)
        mlflow.log_param("alpha", alpha)
        mlflow.log_metric("rmse", 0.789)
        mlflow.set_tag("run_origin", run_origin)

        # Write the artifact into a throwaway directory rather than the CWD,
        # so re-running the cell leaves no stray info.txt behind. The logged
        # artifact name is still "info.txt".
        with tempfile.TemporaryDirectory() as tmp_dir:
            artifact_path = os.path.join(tmp_dir, "info.txt")
            with open(artifact_path, "w") as f:
                f.write("Hi artifact")
            mlflow.log_artifact(artifact_path)

        batch_params = [Param("p1", "0.1"), Param("p2", "0.2")]
        batch_metrics = [
            Metric("m1", 0.1, timestamp_ms, 0),
            Metric("m2", 0.2, timestamp_ms, 0),
        ]
        batch_tags = [RunTag("t1", "hi1"), RunTag("t2", "hi2")]
        client.log_batch(run_id, metrics=batch_metrics, params=batch_params, tags=batch_tags)

In [12]:

# Start the "hello world" run, tagged with the notebook-wide `tags` dict.
run(alpha="0.1", run_origin="JupyterLab", tags=tags)


runId: 3bb7883b53414995a48e273fa48450c1
artifact_uri: mlflow-artifacts:/mlflow/3bb7883b53414995a48e273fa48450c1/artifacts
alpha: 0.1
run_origin: JupyterLab


# MLflow for Spark

In [13]:
import os
import shutil
import tempfile

import mlflow.spark

# Write a tiny CSV dataset with Spark, then read it back with Spark
# datasource autologging enabled so MLflow records the datasource on a run.
# Note: On environments like Databricks with pre-created SparkSessions,
# ensure the org.mlflow:mlflow-spark:1.11.0 is attached as a library to
# your cluster
spark = (SparkSession.builder
            .config("spark.jars.packages", "org.mlflow:mlflow-spark:1.11.0")
            .master("local[*]")
            .getOrCreate())
tempdir = tempfile.mkdtemp()
data_path = os.path.join(tempdir, "my-data-path")
try:
    # Create and persist some dummy data
    df = spark.createDataFrame([
            (4, "spark i j k"),
            (5, "l m n"),
            (6, "spark hadoop spark"),
            (7, "apache hadoop")], ["id", "text"])
    df.write.csv(data_path, header=True)

    # Enable Spark datasource autologging.
    mlflow.spark.autolog()
    loaded_df = spark.read.csv(data_path, header=True, inferSchema=True)
    # Call toPandas() to trigger a read of the Spark datasource. Datasource info
    # (path and format) is logged to the current active run, or the
    # next-created MLflow run if no run is currently active.
    # Leaving the `with` block ends the run, so no explicit end_run() is needed.
    with mlflow.start_run(run_name='spark.autolog sample', tags=tags) as active_run:
        pandas_df = loaded_df.toPandas()
finally:
    # Release the Spark session and delete the temporary CSV directory even if
    # a step above raised. mkdtemp() never removes its directory by itself.
    spark.stop()
    shutil.rmtree(tempdir, ignore_errors=True)

:: loading settings :: url = jar:file:/opt/conda/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ubuntu/.ivy2/cache
The jars for the packages stored in: /home/ubuntu/.ivy2/jars
org.mlflow#mlflow-spark added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b81ed4d1-0346-4acb-b381-3f620705d97a;1.0
	confs: [default]
	found org.mlflow#mlflow-spark;1.11.0 in central
	found org.slf4j#slf4j-api;1.7.25 in central
downloading https://repo1.maven.org/maven2/org/mlflow/mlflow-spark/1.11.0/mlflow-spark-1.11.0.jar ...
	[SUCCESSFUL ] org.mlflow#mlflow-spark;1.11.0!mlflow-spark.jar (103ms)
downloading https://repo1.maven.org/maven2/org/slf4j/slf4j-api/1.7.25/slf4j-api-1.7.25.jar ...
	[SUCCESSFUL ] org.slf4j#slf4j-api;1.7.25!slf4j-api.jar (24ms)
:: resolution report :: resolve 1982ms :: artifacts dl 130ms
	:: modules in use:
	org.mlflow#mlflow-spark;1.11.0 from central in [default]
	org.slf4j#slf4j-api;1.7.25 from central in [default]
	---------------------------------------------------------------------
	|                  |       

# PySpark Hyperparameter Tuning - Cross-Validation


In [14]:
import mlflow.pyspark.ml
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

spark = (SparkSession.builder
            .config("spark.jars.packages", "org.mlflow:mlflow-spark:1.11.0")
            .master("local[*]")
            .getOrCreate())

# Enable MLflow autologging for pyspark.ml; log_models=True also logs fitted models.
# NOTE(review): the `No FileSystem for scheme "mlflow-artifacts"` ERROR in this cell's
# output appears to come from MLflow first trying to save the model through Spark
# directly to the artifact URI, then falling back to a local save + upload.
# Confirm that the model artifact is present on the run.
mlflow.pyspark.ml.autolog(log_models=True, disable=False, exclusive=False, disable_for_unsupported_versions=False, silent=False, log_post_training_metrics=True)

try:
    # Prepare training documents, which are labeled.
    training = spark.createDataFrame([
        (0, "a b c d e spark", 1.0),
        (1, "b d", 0.0),
        (2, "spark f g h", 1.0),
        (3, "hadoop mapreduce", 0.0),
        (4, "b spark who", 1.0),
        (5, "g d a y", 0.0),
        (6, "spark fly", 1.0),
        (7, "was mapreduce", 0.0),
        (8, "e spark program", 1.0),
        (9, "a e c l", 0.0),
        (10, "spark compile", 1.0),
        (11, "hadoop software", 0.0)
    ], ["id", "text", "label"])

    # The context manager ends the run even if fitting or prediction raises,
    # so no manual "end the active run" clean-up is needed afterwards.
    with mlflow.start_run(run_name='cross-validation', tags=tags):
        # Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
        tokenizer = Tokenizer(inputCol="text", outputCol="words")
        hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
        lr = LogisticRegression(maxIter=10)
        pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

        # We now treat the Pipeline as an Estimator, wrapping it in a CrossValidator instance.
        # This will allow us to jointly choose parameters for all Pipeline stages.
        # A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
        # We use a ParamGridBuilder to construct a grid of parameters to search over.
        # With 3 values for hashingTF.numFeatures and 2 values for lr.regParam,
        # this grid will have 3 x 2 = 6 parameter settings for CrossValidator to choose from.
        paramGrid = (ParamGridBuilder()
                     .addGrid(hashingTF.numFeatures, [10, 100, 1000])
                     .addGrid(lr.regParam, [0.1, 0.01])
                     .build())

        crossval = CrossValidator(estimator=pipeline,
                                  estimatorParamMaps=paramGrid,
                                  evaluator=BinaryClassificationEvaluator(),
                                  numFolds=2)  # use 3+ folds in practice

        # Run cross-validation, and choose the best set of parameters.
        cvModel = crossval.fit(training)

        # Prepare test documents, which are unlabeled.
        test = spark.createDataFrame([
            (4, "spark i j k"),
            (5, "l m n"),
            (6, "mapreduce spark"),
            (7, "apache hadoop")
        ], ["id", "text"])

        # Make predictions on test documents. cvModel uses the best model found (lrModel).
        prediction = cvModel.transform(test)
        selected = prediction.select("id", "text", "probability", "prediction")
        for row in selected.collect():
            print(row)
finally:
    # Always release the Spark session, even if a step above raised.
    spark.stop()

24/08/07 16:47:32 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/08/07 16:48:55 ERROR Instrumentation: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "mlflow-artifacts"
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.spark.ml.util.FileSystemOverwrite.handleOverwrite(ReadWrite.scala:673)
	at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:167)
	at org.apache.spark.ml.PipelineModel$PipelineModelWriter.super$save(Pipeline.scala:344)
	at org.apache.spark.ml.Pipeline

Row(id=4, text='spark i j k', probability=DenseVector([0.2665, 0.7335]), prediction=1.0)
Row(id=5, text='l m n', probability=DenseVector([0.9204, 0.0796]), prediction=0.0)
Row(id=6, text='mapreduce spark', probability=DenseVector([0.4438, 0.5562]), prediction=1.0)
Row(id=7, text='apache hadoop', probability=DenseVector([0.8587, 0.1413]), prediction=0.0)


# PySpark Hyperparameter Tuning - Train-Validation Split


In [15]:
import mlflow.pyspark.ml
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

# LIBSVM-format regression sample data, under the same /mnt/code root that the
# first cell adds to sys.path.
DATA_PATH = "/mnt/code/data/spark/data/mllib/sample_linear_regression_data.txt"

spark = (SparkSession.builder
            .config("spark.jars.packages", "org.mlflow:mlflow-spark:1.11.0")
            .master("local[*]")
            .getOrCreate())

# Enable MLflow autologging for pyspark.ml; log_models=True also logs fitted models.
# NOTE(review): the `No FileSystem for scheme "mlflow-artifacts"` ERROR in this cell's
# output appears to come from MLflow first trying to save the model through Spark
# directly to the artifact URI, then falling back to a local save + upload.
# Confirm that the model artifact is present on the run.
mlflow.pyspark.ml.autolog(log_models=True, disable=False, exclusive=False, disable_for_unsupported_versions=False, silent=False, log_post_training_metrics=True)

try:
    # The context manager ends the run even if loading, fitting or prediction raises.
    with mlflow.start_run(run_name='train-validation split', tags=tags):
        # Prepare training and test data (the fixed seed keeps the split reproducible).
        data = spark.read.format("libsvm").load(DATA_PATH)
        train, test = data.randomSplit([0.9, 0.1], seed=12345)

        lr = LinearRegression(maxIter=10)

        # We use a ParamGridBuilder to construct a grid of parameters to search over.
        # TrainValidationSplit will try all combinations of values and determine best model using
        # the evaluator.
        paramGrid = (ParamGridBuilder()
                     .addGrid(lr.regParam, [0.1, 0.01])
                     .addGrid(lr.fitIntercept, [False, True])
                     .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
                     .build())

        # In this case the estimator is simply the linear regression.
        # A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
        tvs = TrainValidationSplit(estimator=lr,
                                   estimatorParamMaps=paramGrid,
                                   evaluator=RegressionEvaluator(),
                                   # 80% of the data will be used for training, 20% for validation.
                                   trainRatio=0.8)

        # Run TrainValidationSplit, and choose the best set of parameters.
        model = tvs.fit(train)

        # Make predictions on test data. model is the model with combination of parameters
        # that performed best.
        (model.transform(test)
            .select("features", "label", "prediction")
            .show())
finally:
    # Always release the Spark session, even if a step above raised.
    spark.stop()

24/08/07 16:49:20 WARN LibSVMFileFormat: 'numFeatures' option not specified, determining the number of features by going though the input. If you know the number in advance, please specify it via 'numFeatures' option to avoid the extra scan.
24/08/07 16:49:22 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
24/08/07 16:49:57 ERROR Instrumentation: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "mlflow-artifacts"
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.spark.ml.u

+--------------------+--------------------+--------------------+
|            features|               label|          prediction|
+--------------------+--------------------+--------------------+
|(10,[0,1,2,3,4,5,...| -17.026492264209548|  -1.780062242348691|
|(10,[0,1,2,3,4,5,...|  -16.71909683360509| -0.1893325701092588|
|(10,[0,1,2,3,4,5,...| -15.375857723312297|  0.7252323736487188|
|(10,[0,1,2,3,4,5,...| -13.772441561702871|  3.2696413241677718|
|(10,[0,1,2,3,4,5,...| -13.039928064104615| 0.18817684046065775|
|(10,[0,1,2,3,4,5,...|   -9.42898793151394|  -3.449987079269568|
|(10,[0,1,2,3,4,5,...|    -9.2679651250406|-0.33109075490696316|
|(10,[0,1,2,3,4,5,...|  -9.173693798406978|-0.42727135281551937|
|(10,[0,1,2,3,4,5,...| -7.1500991588127265|   2.936884251408867|
|(10,[0,1,2,3,4,5,...|  -6.930603551528371|-0.02839768193150...|
|(10,[0,1,2,3,4,5,...|  -6.456944198081549| -0.9224776887934015|
|(10,[0,1,2,3,4,5,...| -3.2843694575334834| -1.0821208483033875|
|(10,[0,1,2,3,4,5,...|   

In [None]:
# Safety net: if a cell above raised while an MLflow run was still active,
# close that run so later cells start from a clean state.
if mlflow.active_run() is not None:
    mlflow.end_run()