In [0]:
# Pull the four numeric features plus the price label from the Unity Catalog training table.
source_query = (
    "SELECT square_feet_float, num_rooms_float, age_float, distance_to_city_km_float, price_float "
    "FROM kaggle_ml_demo.house_prediction.train_data_house_prediction_float"
)
sql_df = spark.sql(source_query)
display(sql_df)

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

# The four float feature columns are packed into one `features` vector, which is the input format Spark ML estimators require.
feature_columns = ['square_feet_float', 'num_rooms_float', 'age_float', 'distance_to_city_km_float']
assembler = VectorAssembler(outputCol='features', inputCols=feature_columns)
data = assembler.transform(sql_df)

# Deterministic 80/20 train/test split (seeded for reproducibility).
train_data, test_data = data.randomSplit(weights=[0.8, 0.2], seed=42)

# Fit ordinary linear regression on price.
lr = LinearRegression(labelCol='price_float', featuresCol='features')
lr_model = lr.fit(train_data)

# Score the held-out split and report RMSE as a one-row table.
predictions = lr_model.transform(test_data)
evaluator = RegressionEvaluator(metricName='rmse', labelCol='price_float', predictionCol='prediction')
rmse = evaluator.evaluate(predictions)

display(spark.createDataFrame(data=[(rmse,)], schema=['RMSE']))

In [0]:
import mlflow
from mlflow.models.signature import infer_signature

# Local imports keep this cell runnable on its own.
from pyspark.ml import PipelineModel
from pyspark.ml.feature import VectorAssembler

# Set the registry URI to Unity Catalog
mlflow.set_registry_uri("databricks-uc")

# Define the catalog, schema, and model name
catalog_name = "kaggle_ml_demo"
schema_name = "house_prediction"
model_name = "ml_model_house_prediction"
full_model_name = f"{catalog_name}.{schema_name}.{model_name}"

# `lr_model` on its own only reads the pre-assembled `features` vector column,
# so a model logged with a raw-column signature could not score raw-column input.
# Bundle the assembler in front of it so the logged model accepts exactly the
# columns declared in the signature.
lr_pipeline_model = PipelineModel([
    VectorAssembler(inputCols=feature_columns, outputCol="features"),
    lr_model,
])

# Infer the signature from raw feature columns -> prediction, produced by the
# same pipeline that gets logged (also validates that it runs on raw input).
signature_input = train_data.select(feature_columns)
signature = infer_signature(
    signature_input.toPandas(),
    lr_pipeline_model.transform(signature_input).select("prediction").toPandas(),
)

# Log the model with the signature
with mlflow.start_run(nested=True) as run:
    mlflow.spark.log_model(
        lr_pipeline_model,
        "model",
        signature=signature
    )

    # Register the model to Unity Catalog
    model_uri = f"runs:/{run.info.run_id}/model"
    mlflow.register_model(model_uri=model_uri, name=full_model_name)

In [0]:
from pyspark.ml.feature import VectorAssembler

# Rebuild the feature vector and the seeded 80/20 split for the random-forest experiments.
feature_columns = ['square_feet_float', 'num_rooms_float', 'age_float', 'distance_to_city_km_float']
assembler = VectorAssembler(outputCol='features', inputCols=feature_columns)
rf_data = assembler.transform(sql_df)
train_data, test_data = rf_data.randomSplit(weights=[0.8, 0.2], seed=42)

# Show both splits.
display(train_data)
display(test_data)

In [0]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Baseline random forest with default hyperparameters (seeded for repeatability).
rf = RandomForestRegressor(seed=42, labelCol='price_float', featuresCol='features')
rf_model = rf.fit(train_data)
rf_predictions = rf_model.transform(test_data)

# RMSE on the held-out split.
evaluator = RegressionEvaluator(metricName='rmse', predictionCol='prediction', labelCol='price_float')
rf_rmse = evaluator.evaluate(rf_predictions)

# Per-row predictions next to the true price, then the single RMSE value.
display(rf_predictions.select(['features', 'price_float', 'prediction']))
display(spark.createDataFrame(data=[(rf_rmse,)], schema=['RF_RMSE']))

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Inputs to the regressor and the label it predicts.
feature_cols = ['square_feet_float', 'num_rooms_float', 'age_float', 'distance_to_city_km_float']
target_col = 'price_float'

