# Imports

In [1]:
import numpy as np
import pandas as pd
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import accuracy_score
from mlflow.models import infer_signature

StatementMeta(, dd692d29-346b-488c-bdc2-81dc22d19d50, 3, Finished, Available)

# Load data

In [2]:
# Load test data
test = spark.sql("SELECT * FROM Silver.test")
test.show(5)

StatementMeta(, dd692d29-346b-488c-bdc2-81dc22d19d50, 4, Finished, Available)

+-----------+---------+----------+----------+-----------+--------+--------+------------+---------+
|OverallQual|GrLivArea|GarageCars|GarageArea|TotalBsmtSF|1stFlrSF|FullBath|TotRmsAbvGrd|YearBuilt|
+-----------+---------+----------+----------+-----------+--------+--------+------------+---------+
|          8|     1418|       3.0|     852.0|     1642.0|    1418|       1|           6|     2010|
|          4|     1362|       3.0|     768.0|     1040.0|    1362|       1|           6|     1957|
|          6|     1521|       3.0|     640.0|      741.0|     780|       1|           8|     1910|
|          5|     1072|       5.0|    1184.0|     1072.0|    1072|       1|           5|     1925|
|          9|     1680|       3.0|    1138.0|     1555.0|    1680|       1|           8|     2009|
+-----------+---------+----------+----------+-----------+--------+--------+------------+---------+
only showing top 5 rows



In [3]:
# Load train data
train = spark.sql("SELECT * FROM Silver.train")
train.show(5)

StatementMeta(, dd692d29-346b-488c-bdc2-81dc22d19d50, 5, Finished, Available)

+---------+-----------+---------+----------+----------+-----------+--------+--------+------------+---------+
|SalePrice|OverallQual|GrLivArea|GarageCars|GarageArea|TotalBsmtSF|1stFlrSF|FullBath|TotRmsAbvGrd|YearBuilt|
+---------+-----------+---------+----------+----------+-----------+--------+--------+------------+---------+
|   206300|          7|     1344|         4|       784|       1344|    1344|       2|           8|     1997|
|   265979|          7|     2640|         4|       864|       1240|    1320|       1|           8|     1880|
|   168000|          4|     1622|         4|      1356|       1249|    1622|       1|           7|     1961|
|   123000|          4|      872|         4|       480|        858|     872|       1|           5|     1971|
|   200000|          5|     2634|         4|       968|       1248|    1338|       2|          12|     1969|
+---------+-----------+---------+----------+----------+-----------+--------+--------+------------+---------+
only showing top 5 

In [4]:
# Convert input data to pandas
train = train.toPandas()
test = test.toPandas()

StatementMeta(, dd692d29-346b-488c-bdc2-81dc22d19d50, 6, Finished, Available)

In [5]:
# Drop Na values for test data
test.dropna(how='any', inplace=True)

StatementMeta(, dd692d29-346b-488c-bdc2-81dc22d19d50, 7, Finished, Available)

# Split - Train, Test data

In [6]:
def split_dataset(dataset, test_ratio=0.30):
  test_indices = np.random.rand(len(dataset)) < test_ratio
  return dataset[~test_indices], dataset[test_indices]

train_ds_pd, valid_ds_pd = split_dataset(train)
print("{} examples in training, {} examples in testing.".format(
    len(train_ds_pd), len(valid_ds_pd)))

StatementMeta(, dd692d29-346b-488c-bdc2-81dc22d19d50, 8, Finished, Available)

1017 examples in training, 441 examples in testing.


In [7]:
y_train = train.SalePrice.values
X_train = train.drop(['SalePrice'], axis=1)
y_test = valid_ds_pd.SalePrice.values
X_test = valid_ds_pd.drop(['SalePrice'], axis=1)

StatementMeta(, dd692d29-346b-488c-bdc2-81dc22d19d50, 9, Finished, Available)

# Training

