In [1]:
import findspark
findspark.init('/home/kris/spark-3.3.0-bin-hadoop3')

from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('lr_model').getOrCreate()

22/07/20 12:02:19 WARN Utils: Your hostname, kris-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
22/07/20 12:02:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/07/20 12:02:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
from pyspark.ml.regression import RandomForestRegressor, LinearRegression

from sklearn import datasets
from sklearn.metrics import mean_squared_error, r2_score
import mlflow
from math import sqrt
 
import pandas as pd

In [4]:
dataset = datasets.load_diabetes()
df_x =  pd.DataFrame(dataset.data)
df_y = pd.DataFrame(dataset.target, columns = ['target'])
df = pd.concat([df_x, df_y], axis = 1)
df.to_csv('data.csv', index = False)

In [5]:
df.head()

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,target
0,0.038076,0.05068,0.061696,0.021872,-0.044223,-0.034821,-0.043401,-0.002592,0.019907,-0.017646,151.0
1,-0.001882,-0.044642,-0.051474,-0.026328,-0.008449,-0.019163,0.074412,-0.039493,-0.068332,-0.092204,75.0
2,0.085299,0.05068,0.044451,-0.00567,-0.045599,-0.034194,-0.032356,-0.002592,0.002861,-0.02593,141.0
3,-0.089063,-0.044642,-0.011595,-0.036656,0.012191,0.024991,-0.036038,0.034309,0.022688,-0.009362,206.0
4,0.005383,-0.044642,-0.036385,0.021872,0.003935,0.015596,0.008142,-0.002592,-0.031988,-0.046641,135.0


In [6]:
data = spark.read.csv('data.csv',
                      inferSchema = True,
                      header= True)



In [7]:
data.printSchema()

root
 |-- 0: double (nullable = true)
 |-- 1: double (nullable = true)
 |-- 2: double (nullable = true)
 |-- 3: double (nullable = true)
 |-- 4: double (nullable = true)
 |-- 5: double (nullable = true)
 |-- 6: double (nullable = true)
 |-- 7: double (nullable = true)
 |-- 8: double (nullable = true)
 |-- 9: double (nullable = true)
 |-- target: double (nullable = true)



In [8]:
for item in data.head(1)[0]:
    print(item)

0.038075906433423026
0.05068011873981862
0.061696206518683294
0.0218723855140367
-0.04422349842444599
-0.03482076283769895
-0.04340084565202491
-0.002592261998183278
0.019907486170462722
-0.01764612515980379
151.0


In [9]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [10]:
data.columns