# VectorAssembler refuses to overwrite an existing output column, so drop any
# `features` vector left over from an earlier cell before re-assembling.
existing_cols = train_data.columns
if "features" in existing_cols:
    train_data, test_data = train_data.drop("features"), test_data.drop("features")

assembler = VectorAssembler(outputCol="features", inputCols=feature_cols)
train_data, test_data = assembler.transform(train_data), assembler.transform(test_data)

# Fit the forest and score the held-out split with mean squared error.
rf = RandomForestRegressor(seed=42, labelCol=target_col, featuresCol="features")
rf_model = rf.fit(train_data)
predictions = rf_model.transform(test_data)

evaluator = RegressionEvaluator(metricName="mse", predictionCol="prediction", labelCol=target_col)
mse = evaluator.evaluate(predictions)

# Record the fitted model and its MSE in a (possibly nested) MLflow run.
with mlflow.start_run(run_name="Basic RF Run", nested=True) as run:
    mlflow.spark.log_model(rf_model, "random_forest_model")
    mlflow.log_metric("mse", mse)

    run_id, experiment_id = run.info.run_id, run.info.experiment_id

    print(f"Inside MLflow Run with run_id `{run_id}` and experiment_id `{experiment_id}`")

In [0]:
import mlflow
from mlflow.models.signature import infer_signature

# Local imports keep this cell runnable on its own.
from pyspark.ml import PipelineModel
from pyspark.ml.feature import VectorAssembler

# Set the registry URI to Unity Catalog
mlflow.set_registry_uri("databricks-uc")

# Define the catalog, schema, and model name
catalog_name = "kaggle_ml_demo"
schema_name = "house_prediction"
model_name = "ml_model_house_prediction_rf"
full_model_name = f"{catalog_name}.{schema_name}.{model_name}"

# `rf_model` on its own only reads the pre-assembled `features` vector column,
# so a model logged with a raw-column signature could not score raw-column input.
# Bundle the assembler in front of it so the logged model accepts exactly the
# columns declared in the signature.
rf_pipeline_model = PipelineModel([
    VectorAssembler(inputCols=feature_columns, outputCol="features"),
    rf_model,
])

# Infer the signature from raw feature columns -> prediction. Selecting only the
# raw columns first drops the `features` vector already present in train_data,
# so the pipeline's assembler can rebuild it.
signature_input = train_data.select(feature_columns)
signature = infer_signature(
    signature_input.toPandas(),
    rf_pipeline_model.transform(signature_input).select("prediction").toPandas(),
)

# Log the model with the signature
with mlflow.start_run(nested=True) as run:
    mlflow.spark.log_model(
        rf_pipeline_model,
        "model",
        signature=signature
    )

    # Register the model to Unity Catalog
    model_uri = f"runs:/{run.info.run_id}/model"
    mlflow.register_model(model_uri=model_uri, name=full_model_name)

In [0]:
import pandas as pd

# Pair each input column with its RandomForest importance score, most important first.
importances = rf_model.featureImportances.toArray()
feature_importance_df = pd.DataFrame(
    list(zip(feature_columns, importances)),
    columns=['feature', 'importance'],
).sort_values('importance', ascending=False)

display(feature_importance_df)

In [0]:
import pandas as pd
import tempfile
import os

from mlflow.tracking import MlflowClient

# Get feature importances from the trained RandomForest model
importances = rf_model.featureImportances.toArray()
feature_importance_df = pd.DataFrame({
    'feature': feature_columns,
    'importance': importances
}).sort_values('importance', ascending=False)

# Attach the CSV to an explicit run instead of calling the fluent
# mlflow.log_artifact with no run open. The fluent call starts a new run when
# none is active and leaves it running, which is likely what produced the
# "Run with UUID ... is already active" error at the end of the notebook.
# NOTE(review): assumes `run` is still the RF model run from the registration
# cell above — confirm if cells are reordered.
rf_model_run_id = run.info.run_id
client = MlflowClient()

# Save feature importances to a temporary CSV file and upload it
with tempfile.TemporaryDirectory() as tmpdir:
    csv_path = os.path.join(tmpdir, "feature_importance.csv")
    feature_importance_df.to_csv(csv_path, index=False)
    client.log_artifact(rf_model_run_id, csv_path, artifact_path="feature_importance")

In [0]:
from hyperopt import fmin, tpe, hp, Trials, STATUS_OK
from hyperopt.pyll.base import scope
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
import mlflow