In [8]:
def train(X_train, X_test, y_train, y_test, max_detph, max_features, n_estimators):
    """
    :X_train: training data
    :X_test: testing data
    :y_train: sale prices for training data
    :y_test: testing data
    :max_detph: int Max tree depth
    :max_features: float percentage of features to use in classification
    :n_estimators: int number of trees to create
    :return: Trained Model
    """
    mod = RandomForestRegressor(
        max_depth=max_depth, max_features=max_features, n_estimators=n_estimators
    )

    mod.fit(X_train, y_train)
    preds = mod.predict(X_test)

    errors = abs(preds - y_test)
    MSE=round(np.mean(errors),2)
    MAPE=100*(errors/y_test)
    accuracy=round (100 - np.mean(MAPE),2)
    print(f'accuracy: {accuracy}')

    output_table = X_test.copy()
    output_table['SalePRice'] = y_test
    output_table['PredictedPrice'] = preds
    output_table = spark.createDataFrame(output_table)
    output_table.write.format("delta").mode('overwrite').save("abfss://HousingPrices@onelake.dfs.fabric.microsoft.com/Gold.Lakehouse/Tables/mlflow_rf_training")

    mlparams = {
        "max_depth": str(max_depth),
        "max_features": str(max_features),
        "n_estimators": str(n_estimators),
    }
    mlflow.log_params(mlparams)

    mlmetrics = {"accuracy": accuracy, "MSE": MSE}
    mlflow.log_metrics(mlmetrics)

    return mod, infer_signature(X_train, y_train)

StatementMeta(, dd692d29-346b-488c-bdc2-81dc22d19d50, 10, Finished, Available)

In [9]:
max_depth = 300
# max_features{“sqrt”, “log2”, None}, int or float, default=1.0, The default of 1.0 is equivalent to bagged trees and more randomness can be achieved by setting smaller values, e.g. 0.3.
max_features = 1.0
n_estimators = 100

experiment_name = "HousingPricesExp"
registered_model_name = f"{experiment_name}-randomforestmodel"
artifact_path = "housing-price-artifact"

mlflow.set_experiment(experiment_name)

with mlflow.start_run(run_name="MLFlowModel") as run:
    model, signature = train(X_train, X_test, y_train, y_test, max_depth, max_features, n_estimators)
    
    mlflow.sklearn.log_model(
        model,
        signature=signature,
        artifact_path=artifact_path,
        registered_model_name=registered_model_name,
    )

    artifact_uri = mlflow.get_artifact_uri(artifact_path=artifact_path)
    
print(artifact_uri)
    #test_prediction = model.predict(test)
    #output_table = test.copy()
    #output_table['PredictedPrice'] = test_prediction
    #output_table = spark.createDataFrame(output_table)
    #output_table.write.format("delta").mode('overwrite').save("abfss://HousingPrices@onelake.dfs.fabric.microsoft.com/Gold.Lakehouse/Tables/mlflow_rf_prediction")

StatementMeta(, dd692d29-346b-488c-bdc2-81dc22d19d50, 11, Finished, Available)

accuracy: 95.44
sds://lake.trident.com/502d5a7b-1acf-4ad8-8b70-01ead5cbd1d3/3bbf8ea2-a690-47fe-a614-9513fc600e7a/63a3bec0-5d3f-4438-83cb-e81676c376a5/artifacts/housing-price-artifact


  [(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)]
  inputs = _infer_schema(model_input)
Successfully registered model 'HousingPricesExp-randomforestmodel'.
2023/08/04 09:17:16 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation.                     Model name: HousingPricesExp-randomforestmodel, version 5
Created version '5' of model 'HousingPricesExp-randomforestmodel'.


In [11]:
test_prediction = model.predict(test)
print(test_prediction.shape)
output_table = test.copy()
output_table['PredictedPrice'] = test_prediction
output_table = spark.createDataFrame(output_table)
output_table.write.format("delta").mode('overwrite').save("abfss://HousingPrices@onelake.dfs.fabric.microsoft.com/Gold.Lakehouse/Tables/mlflow_rf_prediction")

StatementMeta(, dd692d29-346b-488c-bdc2-81dc22d19d50, 13, Finished, Available)

(1457,)


  [(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)]