['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'target']

In [11]:
assembler = VectorAssembler(inputCols = ['0', '1', '2', '3', '4', '5',
                                           '6', '7', '8', '9'],
                           outputCol = 'features')

In [12]:
output = assembler.transform(data)

In [13]:
output.head()

Row(0=0.038075906433423026, 1=0.05068011873981862, 2=0.061696206518683294, 3=0.0218723855140367, 4=-0.04422349842444599, 5=-0.03482076283769895, 6=-0.04340084565202491, 7=-0.002592261998183278, 8=0.019907486170462722, 9=-0.01764612515980379, target=151.0, features=DenseVector([0.0381, 0.0507, 0.0617, 0.0219, -0.0442, -0.0348, -0.0434, -0.0026, 0.0199, -0.0176]))

In [14]:
final_data = output.select('features', 'target')

In [15]:
final_data.show()

+--------------------+------+
|            features|target|
+--------------------+------+
|[0.03807590643342...| 151.0|
|[-0.0018820165277...|  75.0|
|[0.08529890629667...| 141.0|
|[-0.0890629393522...| 206.0|
|[0.00538306037424...| 135.0|
|[-0.0926954778032...|  97.0|
|[-0.0454724779400...| 138.0|
|[0.06350367559055...|  63.0|
|[0.04170844488444...| 110.0|
|[-0.0709002470971...| 310.0|
|[-0.0963280162542...| 101.0|
|[0.02717829108036...|  69.0|
|[0.01628067572730...| 179.0|
|[0.00538306037424...| 185.0|
|[0.04534098333546...| 118.0|
|[-0.0527375548420...| 171.0|
|[-0.0055145549788...| 166.0|
|[0.07076875249259...| 144.0|
|[-0.0382074010379...|  97.0|
|[-0.0273097856849...| 168.0|
+--------------------+------+
only showing top 20 rows



In [16]:
train_data, test_data = final_data.randomSplit([0.7, 0.3])

In [17]:
train_data.describe().show()

+-------+------------------+
|summary|            target|
+-------+------------------+
|  count|               303|
|   mean|153.34653465346534|
| stddev| 76.70098935315586|
|    min|              25.0|
|    max|             332.0|
+-------+------------------+



In [18]:
test_data.describe().show()

+-------+-----------------+
|summary|           target|
+-------+-----------------+
|  count|              139|
|   mean|149.4892086330935|
| stddev|78.15409962695261|
|    min|             39.0|
|    max|            346.0|
+-------+-----------------+



## Model Development

In [19]:
lr = LinearRegression(labelCol = 'target')

In [20]:
lr_model = lr.fit(train_data)

22/07/20 12:02:39 WARN Instrumentation: [0dbe3d7e] regParam is zero, which might cause numerical instability and overfitting.
22/07/20 12:02:39 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/07/20 12:02:39 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
22/07/20 12:02:39 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [21]:
test_results = lr_model.evaluate(test_data)
test_results.residuals.show()

+-------------------+
|          residuals|
+-------------------+
|  6.077991406765818|
|-12.057857287358203|
|  7.878753437343036|
|  58.42505976720159|
| 25.687452080731916|
|-12.670299409171832|
|-15.172670149890635|
| 30.187824890399526|
| 24.778835349218696|
|-33.257202698089685|
|  70.89622094772128|
| -88.96219412537084|
|-131.26688188382184|
| -52.99157936810451|
|  91.45154842271742|
| 24.220940970961493|
| 24.445749450805536|
| 19.364291662551352|
| -6.106761249435323|
|-22.150789775871786|
+-------------------+
only showing top 20 rows



In [22]:
test_results.rootMeanSquaredError

52.32771395317113

In [23]:
test_results.r2

0.5484605395746912

In [24]:
 test_data.describe().show()

+-------+-----------------+
|summary|           target|
+-------+-----------------+
|  count|              139|
|   mean|149.4892086330935|
| stddev|78.15409962695261|
|    min|             39.0|
|    max|            346.0|
+-------+-----------------+



In [25]:
predictions = lr_model.transform(test_data.select('features'))
predictions.show()

+--------------------+------------------+
|            features|        prediction|
+--------------------+------------------+
|[-0.1072256316073...|133.92200859323418|
|[-0.1035930931563...|  83.0578572873582|
|[-0.0963280162542...| 93.12124656265696|
|[-0.0963280162542...| 75.57494023279841|
|[-0.0926954778032...|59.312547919268084|
|[-0.0926954778032...|109.67029940917183|
|[-0.0854304009012...|58.172670149890635|
|[-0.0781653239991...|111.81217510960047|
|[-0.0781653239991...| 208.2211646507813|
|[-0.0745327855481...|102.25720269808969|
|[-0.0745327855481...|45.103779052278725|
|[-0.0745327855481...|189.96219412537084|
|[-0.0745327855481...|199.26688188382184|
|[-0.0709002470971...|113.99157936810451|
|[-0.0709002470971...|218.54845157728258|
|[-0.0709002470971...| 175.7790590290385|
|[-0.0709002470971...| 79.55425054919446|
|[-0.0636351701951...|164.63570833744865|
|[-0.0600026317441...| 78.10676124943532|
|[-0.0600026317441...|118.15078977587179|
+--------------------+------------

## MLFlow Experiments

In [26]:
from math import exp
import xgboost as xgb
from hyperopt import fmin, hp, tpe, SparkTrials, STATUS_OK
from sklearn.model_selection import train_test_split
import numpy as np
from pyspark.ml.evaluation import RegressionEvaluator

In [27]:
try:
  import mlflow.pyspark.ml
  mlflow.pyspark.ml.autolog()
except:
  print(f"Your version of MLflow ({mlflow.__version__}) does not support pyspark.ml for autologging. To use autologging, upgrade your MLflow client version or use Databricks Runtime for ML 8.3 or above.")

In [29]:
def train_reg(maxDepth, numTrees):
    # Use MLflow to track training.
    with mlflow.start_run(nested=True):
        rfr = RandomForestRegressor(featuresCol="features",
                                    labelCol="target",
                                    maxDepth=maxDepth,
                                    numTrees=numTrees)
        model = rfr.fit(train_data)
    
        pred = model.transform(test_data)
        evaluator_rmse = RegressionEvaluator(
            labelCol="target", predictionCol="prediction", metricName="rmse")
        val_rmse = evaluator_rmse.evaluate(pred)
        evaluator_r2 = RegressionEvaluator(
            labelCol="target", predictionCol="prediction", metricName="r2")
        val_r2 = evaluator_r2.evaluate(pred)
        mlflow.log_metric("val_rmse", val_rmse)
        mlflow.log_metric("val_r2", val_r2)
    return model, val_rmse, val_r2

In [30]:
def train_with_hyperopt(params):
    maxDepth = int(params['maxDepth'])
    numTrees = int(params['numTrees'])
    
    model, val_rmse, val_r2 = train_reg(maxDepth, numTrees)
    loss = -val_r2
    return {'loss': loss, 'status': STATUS_OK}

In [31]:
space = {
  'maxDepth': hp.uniform('max_depth', 2, 4),
  'numTrees': hp.uniform('n_estimators', 10, 15),
}

In [32]:
algo=tpe.suggest
with mlflow.start_run():
    best_params = fmin(
    fn=train_with_hyperopt,
    space=space,
    algo=algo,
    max_evals=8)

The git executable must be specified in one of the following ways:
    - be included in your $PATH
    - be set via $GIT_PYTHON_GIT_EXECUTABLE
    - explicitly set via git.refresh()

All git commands will error until this is rectified.

$GIT_PYTHON_REFRESH environment variable. Use one of the following values:
    - error|e|raise|r|2: for a raised exception

Example:
    export GIT_PYTHON_REFRESH=quiet



  0%|                                    | 0/8 [00:00<?, ?trial/s, best loss=?]

                                                                                

 62%|█████   | 5/8 [02:33<01:22, 27.56s/trial, best loss: -0.46052664915693164]

[Stage 145:>                                                        (0 + 1) / 1]                                                                                

100%|████████| 8/8 [04:09<00:00, 31.22s/trial, best loss: -0.47132825886509655]


In [37]:
!mlflow ui  # mlflow ui launch

[2022-07-20 12:19:24 -0400] [9634] [INFO] Starting gunicorn 20.1.0
[2022-07-20 12:19:24 -0400] [9634] [INFO] Listening at: http://127.0.0.1:5000 (9634)
[2022-07-20 12:19:24 -0400] [9634] [INFO] Using worker: sync
[2022-07-20 12:19:24 -0400] [9636] [INFO] Booting worker with pid: 9636
^C
[2022-07-20 12:20:00 -0400] [9634] [INFO] Handling signal: int
[2022-07-20 12:20:00 -0400] [9636] [INFO] Worker exiting (pid: 9636)


## Final Model

In [34]:
best_params

{'max_depth': 3.1058622797243496, 'n_estimators': 12.118337684837792}

In [36]:
maxDepth = int(best_params['max_depth'])
numTrees = int(best_params['n_estimators'])
model, val_rmse, val_r2 = train_reg(maxDepth, numTrees)  # We can train this on full dataset
print('Validation RMSE:', val_rmse)
print('Validation R2:', val_r2)

[Stage 240:>                                                        (0 + 3) / 3]                                                                                

Validation RMSE: 56.62092443189912
Validation R2: 0.47132825886509655