# Hyperopt search space. quniform draws floats on a step-1 grid; scope.int
# casts them to the integers the RandomForestRegressor params expect.
search_space = dict(
    numTrees=scope.int(hp.quniform('numTrees', 20, 100, 1)),
    maxDepth=scope.int(hp.quniform('maxDepth', 2, 10, 1)),
    maxBins=scope.int(hp.quniform('maxBins', 16, 64, 1)),
)

def train_rf(params):
    """Fit and score one random-forest configuration as a Hyperopt objective.

    Each trial is tracked as a nested MLflow run that records the sampled
    hyperparameters and the RMSE. It also records the fitted model under the
    ``model`` artifact path, so the best run selected later can be addressed
    as ``runs:/<run_id>/model``.

    Args:
        params: dict with ``numTrees``, ``maxDepth`` and ``maxBins`` values
            sampled from ``search_space``.

    Returns:
        dict with ``loss`` (the RMSE Hyperopt minimises) and ``status``.
    """
    # Local imports keep this function usable without the earlier cells' imports.
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import VectorAssembler

    with mlflow.start_run(run_name="Hyperparameter Tuning RF Run", nested=True):
        rf = RandomForestRegressor(
            featuresCol="features",
            labelCol=target_col,
            numTrees=params['numTrees'],
            maxDepth=params['maxDepth'],
            maxBins=params['maxBins'],
            seed=42
        )
        model = rf.fit(train_data)
        # NOTE(review): the loss is computed on test_data, so the chosen
        # hyperparameters are tuned on the test split and its RMSE is optimistic.
        # A separate validation split would avoid this leakage.
        preds = model.transform(test_data)
        evaluator = RegressionEvaluator(labelCol=target_col, predictionCol="prediction", metricName="rmse")
        rmse = evaluator.evaluate(preds)
        mlflow.log_params(params)
        mlflow.log_metric("rmse", rmse)
        # Without a logged model, the best trial's `runs:/<id>/model` URI points
        # at nothing. Bundle the assembler so the model accepts the raw columns.
        serving_model = PipelineModel([
            VectorAssembler(inputCols=feature_cols, outputCol="features"),
            model,
        ])
        mlflow.spark.log_model(serving_model, "model")
        return {'loss': rmse, 'status': STATUS_OK}

# Run 20 TPE-guided trials of train_rf; Trials keeps the per-trial history.
# fmin returns the best point keyed by label (presumably the raw quniform
# values rather than the scope.int-cast ones — hyperopt.space_eval maps them back).
trials = Trials()
best_params = fmin(
    train_rf,
    search_space,
    algo=tpe.suggest,
    max_evals=20,
    trials=trials,
)

display(best_params)

In [0]:
import mlflow

# Set registry URI to Unity Catalog
mlflow.set_registry_uri("databricks-uc")

# Find the best run (lowest rmse), newest first on ties.
runs_df = mlflow.search_runs(order_by=['metrics.rmse ASC', 'start_time DESC'])

# The experiment also contains runs that never logged `rmse` (registration
# runs, the MSE-only baseline). Keep only scored runs so iloc[0] cannot pick one
# of those, and fail clearly when nothing has been scored yet.
if "metrics.rmse" in runs_df.columns:
    scored_runs_df = runs_df.dropna(subset=["metrics.rmse"])
else:
    scored_runs_df = runs_df.iloc[0:0]

if scored_runs_df.empty:
    raise ValueError(
        "No MLflow runs with an 'rmse' metric were found in the current experiment; "
        "run the hyperparameter tuning cell first."
    )

best_run = scored_runs_df.iloc[0]
best_run_id = best_run.run_id

In [0]:
# MLflow run-relative URI of the best run's logged model.
artifact_uri = "runs:/{}/model".format(best_run_id)

In [0]:
# Echo the best run's model URI for reference.
print("runs:/{}/model".format(best_run_id))

In [0]:
    # Register the model to Unity Catalog
    model_uri = f"runs:/{run.info.run_id}/model"
    mlflow.register_model(model_uri=model_uri, name=full_model_name)

In [0]:
import mlflow
# Close any MLflow run still left active so later start_run() calls do not fail
# with "Run ... is already active". end_run() does nothing when no run is active.
mlflow.end_run()

Run with UUID d18b9a9e7ad74f42b584773a66e3b742 is already active. To start a new run, first end the current run with mlflow.end_run(). To start a nested run, call start_run with nested=True