In [1]:
import os
# Treat the presence of DATABRICKS_RUNTIME_VERSION in the environment as the
# signal that this notebook is running on Databricks rather than locally.
databricks = os.environ.get("DATABRICKS_RUNTIME_VERSION") is not None

### Initialize Local

In [2]:
if not databricks:
    # Local development only. The Databricks branches further down use a
    # `spark` object that this notebook never creates itself.
    from pyspark.sql import SparkSession

    # Provide a default interpreter for PySpark workers, but respect a value
    # that the surrounding environment has already chosen.
    os.environ.setdefault("PYSPARK_PYTHON", "python3")

    # Only point JAVA_HOME at the bundled Windows JDK when that directory really
    # exists; on other machines the existing JAVA_HOME (if any) is left alone
    # instead of being replaced by a path that cannot be resolved.
    local_jdk_home = "C:\\Program Files\\Java\\jdk-11"
    if os.path.isdir(local_jdk_home):
        os.environ["JAVA_HOME"] = local_jdk_home

    # Single-machine session bound to loopback so no external interface is needed.
    spark = (
        SparkSession.builder
        .appName("LocalPySpark")
        .master("local[*]")
        .config("spark.driver.memory", "4g")
        # Spark 3.x name of the Arrow switch; the un-prefixed
        # "spark.sql.execution.arrow.enabled" key is the deprecated spelling.
        .config("spark.sql.execution.arrow.pyspark.enabled", "true")
        .config("spark.driver.host", "127.0.0.1")
        .config("spark.driver.bindAddress", "127.0.0.1")
        .getOrCreate()
    )

### Initialize Databricks

In [13]:
# Resolve the three run parameters (city, and one on/off flag per model).
if not databricks:
    # No widgets outside Databricks: use fixed values and train both models.
    city = "amsterdam"
    train_regression_tree = True
    train_neural_network = True
else:
    def _widget_flag(widget_name):
        """Return True when the named widget currently holds the text "true" (case-insensitive)."""
        return dbutils.widgets.get(widget_name).lower() == "true"

    # Declare the widgets first, then read their current values.
    dbutils.widgets.text("city", "amsterdam")
    dbutils.widgets.dropdown("train_regression_tree", "false", ["true", "false"], "train regression-tree")
    dbutils.widgets.dropdown("train_neural_network", "false", ["true", "false"], "train neural-network")

    city = dbutils.widgets.get("city")
    train_regression_tree = _widget_flag("train_regression_tree")
    train_neural_network = _widget_flag("train_neural_network")

### Downloads and imports

In [4]:
#%pip install sentence-transformers

In [5]:
import mlflow
import mlflow.sklearn
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeRegressor
from sklearn.metrics import mean_absolute_error, r2_score
from delta.tables import DeltaTable
from sentence_transformers import SentenceTransformer
import torch
import torch.nn as nn
import torch.optim as optim
import mlflow.pytorch
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import MinMaxScaler, OneHotEncoder, MultiLabelBinarizer
import importlib
import urllib.parse
from pathlib import Path, PureWindowsPath
import json
from mlflow.models import infer_signature
from utils import embed, one_hot_encode, multi_hot_encode
from models import SklearnRegressorPreprocessWrapper, SimpleNNRegressor, TorchNNPreprocessWrapper

  from .autonotebook import tqdm as notebook_tqdm


### Preprocess

In [6]:
# Load the listings from the catalog table (Databricks) or from a CSV file in
# the working directory (local), and record which dataset version was used.
table_name = f"{city}_airbnb_dataset"
if databricks:
    # Use one fully-qualified name for both the data read and the version
    # lookup, so the logged dataset_version always describes the table that was
    # actually loaded, regardless of the session's current catalog/schema.
    qualified_table_name = f"workspace.default.{table_name}"
    spark_df = spark.read.table(qualified_table_name)
    delta_table = DeltaTable.forName(spark, qualified_table_name)
    # Highest version number in the table history = latest committed version.
    dataset_version = (
        delta_table.history()
        .select("version")
        .orderBy("version", ascending=False)
        .first()["version"]
    )
