In [5]:
from pysparkling.ml import H2OAutoML
from pysparkling import *

from pyspark.sql import SparkSession
import socket
import os

import mlflow
import mlflow.spark

mlflow.set_experiment('MLOps_Experiment')
#mlflow.pyspark.ml.autolog(log_models=False)


def get_sparkSession(appName = 'MLOps'):
    spark_master = os.environ.get('SPARK_MASTER') # "spark://spark-master:7077" 
    driver_host = socket.gethostbyname(socket.gethostname()) # setting driver host is important in k8s mode, ortherwise excutors cannot find diver host

    spark = SparkSession \
        .builder \
        .master(spark_master)\
        .appName(appName) \
        .config("spark.driver.host", driver_host) \
        .config('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.1') \
        .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
        
    ACCESS_KEY = os.environ.get('AWS_ACCESS_KEY_ID')
    SECRET_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY')
    MLFLOW_S3_ENDPOINT_URL = os.environ.get('MLFLOW_S3_ENDPOINT_URL')

    hadoopConf = spark.sparkContext._jsc.hadoopConfiguration()
    hadoopConf.set('fs.s3a.access.key', ACCESS_KEY)
    hadoopConf.set('fs.s3a.secret.key', SECRET_KEY)
    hadoopConf.set("fs.s3a.endpoint", MLFLOW_S3_ENDPOINT_URL)
    hadoopConf.set('fs.s3.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
    hadoopConf.set("fs.s3a.connection.ssl.enabled", "true")
    hadoopConf.set("fs.s3a.path.style.access", 'true')

    return spark



def clean_impute_dataframe(spark, file_uri, keep_cols, impute_cols, impute_strategy = "median"):
    
    raw_df = spark.read.csv(file_uri ,header="true", inferSchema="true", multiLine="true", escape='"')
    base_df = raw_df.select(*keep_cols)

    from pyspark.sql.functions import col, translate, when
    from pyspark.sql.types import IntegerType

    #cast datatypes into doubles & simply remove outliers with price beyond normal ranges
    doubles_df= base_df.withColumn("price", translate(col("price"), "$,", "").cast("double")) \
                            .filter(col("price") > 0).filter(col("minimum_nights") <= 365)

    integer_columns = [x.name for x in doubles_df.schema.fields if x.dataType == IntegerType()]

    for c in integer_columns:
        doubles_df = doubles_df.withColumn(c, col(c).cast("double"))

    for c in impute_cols:
        doubles_df = doubles_df.withColumn(c + "_na", when(col(c).isNull(), 1.0).otherwise(0.0))    

    from pyspark.ml.feature import Imputer
    imputer = Imputer(strategy=impute_strategy, inputCols=impute_cols, outputCols=impute_cols)
    imputer_model = imputer.fit(doubles_df)
    imputed_df = imputer_model.transform(doubles_df)

    return imputed_df




if __name__ == "__main__":

    file_uri = "s3://mlops/sf-listings.csv"


    keep_cols = [
        "host_is_superhost",
        "cancellation_policy",
        "instant_bookable",
        "host_total_listings_count",
        "neighbourhood_cleansed",
        "latitude",
        "longitude",
        "property_type",
        "room_type",
        "accommodates",
        "bathrooms",
        "bedrooms",
        "beds",
        "bed_type",
        "review_scores_rating",
        "review_scores_accuracy",
        "review_scores_cleanliness",
        "review_scores_checkin",
        "review_scores_communication",
        "review_scores_location",
        "review_scores_value",
        "price"
    ]



    impute_cols = [
        "bedrooms",
        "bathrooms",
        "beds", 
        "review_scores_rating",
        "review_scores_accuracy",
        "review_scores_cleanliness",
        "review_scores_checkin",
        "review_scores_communication",
        "review_scores_location",
        "review_scores_value"
    ]


    spark = get_sparkSession(appName = 'MLOps')
    imputed_df = clean_impute_dataframe(spark, file_uri, keep_cols, impute_cols, impute_strategy = "median")
    train_df, test_df = imputed_df.randomSplit([.8, .2] , seed=42)

    hc = H2OContext.getOrCreate()
    with mlflow.start_run(run_name="H2O-autoML") as run:
        
        automl = H2OAutoML(labelCol="price", convertUnknownCategoricalLevelsToNa=True)
        automl.setExcludeAlgos(["GLM","DeepLearning"])
        automl.setMaxModels(3)
        automl.setSortMetric("rmse")

        model = automl.fit(train_df)
        from pyspark.ml.evaluation import RegressionEvaluator

        pred_df = model.transform(test_df)
        regression_evaluator = RegressionEvaluator(labelCol='price', predictionCol="prediction")
        rmse = regression_evaluator.setMetricName("rmse").evaluate(pred_df)
        r2 = regression_evaluator.setMetricName("r2").evaluate(pred_df)


        mlflow.log_metric("rmse", rmse)
        mlflow.log_metric("r2", r2)
        mlflow.spark.log_model(model, 'model')
    
    spark.stop()
    

:: loading settings :: url = jar:file:/spark-3.3.0-bin-hadoop3/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-78e4ff0a-7f28-4d49-a30c-b592b72b68bb;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.3.1 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.901 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
downloading https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.1/hadoop-aws-3.3.1.jar ...
	[SUCCESSFUL ] org.apache.hadoop#hadoop-aws;3.3.1!hadoop-aws.jar (490ms)
downloading https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.901/aws-java-sdk-bundle-1.11.901.jar ...
	[SUCCESSFUL ] com.amazonaws#aws-java-sdk-bundle;1.11.901!aws-java-sdk-bundle.jar (80787ms)
downloading https://repo1.maven.org/maven2/org/wildfly/openssl/wildfly-openssl/1.0.7.Final/wildfly-openssl-1.0.7.Final.jar ...
	[SUCCESSFUL ] org.wildfly.o

22/11/03 20:59:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
                                                                                

Connecting to H2O server at http://192.168.224.11:54321 ... successful.


0,1
H2O_cluster_uptime:,12 secs
H2O_cluster_timezone:,Etc/GMT
H2O_data_parsing_timezone:,UTC
H2O_cluster_version:,3.38.0.2
H2O_cluster_version_age:,"7 days, 5 hours and 3 minutes"
H2O_cluster_name:,sparkling-water-root_app-20221103205921-0001
H2O_cluster_total_nodes:,2
H2O_cluster_free_memory:,2 Gb
H2O_cluster_total_cores:,8
H2O_cluster_allowed_cores:,8



Sparkling Water Context:
 * Sparkling Water Version: 3.38.0.2-1-3.3
 * H2O name: root
 * cluster size: 2
 * list of used nodes:
  (executorId, host, port)
  ------------------------
  (0,192.168.224.8,54321)
  (1,192.168.224.9,54321)
  ------------------------

  Open H2O Flow in browser: http://192.168.224.11:54321 (CMD + click in Mac OSX)

    


                                                                                

|██████████████████████████████████████████████████| 100%
5 models trained. For more details use the getLeaderboard() method on the AutoML object.
Returning leader model and printing info about it below.
Model Details
H2OStackedEnsemble
Model Key: StackedEnsemble_AllModels_1_AutoML_1_20221103_205953_c9831601701f

Model summary

Training metrics
RMSLE: 0.3566731304299686
Nobs: 5780.0
RMSE: 228.51200608399907
ResidualDeviance: 3.0181851942380434E8
NullDeviance: 6.13603612626122E8
MAE: 63.67301663622237
MeanResidualDeviance: 52217.73692453362
ScoringTime: 1.667509218775E12
MSE: 52217.73692453362
R2: 0.508121345420263
NullDegreesOfFreedom: 5779.0
AIC: 79202.09541686333
ResidualDegreesOfFreedom: 5776.0

Cross validation metrics
RMSLE: 0.4530675802818606
Nobs: 5780.0
RMSE: 291.07635394398284
ResidualDeviance: 4.897130653103657E8
NullDeviance: 6.138676754658073E8
MAE: 80.87444317788187
MeanResidualDeviance: 84725.44382532279
ScoringTime: 1.667509218664E12
MSE: 84725.44382532279
R2: 0.20190648



In [None]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .master(spark_master)\
    .appName(appName) \
    .config('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.1') \
    .getOrCreate()

In [1]:
!pyspark --packages org.apache.hadoop:hadoop-aws:3.3.1

Python 3.9.7 (default, Oct 16 2021, 10:16:06) 
[GCC 10.2.1 20210110] on linux
Type "help", "copyright", "credits" or "license" for more information.
:: loading settings :: url = jar:file:/spark-3.3.0-bin-hadoop3/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-9975c2f2-4a67-416a-bbcb-f9c68339a7cf;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.3.1 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.901 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
downloading https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.1/hadoop-aws-3.3.1.jar ...
	[SUCCESSFUL ] org.apache.hadoop#hadoop-aws;3.3.1!hadoop-aws.jar (954ms)
downloading https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.901/aws-jav

In [5]:
wget -P /root/.ivy2/jars/org.apache.hadoop_hadoop-aws-3.3.1.jar https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.1/hadoop-aws-3.3.1.jar
wget -P /root/.ivy2/jars/com.amazonaws_aws-java-sdk-bundle-1.11.901.jar https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.901/aws-java-sdk-bundle-1.11.901.jar
wget -P /root/.ivy2/jars/org.wildfly.openssl_wildfly-openssl-1.0.7.Final.jar https://repo1.maven.org/maven2/org/wildfly/openssl/wildfly-openssl/1.0.7.Final/wildfly-openssl-1.0.7.Final.jar

--2022-11-03 18:28:28--  https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.1/hadoop-aws-3.3.1.jar
Resolving repo1.maven.org (repo1.maven.org)... 151.101.36.209
Connecting to repo1.maven.org (repo1.maven.org)|151.101.36.209|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 870644 (850K) [application/java-archive]
Saving to: ‘/root/.ivy2/jars/org.apache.hadoop_hadoop-aws-3.3.1.jar/hadoop-aws-3.3.1.jar’


2022-11-03 18:28:30 (1.01 MB/s) - ‘/root/.ivy2/jars/org.apache.hadoop_hadoop-aws-3.3.1.jar/hadoop-aws-3.3.1.jar’ saved [870644/870644]



In [23]:
!ls /spark-3.3.0-bin-hadoop3/jars

HikariCP-2.5.1.jar
JLargeArrays-1.5.jar
JTransforms-3.1.jar
RoaringBitmap-0.9.25.jar
ST4-4.0.4.jar
activation-1.1.1.jar
aircompressor-0.21.jar
algebra_2.12-2.0.1.jar
annotations-17.0.0.jar
antlr-runtime-3.5.2.jar
antlr4-runtime-4.8.jar
aopalliance-repackaged-2.6.1.jar
arpack-2.2.1.jar
arpack_combined_all-0.1.jar
arrow-format-7.0.0.jar
arrow-memory-core-7.0.0.jar
arrow-memory-netty-7.0.0.jar
arrow-vector-7.0.0.jar
audience-annotations-0.5.0.jar
automaton-1.11-8.jar
avro-1.11.0.jar
avro-ipc-1.11.0.jar
avro-mapred-1.11.0.jar
blas-2.2.1.jar
bonecp-0.8.0.RELEASE.jar
breeze-macros_2.12-1.2.jar
breeze_2.12-1.2.jar
cats-kernel_2.12-2.1.1.jar
chill-java-0.10.0.jar
chill_2.12-0.10.0.jar
commons-cli-1.5.0.jar
commons-codec-1.15.jar
commons-collections-3.2.2.jar
commons-collections4-4.4.jar
commons-compiler-3.0.16.jar
commons-compress-1.21.jar
commons-crypto-1.1.0.jar
commons-dbcp-1.4.jar
commons-io-2.11.0.jar
commons-lang-2.6.jar
commons-lang3-3.12.0.jar
c

In [15]:
!cat /spark-3.3.0-bin-hadoop3/conf/spark-env.sh.template

#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# This file is sourced when running various Spark programs.
# Copy it as spark-env.sh and edit that to configure Spark for your site.

# Options read when launching program

In [3]:
from pyspark.sql import SparkSession
import socket
import os

import mlflow
import mlflow.spark
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression, GeneralizedLinearRegression
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

from hyperopt import fmin, tpe, Trials, hp
import numpy as np

import pandas as pd
import matplotlib.pyplot as plt
from pysparkling import *

mlflow.set_experiment('MLOps_Experiment')
mlflow.pyspark.ml.autolog(log_models=False)


def get_sparkSession(appName = 'MLOps'):
    spark_master = os.environ.get('SPARK_MASTER') # "spark://spark-master:7077" 
    driver_host = socket.gethostbyname(socket.gethostname()) # setting driver host is important in k8s mode, ortherwise excutors cannot find diver host

    spark = SparkSession \
        .builder \
        .master(spark_master)\
        .appName(appName) \
        .config("spark.driver.host", driver_host) \
        .config('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.1') \
        .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
            
    ACCESS_KEY = 'admin'
    SECRET_KEY = 'sample_key'
    MLFLOW_S3_ENDPOINT_URL = 'http://s3:9000'

    hadoopConf = spark.sparkContext._jsc.hadoopConfiguration()
    hadoopConf.set('fs.s3a.access.key', ACCESS_KEY)
    hadoopConf.set('fs.s3a.secret.key', SECRET_KEY)
    hadoopConf.set("fs.s3a.endpoint", MLFLOW_S3_ENDPOINT_URL)
    hadoopConf.set('fs.s3.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
    hadoopConf.set("fs.s3a.connection.ssl.enabled", "true")
    hadoopConf.set("fs.s3a.path.style.access", 'true')

    return spark



def clean_impute_dataframe(spark, file_uri, keep_cols, impute_cols, impute_strategy = "median"):
    
    raw_df = spark.read.csv(file_uri ,header="true", inferSchema="true", multiLine="true", escape='"')
    base_df = raw_df.select(*keep_cols)

    from pyspark.sql.functions import col, translate, when
    from pyspark.sql.types import IntegerType

    #cast datatypes into doubles & simply remove outliers with price beyond normal ranges
    doubles_df= base_df.withColumn("price", translate(col("price"), "$,", "").cast("double")) \
                            .filter(col("price") > 0).filter(col("minimum_nights") <= 365)

    integer_columns = [x.name for x in doubles_df.schema.fields if x.dataType == IntegerType()]

    for c in integer_columns:
        doubles_df = doubles_df.withColumn(c, col(c).cast("double"))

    for c in impute_cols:
        doubles_df = doubles_df.withColumn(c + "_na", when(col(c).isNull(), 1.0).otherwise(0.0))    

    from pyspark.ml.feature import Imputer
    imputer = Imputer(strategy=impute_strategy, inputCols=impute_cols, outputCols=impute_cols)
    imputer_model = imputer.fit(doubles_df)
    imputed_df = imputer_model.transform(doubles_df)

    return imputed_df



def run_H2OXGBoost(imputed_df, labelCol="price"):
    
    train_df, test_df = imputed_df.randomSplit([.8, .2] , seed=42)

    
    
    
    
    #######################
    conf = H2OConf().setExternalClusterMode().setH2OCluster("sparkling-water", 54321)\
                    .setClientIp("172.22.0.13").setCloudName("test")
    
    hc = H2OContext.getOrCreate(conf)
    #################################
    
    
    
    from pysparkling.ml import H2OGBMRegressor
    from pyspark.sql.functions import exp, col, log
    train_df, test_df = imputed_df.withColumn("label", col("price")).randomSplit([.8, .2], seed=42)


    with mlflow.start_run(run_name="Xgboost") as run:
        # Create pipeline
        categorical_cols = [field for (field, dataType) in train_df.dtypes if dataType == "string"]
        index_output_cols = [x + "Index" for x in categorical_cols]
        string_indexer = StringIndexer(inputCols=categorical_cols, outputCols=index_output_cols, handleInvalid="skip")    
        numeric_cols = [field for (field, dataType) in train_df.dtypes if ((dataType == "double") & (field != "price"))]
        assembler_inputs = index_output_cols + numeric_cols
        vec_assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")


        #params = {"n_estimators": 100, "learning_rate": 0.1, "max_depth": 4, "random_state": 42, "missing": 0}
        xgboost = H2OGBMRegressor(labelCol = "price" )
        stages = [string_indexer, vec_assembler, xgboost]

        pipeline = Pipeline(stages=stages)
        pipeline_model = pipeline.fit(train_df)

        # Log pipeline
        mlflow.spark.log_model(pipeline_model, "model", input_example=train_df.limit(5).toPandas())

        # Log parameter
        mlflow.log_param("label", "price")
        mlflow.log_param("features", "all_features")

        # Create predictions and metrics
        pred_df = pipeline_model.transform(test_df)
        regression_evaluator = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="rmse")
        rmse = regression_evaluator.setMetricName("rmse").evaluate(pred_df)
        r2 = regression_evaluator.setMetricName("r2").evaluate(pred_df)

        # Log both metrics
        mlflow.log_metric("rmse", rmse)
        mlflow.log_metric("r2", r2)
        mlflow.spark.log_model(pipeline_model, "h2o_model")



if __name__ == "__main__":

    file_uri = "s3://mlops/sf-listings.csv"


    keep_cols = [
        "host_is_superhost",
        "cancellation_policy",
        "instant_bookable",
        "host_total_listings_count",
        "neighbourhood_cleansed",
        "latitude",
        "longitude",
        "property_type",
        "room_type",
        "accommodates",
        "bathrooms",
        "bedrooms",
        "beds",
        "bed_type",
        "review_scores_rating",
        "review_scores_accuracy",
        "review_scores_cleanliness",
        "review_scores_checkin",
        "review_scores_communication",
        "review_scores_location",
        "review_scores_value",
        "price"
    ]



    impute_cols = [
        "bedrooms",
        "bathrooms",
        "beds", 
        "review_scores_rating",
        "review_scores_accuracy",
        "review_scores_cleanliness",
        "review_scores_checkin",
        "review_scores_communication",
        "review_scores_location",
        "review_scores_value"
    ]


    spark = get_sparkSession(appName = 'MLOps')
    imputed_df = clean_impute_dataframe(spark, file_uri, keep_cols, impute_cols, impute_strategy = "median")
    
    run_H2OXGBoost(imputed_df, labelCol="price")
#     run_RandomForestCV(imputed_df, maxBins=40, labelCol="price")
#     run_RandomForest_Hyperopt(imputed_df, maxBins=40, labelCol="price")

2022/11/04 15:35:01 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID '5c589177c61f447fa4f01e223def2c3a', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow


|██████████████████████████████████████████████████| 100%


WARN: Dropping bad and constant columns: [review_scores_location_na, features.26, bedrooms_na, features.25, features.28, review_scores_accuracy_na, features.27, features.22, features.21, features.24, features.23, review_scores_checkin_na, features.29, review_scores_value_na, review_scores_rating_na, beds_na, review_scores_communication_na, bathrooms_na, review_scores_cleanliness_na, features.30] (field name: train)


Model Details
H2OGBM
Model Key: GBM_5223981cc4a7

Model summary
Number of Trees: 50
Number of Internal Trees: 50
Model Size in Bytes: 12629
Min. Depth: 5
Max. Depth: 5
Mean Depth: 5.0
Min. Leaves: 7
Max. Leaves: 17
Mean Leaves: 14.94

Training metrics
RMSLE: 0.04744695329165878
Nobs: 5780.0
RMSE: 95.03667085054103
MAE: 5.7693701874426075
MeanResidualDeviance: 9031.968806354076
ScoringTime: 1.667576104383E12
MSE: 9031.968806354076
R2: 0.91492100335378

More info available using methods like:
getFeatureImportances(), getScoringHistory(), getCrossValidationScoringHistory()


