In [0]:
import os
import tarfile

import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy.stats import randint
from six.moves import urllib
from sklearn.ensemble import RandomForestRegressor
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_absolute_error, mean_squared_error
from sklearn.model_selection import (
    GridSearchCV,
    RandomizedSearchCV,
    StratifiedShuffleSplit,
    train_test_split,
)
from sklearn.tree import DecisionTreeRegressor

DOWNLOAD_ROOT = "https://raw.githubusercontent.com/ageron/handson-ml2/master/"
HOUSING_PATH = DOWNLOAD_ROOT + os.path.join("datasets", "housing")
HOUSING_URL = DOWNLOAD_ROOT + "datasets/housing/housing.tgz"



In [0]:
def fetch_housing_data(housing_url=HOUSING_URL, housing_path=HOUSING_PATH):
    os.makedirs(housing_path, exist_ok=True)
    tgz_path = os.path.join(housing_path, "housing.tgz")
    urllib.request.urlretrieve(housing_url, tgz_path)
    housing_tgz = tarfile.open(tgz_path)
    housing_tgz.extractall(path=housing_path)
    housing_tgz.close()


def load_housing_data(housing_path=HOUSING_PATH):
    csv_path = os.path.join(housing_path, "housing.csv")
    return pd.read_csv(csv_path)


housing = load_housing_data()

In [0]:
housing.head()

Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,-122.23,37.88,41.0,880.0,129.0,322.0,126.0,8.3252,452600.0,NEAR BAY
1,-122.22,37.86,21.0,7099.0,1106.0,2401.0,1138.0,8.3014,358500.0,NEAR BAY
2,-122.24,37.85,52.0,1467.0,190.0,496.0,177.0,7.2574,352100.0,NEAR BAY
3,-122.25,37.85,52.0,1274.0,235.0,558.0,219.0,5.6431,341300.0,NEAR BAY
4,-122.25,37.85,52.0,1627.0,280.0,565.0,259.0,3.8462,342200.0,NEAR BAY


In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor

# COMMAND ----------

housing = spark.createDataFrame(housing)
#Check for null values
from pyspark.sql.functions import isnan, when, count, col,isnull

housing = housing.na.fill(value=0)

nullDF=housing.select([count(when(isnull(c), c)).alias(c) for c in housing.columns])

display(nullDF)

longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,0,0,0,0,0,0,0,0,0


In [0]:
housing.dtypes

Out[12]: [('longitude', 'double'),
 ('latitude', 'double'),
 ('housing_median_age', 'double'),
 ('total_rooms', 'double'),
 ('total_bedrooms', 'double'),
 ('population', 'double'),
 ('households', 'double'),
 ('median_income', 'double'),
 ('median_house_value', 'double'),
 ('ocean_proximity', 'string')]

In [0]:
#Print the dataframe schema
housing.printSchema()

# COMMAND ----------

# Filter for just numeric columns (and exclude median_house_value, our label)
numericCols = [field for (field, dataType) in housing.dtypes if (((dataType == "int") or (dataType == "double")) & (field != "median_house_value"))]

# Combine output of StringIndexer defined above and numeric columns
#assemblerInputs = indexOutputCols + numericCols

assemblerInputs = numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

#Split the data in train and test data
(trainDF, testDF) = housing.randomSplit([.8, .2], seed=42)

rfw = RandomForestRegressor(labelCol="median_house_value", featuresCol="features", numTrees=10, seed=42)

# Combine stages into pipeline
stages = [vecAssembler, rfw]

pipeline = Pipeline(stages=stages)

# Train model with Training Data
pipelineModel = pipeline.fit(trainDF)

#Scoring the test data

predDF = pipelineModel.transform(testDF)

root
 |-- longitude: double (nullable = false)
 |-- latitude: double (nullable = false)
 |-- housing_median_age: double (nullable = false)
 |-- total_rooms: double (nullable = false)
 |-- total_bedrooms: double (nullable = false)
 |-- population: double (nullable = false)
 |-- households: double (nullable = false)
 |-- median_income: double (nullable = false)
 |-- median_house_value: double (nullable = false)
 |-- ocean_proximity: string (nullable = true)



In [0]:
# run_id = run.info.run_id
# run_id

Out[19]: 'd89020e170db445e9caf399eab9c634e'

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

eval_rmse = RegressionEvaluator(labelCol="median_house_value", predictionCol="prediction", metricName="rmse")

rmse = eval_rmse.evaluate(predDF)

print(rmse)

registry_uri="databricks://rmr:rmr"

# Use ML Flow to register the experiments in the local registry and register the trained model in the shared registry
import mlflow
import mlflow.spark

mlflow.set_experiment("/SparkML")

with mlflow.start_run(run_name="SparkML-Test") as run:

  # Log the algorithm parameter num_trees to the run
  mlflow.log_param('num_trees', 10)


  # Train model with Training Data
  pipelineModel = pipeline.fit(trainDF)

  #Scoring the test data
  predDF = pipelineModel.transform(testDF)
  
  #Evaluate the accuracy metrics
  eval_rmse = RegressionEvaluator(labelCol="median_house_value", predictionCol="prediction", metricName="rmse")

  rmse = eval_rmse.evaluate(predDF)

  print('RMSE is', str(rmse))
  mlflow.log_param('rmse', rmse)
  

  # Log model
  print("Stage: Log Model Pipeline - Status: Started")
  mlflow.spark.log_model(pipelineModel, "model")
  print("Stage: Log Model Pipeline - Status: Complete")

  run_id = mlflow.active_run().info.run_id



# COMMAND ----------

# MAGIC %md
# MAGIC 
# MAGIC Register the model in the shared registry

# COMMAND ----------

mlflow.set_registry_uri(registry_uri)

# COMMAND ----------

#Register the mode with name: SparkModel
# run_id = run.info.run_id
# model_uri = f"runs:/{run_id}/model"
# model_details = mlflow.register_model(model_uri=model_uri, name="SparkModel")

74048.90538268468


2023/05/24 06:34:31 INFO mlflow.tracking.fluent: Experiment with name '/SparkML' does not exist. Creating a new experiment.


RMSE is 74048.90538268468
Stage: Log Model Pipeline - Status: Started


2023/05/24 06:34:43 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


Stage: Log Model Pipeline - Status: Complete


In [0]:
# artifact_path = "model"
# model_uri = "runs:/{run_id}/{artifact_path}".format(run_id=run_id, artifact_path=artifact_path)
 
# model_details = mlflow.register_model(model_uri=model_uri, name="SparkModel")

# # COMMAND ----------

# #Set the status of the model to "Staging" as it's ready to be tested in the test environment


# from mlflow.tracking.client import MlflowClient

# client = MlflowClient()
# model_version_details = client.get_model_version(name="SparkModel", version=1)
# model_version_details.status


# # COMMAND ----------

# client.transition_model_version_stage(
#   name=model_details.name,
#   version=model_details.version,
#   stage="Staging",
# )