else:
    # Quoted, multi-line CSV; only the first 100 rows are kept for local runs.
    spark_df = (
        spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .option("sep", ",")
        .option("escape", '"')
        .option("encoding", "UTF-8")
        .option("quote", '"')
        .option("multiLine", "true")
        .load(f"{table_name}.csv")
        .limit(100)
    )
    # The CSV has no version history, so a fixed placeholder is logged.
    dataset_version = 1.0
df = spark_df.toPandas()

# Keep one raw (un-encoded) row without the target; the training cells use it
# as the example input when logging the models.
example_input = df.head(1).copy()
example_input.columns = example_input.columns.astype(str)
example_input = example_input.drop(columns=["price"])

# One-hot encode the single-valued categorical columns. Each encoder is fitted
# on the (n_rows, 1) array of raw values and then handed to `one_hot_encode`,
# whose result replaces `df`.
# NOTE(review): the encoders are fitted on the full frame, before the
# train/validation/test split performed later in the notebook — confirm this
# is intended.
neighbourhoods = df[["neighbourhood_cleansed"]].to_numpy()
neighbourhood_encoder = OneHotEncoder().fit(neighbourhoods)
df = one_hot_encode("neighbourhood_cleansed", df, neighbourhood_encoder)

property_types = df[["property_type"]].to_numpy()
property_type_encoder = OneHotEncoder().fit(property_types)
df = one_hot_encode("property_type", df, property_type_encoder)

room_types = df[["room_type"]].to_numpy()
room_type_encoder = OneHotEncoder().fit(room_types)
df = one_hot_encode("room_type", df, room_type_encoder)

# Multi-hot encode the multi-valued `amenities` column.
# NOTE(review): the frame displayed after this cell has one column per complete
# amenities string (e.g. "['Outdoor furniture']" and "[]"), which suggests each
# cell is a stringified list that is being treated as a single label rather
# than being split into individual amenities. Verify against
# `multi_hot_encode` before changing how the encoder is fitted.
amenities_encoder = MultiLabelBinarizer().fit(df[["amenities"]].to_numpy())
df = multi_hot_encode("amenities", df, amenities_encoder)

def _fit_min_max_scaler(frame, column):
    """Fit a new MinMaxScaler on ``frame[column]``, overwrite that column with
    the scaled values in place, and return the fitted scaler."""
    fitted_scaler = MinMaxScaler()
    frame[column] = fitted_scaler.fit_transform(frame[[column]])
    return fitted_scaler


# Rescale every numeric feature with its own scaler.
# NOTE(review): the scalers are fitted on the full frame, before the
# train/validation/test split performed later in the notebook — confirm this
# is intended.
accommodates_scaler = _fit_min_max_scaler(df, "accommodates")
bathrooms_scaler = _fit_min_max_scaler(df, "bathrooms")
bedrooms_scaler = _fit_min_max_scaler(df, "bedrooms")
beds_scaler = _fit_min_max_scaler(df, "beds")
availability_scaler = _fit_min_max_scaler(df, "availability_365")
review_scores_scaler = _fit_min_max_scaler(df, "review_scores_value")

# (column name, fitted transformer) pairs; the training cells pass these lists
# to the model wrappers.
scalers = [
    ("accommodates", accommodates_scaler),
    ("bathrooms", bathrooms_scaler),
    ("bedrooms", bedrooms_scaler),
    ("beds", beds_scaler),
    ("availability_365", availability_scaler),
    ("review_scores_value", review_scores_scaler),
]
one_hot_encoders = [
    ("neighbourhood_cleansed", neighbourhood_encoder),
    ("property_type", property_type_encoder),
    ("room_type", room_type_encoder),
]
multi_hot_encoders = [("amenities", amenities_encoder)]

# Create embeddings for the free-text columns, then remove those raw text
# columns from the feature frame.
# NOTE(review): `embedding_df` is not referenced again in the cells of this
# notebook, so the embeddings do not appear to reach the training features —
# confirm whether they should be joined onto `df` or whether this step can go.
embedding_columns = ["name", "description"]
embedding_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
embedding_df = embed(df, embedding_columns, embedding_model)
df = df.drop(columns=embedding_columns)

# Show up to the first 100 rows of the processed frame as the cell output.
df.head(100)

Unnamed: 0,accommodates,bathrooms,bedrooms,beds,availability_365,review_scores_value,price,neighbourhood_cleansed_Bijlmer-Oost,neighbourhood_cleansed_Bos en Lommer,neighbourhood_cleansed_Buitenveldert - Zuidas,...,"['Outdoor furniture', 'Children’s books and toys', 'Pack ’n play/Travel crib']","['Outdoor furniture', 'Free dryer – In unit', 'Board games', 'Baking sheet']","['Outdoor furniture', 'High chair', 'Children’s books and toys', 'Outdoor dining area']","['Outdoor furniture', 'Piano', 'Blender', 'Bathtub', 'Free dryer – In unit', 'City skyline view', 'Board games', 'Outdoor dining area', 'Baking sheet']","['Outdoor furniture', 'Piano', 'Blender', 'Bathtub', 'Outdoor dining area']",['Outdoor furniture'],"['Private backyard – Fully fenced', 'Babysitter recommendations', 'Outdoor furniture']","['Private backyard – Fully fenced', 'Outdoor furniture', 'Bathtub', 'Outdoor dining area']","['Private backyard – Fully fenced', 'Outdoor furniture', 'Outdoor dining area', 'Baking sheet']",[]
0,0.071429,0.285714,0.1,0.090909,0.897222,0.93750,269.0,0.0,0.0,0.0,...,0,0,0,0,0,0,0,0,0,0
1,0.142857,0.285714,0.2,0.151515,0.763889,0.90500,254.0,0.0,0.0,0.0,...,0,0,0,0,0,0,0,0,0,0
2,0.142857,0.428571,0.2,0.060606,0.027778,0.93500,203.0,0.0,0.0,0.0,...,0,0,0,0,0,0,0,0,0,0
3,0.142857,0.571429,0.2,0.060606,0.166667,0.91111,375.0,0.0,0.0,0.0,...,0,0,0,0,0,0,0,0,0,0
4,0.000000,0.285714,0.1,0.030303,0.686111,0.92750,599.0,0.0,0.0,0.0,...,0,0,0,0,0,0,0,0,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,0.000000,0.285714,0.1,0.000000,0.383333,1.00000,200.0,0.0,0.0,0.0,...,0,0,0,0,0,0,0,0,0,1
96,0.000000,0.428571,0.1,0.030303,0.175000,1.00000,250.0,0.0,0.0,0.0,...,0,0,0,0,0,0,0,0,0,0
97,0.214286,1.000000,0.4,0.121212,0.872222,0.75000,549.0,0.0,0.0,0.0,...,0,0,0,0,0,0,0,0,0,0
98,0.142857,0.285714,0.2,0.060606,0.016667,1.00000,380.0,0.0,0.0,0.0,...,0,0,0,0,1,0,0,0,0,0


### Create Train/Validation/Test splits

In [7]:
# The target is `price`; every remaining column of `df` is an input feature.
target_column = "price"
y = df[target_column].to_numpy()
x = df.drop(columns=[target_column]).to_numpy()

# First hold out 20% of the rows, then cut that hold-out in half, giving an
# 80/10/10 train/validation/test partition. Both cuts use a fixed seed.
x_train, x_temp, y_train, y_temp = train_test_split(x, y, test_size=0.2, random_state=42)
x_val, x_test, y_val, y_test = train_test_split(x_temp, y_temp, test_size=0.5, random_state=42)

# Report the shape of each split as a sanity check.
for split_label, suffix, split_x, split_y in (
    ("Train", "train", x_train, y_train),
    ("Validation", "val", x_val, y_val),
    ("Test", "test", x_test, y_test),
):
    print(f"{split_label}: x_{suffix}={split_x.shape}, y_{suffix}={split_y.shape}")

Train: x_train=(80, 97), y_train=(80,)
Validation: x_val=(10, 97), y_val=(10,)
Test: x_test=(10, 97), y_test=(10,)


### Initialize MLflow

In [8]:
# Initialize MLflow: choose the tracking backend and the active experiment.

if databricks:
    # Select the Unity Catalog model registry through the public API rather
    # than by overwriting a private MLflow helper function.
    mlflow.set_registry_uri("databricks-uc")
    mlflow.set_tracking_uri("databricks")  # Use Databricks' MLflow tracking
    # NOTE(review): the experiment path is tied to one user's workspace folder.
    mlflow.set_experiment("/Users/vanthiel.erwin@gmail.com/airbnb-price-regression")
else:
    # Store runs in an `mlruns` folder inside the current working directory so
    # the notebook is not bound to one machine's absolute path. The dataset CSV
    # is also loaded relative to the working directory.
    mlflow.set_tracking_uri((Path.cwd() / "mlruns").as_uri())
    experiment_name = "airbnb-price-regression"
    # set_experiment creates the experiment when it does not exist yet, so no
    # separate existence check is needed.
    mlflow.set_experiment(experiment_name)

### Train Regression Tree Model

In [9]:
if train_regression_tree:
    from urllib.request import url2pathname

    # Train a decision-tree regressor on the prepared splits, log its MAE on
    # every split, and log the model wrapped with its preprocessing artifacts.
    with mlflow.start_run():
        mlflow.log_param("dataset_version", dataset_version)

        # Train Model (seeded so repeated runs build the same tree)
        model = DecisionTreeRegressor(random_state=42)
        model.fit(x_train, y_train)

        # Make Predictions
        y_train_pred = model.predict(x_train)
        y_val_pred = model.predict(x_val)
        y_test_pred = model.predict(x_test)

        # Calculate Metrics
        train_mae = mean_absolute_error(y_train, y_train_pred)
        val_mae = mean_absolute_error(y_val, y_val_pred)
        test_mae = mean_absolute_error(y_test, y_test_pred)

        # Log Metrics in MLflow
        mlflow.log_metric("train_mae", train_mae)
        mlflow.log_metric("val_mae", val_mae)
        mlflow.log_metric("test_mae", test_mae)

        # Resolve the run's artifact location to a filesystem path.
        run = mlflow.active_run()
        artifact_uri = run.info.artifact_uri
        parsed_uri = urllib.parse.urlparse(artifact_uri)
        if parsed_uri.scheme == "file":
            # url2pathname decodes percent-escapes and copes with both Windows
            # drive-letter URIs and POSIX absolute paths.
            artifact_base_path = Path(url2pathname(parsed_uri.path)).resolve()
        else:
            # NOTE(review): for non-file schemes the leading "/" is dropped, so
            # the path resolves relative to the working directory. This keeps
            # the previous behaviour — confirm it is right on Databricks.
            artifact_base_path = Path(parsed_uri.path[1:]).resolve()
        metadata_artifact_path = artifact_base_path / "metadata.json"

        wrapper = SklearnRegressorPreprocessWrapper(model, ["name", "description"], one_hot_encoders, multi_hot_encoders, scalers, artifact_base_path)
        metadata = {
            "text_columns": wrapper.text_columns,
            # Previously filled with the scaler paths; "ohe" refers to the
            # one-hot encoders, so it now mirrors "onehot_encoder_paths".
            "ohe_paths": wrapper.onehot_encoder_paths,
            "scaler_paths": wrapper.scaler_paths,
            "onehot_encoder_paths": wrapper.onehot_encoder_paths,
            "multihot_encoder_paths": wrapper.multihot_encoder_paths,
            "model_path": wrapper.model_path
        }
        with open(metadata_artifact_path, "w") as f:
            json.dump(metadata, f)

        # Run one prediction on the raw example row before logging the model.
        example_output = wrapper.predict(None, example_input)
        signature = infer_signature(["input"], params=example_input.to_dict(orient="records")[0])
        mlflow.pyfunc.log_model("regression_tree", python_model=wrapper, artifacts={"meta_data" : metadata_artifact_path.resolve().as_uri()}, signature=signature)


Downloading artifacts: 100%|█████████████████████████████████████████████████████████████| 1/1 [00:00<00:00, 47.28it/s]


### Train Neural Network

In [10]:
#%pip install torch

In [14]:
if train_neural_network:
    from urllib.request import url2pathname

    # Seed PyTorch before the model is built and the loaders are created, so
    # that runs of this cell are repeatable.
    torch.manual_seed(42)

    # Build float32 tensors under their own names. The numpy splits stay
    # untouched, so this cell can be re-run and other cells still see arrays.
    x_train_t = torch.as_tensor(x_train, dtype=torch.float32)
    y_train_t = torch.as_tensor(y_train, dtype=torch.float32)
    x_val_t = torch.as_tensor(x_val, dtype=torch.float32)
    y_val_t = torch.as_tensor(y_val, dtype=torch.float32)
    x_test_t = torch.as_tensor(x_test, dtype=torch.float32)
    y_test_t = torch.as_tensor(y_test, dtype=torch.float32)

    # Create PyTorch DataLoaders (only the training data is shuffled)
    batch_size = 4
    train_dataset = TensorDataset(x_train_t, y_train_t)
    val_dataset = TensorDataset(x_val_t, y_val_t)
    test_dataset = TensorDataset(x_test_t, y_test_t)

    train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(dataset=val_dataset, batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False)

    # Hyperparameters
    input_size = x_train_t.shape[1]
    num_epochs = 10
    learning_rate = 0.001

    # Move model to GPU if available
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = SimpleNNRegressor(input_size)
    model.to(device)
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    # Halve the learning rate when the validation loss stops improving. The
    # `verbose` flag is deprecated in recent PyTorch releases, so the current
    # learning rate is logged to MLflow each epoch instead.
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=2)

    with mlflow.start_run():
        mlflow.log_param("dataset_version", dataset_version)
        mlflow.log_param("input_size", input_size)
        mlflow.log_param("learning_rate", learning_rate)
        mlflow.log_param("num_epochs", num_epochs)

        # Training Loop
        for epoch in range(num_epochs):
            # Switch back to training mode every epoch: the validation pass
            # below puts the model in eval mode, and without this call every
            # epoch after the first would train in eval mode.
            model.train()
            total_loss = 0

            for features, labels in train_loader:
                # Targets are reshaped to (batch, 1) before computing the loss.
                features, labels = features.to(device), labels.to(device).view(-1, 1)
                optimizer.zero_grad()
                outputs = model(features)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()
                total_loss += loss.item()

            # Mean of the per-batch losses
            train_loss = total_loss / len(train_loader)
            mlflow.log_metric("train_loss", train_loss, step=epoch)
            print(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}")

            # Evaluate Model on Validation Set
            model.eval()
            total_loss = 0
            with torch.no_grad():
                for features, labels in val_loader:
                    features, labels = features.to(device), labels.to(device).view(-1, 1)
                    outputs = model(features)
                    loss = criterion(outputs, labels)
                    total_loss += loss.item()

            val_loss = total_loss / len(val_loader)
            mlflow.log_metric("val_loss", val_loss, step=epoch)
            print(f"Validation Loss: {val_loss:.4f}")

            # Adjust learning rate based on validation loss
            scheduler.step(val_loss)
            mlflow.log_metric("learning_rate", optimizer.param_groups[0]["lr"], step=epoch)

        # Evaluate Model on Test Set (eval mode, no gradients needed)
        model.eval()
        total_loss = 0
        with torch.no_grad():
            for features, labels in test_loader:
                features, labels = features.to(device), labels.to(device).view(-1, 1)
                outputs = model(features)
                loss = criterion(outputs, labels)
                total_loss += loss.item()

        test_loss = total_loss / len(test_loader)
        mlflow.log_metric("test_loss", test_loss)
        print(f"Test Loss: {test_loss:.4f}")

        # Resolve the run's artifact location to a filesystem path.
        run = mlflow.active_run()
        artifact_uri = run.info.artifact_uri
        parsed_uri = urllib.parse.urlparse(artifact_uri)
        if parsed_uri.scheme == "file":
            # url2pathname decodes percent-escapes and copes with both Windows
            # drive-letter URIs and POSIX absolute paths.
            artifact_base_path = Path(url2pathname(parsed_uri.path)).resolve()
        else:
            # NOTE(review): for non-file schemes the leading "/" is dropped, so
            # the path resolves relative to the working directory. This keeps
            # the previous behaviour — confirm it is right on Databricks.
            artifact_base_path = Path(parsed_uri.path[1:]).resolve()
        metadata_artifact_path = artifact_base_path / "metadata.json"

        # NOTE(review): the fifth positional argument is passed as None while
        # the wrapper later exposes `embedding_model_path` — confirm intent.
        wrapper = TorchNNPreprocessWrapper(model, ["name", "description"], one_hot_encoders, multi_hot_encoders, None, scalers, artifact_base_path)
        metadata = {
            "text_columns": wrapper.text_columns,
            # Previously filled with the scaler paths; "ohe" refers to the
            # one-hot encoders, so it now mirrors "onehot_encoder_paths".
            "ohe_paths": wrapper.onehot_encoder_paths,
            "scaler_paths": wrapper.scaler_paths,
            "onehot_encoder_paths": wrapper.onehot_encoder_paths,
            "multihot_encoder_paths": wrapper.multihot_encoder_paths,
            "model_path": wrapper.model_path,
            "embedding_model_path": wrapper.embedding_model_path
        }

        with open(metadata_artifact_path, "w") as f:
            json.dump(metadata, f)

        # Run one prediction on the raw example row before logging the model.
        example_output = wrapper.predict(None, example_input)
        signature = infer_signature(["input"], params=example_input.to_dict(orient="records")[0])
        mlflow.pyfunc.log_model("neural_network", python_model=wrapper, artifacts={"meta_data" : metadata_artifact_path.resolve().as_uri()}, signature=signature)

Epoch [1/10], Train Loss: 68381.0013
Validation Loss: 143026.4323
Epoch [2/10], Train Loss: 68159.7223
Validation Loss: 142612.1328
Epoch [3/10], Train Loss: 67777.5381
Validation Loss: 141875.6172
Epoch [4/10], Train Loss: 67131.2003
Validation Loss: 140670.1589
Epoch [5/10], Train Loss: 66101.0330
Validation Loss: 138978.4844
Epoch [6/10], Train Loss: 64755.5597
Validation Loss: 136687.0299
Epoch [7/10], Train Loss: 62990.7757
Validation Loss: 133893.5938
Epoch [8/10], Train Loss: 60861.2716
Validation Loss: 130689.1719
Epoch [9/10], Train Loss: 58481.0065
Validation Loss: 126911.3620
Epoch [10/10], Train Loss: 55769.5889
Validation Loss: 122750.6836
Test Loss: 69539.8385


Downloading artifacts: 100%|█████████████████████████████████████████████████████████████| 1/1 [00:00<00:00, 30.62it/s]
