# 03 — Listing-Level Demand Forecasting (Spark MLlib / Big-Data Strategy)

This notebook trains **listing-level demand models** directly in Spark using the engineered dataset from Notebook 02.

**Data source (single source of truth):**
- `data/processed/listing_features.parquet` (includes `elasticity_slope` already added in Notebook 02)

**Goal:** predict **booking demand** (`n_bookings`) using listing + pricing + market context features (including `elasticity_slope`).

**Approach (Spark-native):**
- Load Parquet into a Spark DataFrame (no pandas conversion)
- Build a Spark MLlib pipeline: type cleaning → imputation → vector assembly → model
- Train and evaluate multiple regressors (baseline + tree models)
- Save the best model and feature importances (where supported)

**Outputs**
- `outputs/spark_model_results.csv`
- `outputs/spark_feature_importance.csv` (tree models)
- `models/spark_best_model/` (Spark MLlib pipeline model)

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import os, sys
from pathlib import Path
import numpy as np

# Locate project root (folder containing 'src')
project_root = Path.cwd()
for p in [project_root] + list(project_root.parents):
    if (p / 'src').exists():
        project_root = p
        break
if not (project_root / 'src').exists():
    raise FileNotFoundError("Could not find 'src' directory.")

# Windows Spark/Hadoop helpers (assumes hadoop binaries already exist from Notebook 01)
hadoop_home = project_root / 'hadoop'
bin_dir = hadoop_home / 'bin'
os.environ['HADOOP_HOME'] = str(hadoop_home)
os.environ['hadoop.home.dir'] = str(hadoop_home)
os.environ['PATH'] = str(bin_dir) + os.pathsep + os.environ.get('PATH', '')

sys.path.insert(0, str(project_root))

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.storagelevel import StorageLevel

spark = (
    SparkSession.builder
    .appName('airbnb-demand-forecasting-sparkml')
    .master('local[*]')
    # Stability / memory headroom for ML on Windows
    .config('spark.driver.memory', '4g')
    .config('spark.sql.shuffle.partitions', '64')
    .config('spark.default.parallelism', '64')
    .getOrCreate()
 )

# Keep Spark execution stable on Windows
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")

print('Spark version:', spark.version)
np.random.seed(42)

Spark version: 3.5.0


In [3]:
# Persist datasets (Spark writes)
out_processed = project_root / 'data' / 'processed'
out_processed.mkdir(parents=True, exist_ok=True)
outputs_dir = project_root / 'outputs'
outputs_dir.mkdir(parents=True, exist_ok=True)

In [14]:
# Verify listing features file exists
listing_features_file = out_processed / 'listing_features.parquet'
print(f"Listing features path: data/processed/listing_features.parquet")

if listing_features_file.exists():
    print(f"File exists: {listing_features_file.exists()}")
else:
    print("File not found. Run Notebook 02 to generate listing_features.parquet")

Listing features path: data/processed/listing_features.parquet
File exists: True


In [6]:
# Load listing-level engineered features (Spark-only)
listing_features_path = str(out_processed / 'listing_features.parquet')
if not Path(listing_features_path).exists():
    raise FileNotFoundError(f"Missing {listing_features_path}. Run Notebook 02 first.")

df = (
    spark.read.option('mergeSchema', 'true')
    .parquet(listing_features_path)
    .persist(StorageLevel.MEMORY_AND_DISK)
 )

print(f"Loaded listing_features.parquet with {len(df.columns)} columns")
df.printSchema()

# Quick sanity checks
required_cols = ['n_bookings', 'elasticity_slope']
missing = [c for c in required_cols if c not in df.columns]
if missing:
    raise ValueError(f"Missing required columns in parquet: {missing}")

df.select(required_cols).summary().show(truncate=False)

Loaded listing_features.parquet with 49 columns
root
 |-- city: string (nullable = true)
 |-- weekend: integer (nullable = true)
 |-- cat_city_amsterdam: integer (nullable = true)
 |-- cat_city_athens: integer (nullable = true)
 |-- cat_city_barcelona: integer (nullable = true)
 |-- cat_city_berlin: integer (nullable = true)
 |-- cat_city_budapest: integer (nullable = true)
 |-- cat_city_lisbon: integer (nullable = true)
 |-- cat_city_london: integer (nullable = true)
 |-- cat_city_paris: integer (nullable = true)
 |-- cat_city_rome: integer (nullable = true)
 |-- cat_city_vienna: integer (nullable = true)
 |-- is_weekend: integer (nullable = true)
 |-- listing_price: double (nullable = true)
 |-- room_shared: integer (nullable = true)
 |-- room_private: integer (nullable = true)
 |-- person_capacity: double (nullable = true)
 |-- host_is_superhost: integer (nullable = true)
 |-- multi: integer (nullable = true)
 |-- biz: integer (nullable = true)
 |-- cleanliness_rating: double (nulla

In [15]:
df.columns

['city',
 'weekend',
 'cat_city_amsterdam',
 'cat_city_athens',
 'cat_city_barcelona',
 'cat_city_berlin',
 'cat_city_budapest',
 'cat_city_lisbon',
 'cat_city_london',
 'cat_city_paris',
 'cat_city_rome',
 'cat_city_vienna',
 'is_weekend',
 'listing_price',
 'room_shared',
 'room_private',
 'person_capacity',
 'host_is_superhost',
 'multi',
 'biz',
 'cleanliness_rating',
 'guest_satisfaction_overall',
 'bedrooms',
 'city_center_dist',
 'metro_dist',
 'n_bookings',
 'cat_room_type_entire_home_apt',
 'cat_room_type_private_room',
 'cat_room_type_shared_room',
 'log_price',
 'price_per_person',
 'price_per_bedroom',
 'capacity_bin',
 'quality_score',
 'price_x_satisfaction',
 'price_per_dist_km',
 'log_metro_dist',
 'log_bookings',
 'segment_listing_count',
 'segment_avg_price',
 'segment_median_price',
 'segment_p90_price',
 'segment_p10_price',
 'segment_price_std',
 'segment_price_spread',
 'price_vs_segment_median',
 'price_vs_segment_avg',
 'relative_price_volatility',
 'elasticity_

In [21]:
# Basic cleaning: ensure label is double, and keep only usable feature columns
from pyspark.sql.types import (
    StringType, BooleanType, NumericType
 )

target_col = 'n_bookings'
label_col = 'label'

df2 = df.withColumn(label_col, F.col(target_col).cast('double'))

# Cast booleans to ints (Spark ML prefers numeric features)
for field in df2.schema.fields:
    if isinstance(field.dataType, BooleanType):
        df2 = df2.withColumn(field.name, F.col(field.name).cast('int'))

# Drop redundant columns if they exist
_redundant = ['city', 'weekend', 'room_shared', 'room_private', 'log_bookings']
_to_drop = [c for c in _redundant if c in df2.columns]
if _to_drop:
    df2 = df2.drop(*_to_drop)
    print(f"Dropped columns: {_to_drop}")

# Drop rows with null label
df2 = df2.filter(F.col(label_col).isNotNull())

print('Rows after label cleaning (lazy):')
df2.select(F.count('*').alias('rows')).show()

Dropped columns: ['city', 'weekend', 'room_shared', 'room_private', 'log_bookings']
Rows after label cleaning (lazy):
+-----+
| rows|
+-----+
|51707|
+-----+



In [22]:
# Build a Spark ML pipeline (handles numeric + string cols)
from pyspark.ml import Pipeline
from pyspark.ml.feature import (
    Imputer, StringIndexer, OneHotEncoder, VectorAssembler
 )
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

# Columns to exclude from features
exclude_cols = {target_col, label_col}
for maybe_id in ['id', 'listing_id']:
    if maybe_id in df2.columns:
        exclude_cols.add(maybe_id)

string_cols = []
numeric_cols = []
for field in df2.schema.fields:
    if field.name in exclude_cols:
        continue
    if isinstance(field.dataType, StringType):
        string_cols.append(field.name)
    elif isinstance(field.dataType, NumericType):
        numeric_cols.append(field.name)

print(f"String feature cols: {len(string_cols)}")
print(f"Numeric feature cols: {len(numeric_cols)}")
if len(string_cols) > 0:
    print('Example string cols:', string_cols[:10])
print('Example numeric cols:', numeric_cols[:10])

# Split
train_df, test_df = df2.randomSplit([0.8, 0.2], seed=42)
train_df = train_df.persist(StorageLevel.MEMORY_AND_DISK)
test_df = test_df.persist(StorageLevel.MEMORY_AND_DISK)

# Stages: index + encode for strings, impute for numerics, assemble features
stages_common = []

indexed_cols = [f"{c}__idx" for c in string_cols]
ohe_cols = [f"{c}__ohe" for c in string_cols]
for c, idx in zip(string_cols, indexed_cols):
    stages_common.append(StringIndexer(inputCol=c, outputCol=idx, handleInvalid='keep'))
if string_cols:
    stages_common.append(OneHotEncoder(inputCols=indexed_cols, outputCols=ohe_cols, handleInvalid='keep'))

imputed_numeric_cols = [f"{c}__imp" for c in numeric_cols]
if numeric_cols:
    stages_common.append(Imputer(inputCols=numeric_cols, outputCols=imputed_numeric_cols, strategy='median'))

feature_cols = []
feature_cols.extend(imputed_numeric_cols)
feature_cols.extend(ohe_cols)

assembler = VectorAssembler(inputCols=feature_cols, outputCol='features', handleInvalid='keep')
stages_common.append(assembler)

# Evaluators
evaluator_rmse = RegressionEvaluator(labelCol=label_col, predictionCol='prediction', metricName='rmse')
evaluator_mae = RegressionEvaluator(labelCol=label_col, predictionCol='prediction', metricName='mae')
evaluator_r2 = RegressionEvaluator(labelCol=label_col, predictionCol='prediction', metricName='r2')

print('Pipeline stages ready.')

String feature cols: 1
Numeric feature cols: 42
Example string cols: ['capacity_bin']
Example numeric cols: ['cat_city_amsterdam', 'cat_city_athens', 'cat_city_barcelona', 'cat_city_berlin', 'cat_city_budapest', 'cat_city_lisbon', 'cat_city_london', 'cat_city_paris', 'cat_city_rome', 'cat_city_vienna']
Pipeline stages ready.


In [23]:
# Train and compare Spark MLlib models (safe defaults for local Windows)
def fit_eval(model_name: str, regressor):
    pipeline = Pipeline(stages=stages_common + [regressor])
    model = pipeline.fit(train_df)
    preds = model.transform(test_df)
    rmse = evaluator_rmse.evaluate(preds)
    mae = evaluator_mae.evaluate(preds)
    r2 = evaluator_r2.evaluate(preds)
    return model, {'model': model_name, 'rmse': float(rmse), 'mae': float(mae), 'r2': float(r2)}

# Start with lighter models; scale up once stable
candidates = [
    ('LinearRegression', LinearRegression(featuresCol='features', labelCol=label_col, regParam=0.0, elasticNetParam=0.0, maxIter=50)),
    ('GBTRegressor', GBTRegressor(featuresCol='features', labelCol=label_col, maxIter=80, maxDepth=5, stepSize=0.1, subsamplingRate=0.8, seed=42)),
    # RandomForest can be heavier on some Windows setups; keep it last and modest
    ('RandomForest', RandomForestRegressor(featuresCol='features', labelCol=label_col, numTrees=50, maxDepth=8, maxBins=64, subsamplingRate=0.8, featureSubsetStrategy='sqrt', seed=42)),
 ]

results = []
trained = {}
for name, reg in candidates:
    print(f"Training {name}...")
    m, r = fit_eval(name, reg)
    trained[name] = m
    results.append(r)
    print(f"  RMSE={r['rmse']:.4f} | MAE={r['mae']:.4f} | R2={r['r2']:.4f}")

results_sorted = sorted(results, key=lambda x: x['rmse'])
print('')
print('Model comparison (sorted by RMSE):')
for r in results_sorted:
    print(f"- {r['model']}: RMSE={r['rmse']:.4f} | MAE={r['mae']:.4f} | R2={r['r2']:.4f}")

results_sorted

Training LinearRegression...
  RMSE=3.9581 | MAE=2.8923 | R2=0.8126
Training GBTRegressor...
  RMSE=3.5081 | MAE=2.4404 | R2=0.8528
Training RandomForest...
  RMSE=3.7543 | MAE=2.6411 | R2=0.8314

Model comparison (sorted by RMSE):
- GBTRegressor: RMSE=3.5081 | MAE=2.4404 | R2=0.8528
- RandomForest: RMSE=3.7543 | MAE=2.6411 | R2=0.8314
- LinearRegression: RMSE=3.9581 | MAE=2.8923 | R2=0.8126


[{'model': 'GBTRegressor',
  'rmse': 3.5080533376905705,
  'mae': 2.44036625075162,
  'r2': 0.8527987728557824},
 {'model': 'RandomForest',
  'rmse': 3.754254425849336,
  'mae': 2.6410885734848866,
  'r2': 0.831412078030317},
 {'model': 'LinearRegression',
  'rmse': 3.958121125413672,
  'mae': 2.8922900050809393,
  'r2': 0.8126053382269172}]

In [24]:
# Save best model + export metrics + feature importance (driver-side, no Spark createDataFrame)
import csv

best_name = results_sorted[0]['model']
best_model = trained[best_name]
print(f"Best model by RMSE: {best_name}")

models_dir = project_root / 'models'
models_dir.mkdir(parents=True, exist_ok=True)
best_path = models_dir / 'spark_best_model'
best_model.write().overwrite().save(str(best_path))
print(f"Saved Spark PipelineModel to: {best_path}")

# Save metrics (driver-side CSV)
outputs_dir = project_root / 'outputs'
outputs_dir.mkdir(parents=True, exist_ok=True)
metrics_path = outputs_dir / 'spark_model_results.csv'
with open(metrics_path, 'w', newline='', encoding='utf-8') as f:
    w = csv.DictWriter(f, fieldnames=['model', 'rmse', 'mae', 'r2'])
    w.writeheader()
    for r in results_sorted:
        w.writerow(r)
print(f"Saved metrics to: {metrics_path}")

# Feature importance (only for tree models)
from pyspark.ml.regression import RandomForestRegressionModel, GBTRegressionModel

assembler_stage = [s for s in best_model.stages if s.__class__.__name__ == 'VectorAssembler'][0]
assembled_inputs = assembler_stage.getInputCols()
last_stage = best_model.stages[-1]

if isinstance(last_stage, (RandomForestRegressionModel, GBTRegressionModel)):
    fi = last_stage.featureImportances
    rows = [(assembled_inputs[i], float(fi[i])) for i in range(len(assembled_inputs))]
    rows = sorted(rows, key=lambda t: t[1], reverse=True)
    fi_path = outputs_dir / 'spark_feature_importance.csv'
    with open(fi_path, 'w', newline='', encoding='utf-8') as f:
        w = csv.writer(f)
        w.writerow(['feature', 'importance'])
        w.writerows(rows)
    print(f"Saved feature importances to: {fi_path}")
    print('Top 25 features:')
    for feat, imp in rows[:25]:
        print(f"- {feat}: {imp:.6f}")
else:
    print('Best model does not provide feature importances (expected for LinearRegression).')

Best model by RMSE: GBTRegressor
Saved Spark PipelineModel to: c:\Users\Andres\Documents\Jupyter\github portfolio\PySpark LTSM\airbnb\models\spark_best_model
Saved metrics to: c:\Users\Andres\Documents\Jupyter\github portfolio\PySpark LTSM\airbnb\outputs\spark_model_results.csv
Saved feature importances to: c:\Users\Andres\Documents\Jupyter\github portfolio\PySpark LTSM\airbnb\outputs\spark_feature_importance.csv
Top 25 features:
- is_weekend__imp: 0.362807
- listing_price__imp: 0.195027
- price_per_person__imp: 0.086064
- host_is_superhost__imp: 0.081060
- cleanliness_rating__imp: 0.072426
- guest_satisfaction_overall__imp: 0.063346
- quality_score__imp: 0.021954
- price_per_bedroom__imp: 0.018370
- price_x_satisfaction__imp: 0.012651
- metro_dist__imp: 0.012297
- elasticity_slope__imp: 0.011111
- city_center_dist__imp: 0.010720
- price_per_dist_km__imp: 0.008504
- price_vs_segment_median__imp: 0.007070
- price_vs_segment_avg__imp: 0.006178
- segment_price_std__imp: 0.004302
- relativ

## Test Deep Learning

Use PyTorch to train on a sample collected from Spark.

- Uses the already-engineered features (including `elasticity_slope`)
- Trains only a few epochs to test API usage before scaling

In [25]:
# Create a small training sample from Spark for DL demos
# (keep it small so it runs fast on CPU)

# Reuse the same preprocessing stages to build a features vector, then collect a sample
demo_regressor = LinearRegression(featuresCol='features', labelCol=label_col, maxIter=1)
demo_pipeline = Pipeline(stages=stages_common + [demo_regressor])
demo_model = demo_pipeline.fit(train_df.limit(1000))  # quick fit to materialize transformers
demo_features_df = demo_model.transform(train_df).select('features', label_col)

# Sample to the driver
demo_n = 8000
sample_df = demo_features_df.orderBy(F.rand(seed=42)).limit(demo_n)
rows = sample_df.collect()
X = np.vstack([r['features'].toArray() for r in rows]).astype('float32')
y = np.array([r[label_col] for r in rows], dtype='float32')

# Light target transform (optional, often helps neural nets for count-like targets)
y_log = np.log1p(y)

print('Demo sample shapes:', X.shape, y.shape)
print('Feature dim:', X.shape[1])

Demo sample shapes: (8000, 47) (8000,)
Feature dim: 47


In [None]:
# --- PyTorch: MLP regressor (showcase) ---
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split

X_train, X_val, y_train, y_val = train_test_split(X, y_log, test_size=0.2, random_state=42)
X_train_t = torch.from_numpy(X_train)
y_train_t = torch.from_numpy(y_train).view(-1, 1)
X_val_t = torch.from_numpy(X_val)
y_val_t = torch.from_numpy(y_val).view(-1, 1)

train_loader = DataLoader(TensorDataset(X_train_t, y_train_t), batch_size=256, shuffle=True)

model = nn.Sequential(
    nn.Linear(X.shape[1], 64),
    nn.ReLU(),
    nn.Linear(64, 32),
    nn.ReLU(),
    nn.Linear(32, 1),
)
opt = torch.optim.Adam(model.parameters(), lr=1e-3)
loss_fn = nn.MSELoss()

for epoch in range(5):
    model.train()
    total = 0.0
    for xb, yb in train_loader:
        opt.zero_grad()
        pred = model(xb)
        loss = loss_fn(pred, yb)
        loss.backward()
        opt.step()
        total += float(loss) * len(xb)
    model.eval()
    with torch.no_grad():
        val_pred = model(X_val_t)
        val_loss = float(loss_fn(val_pred, y_val_t))
    print(f"PyTorch epoch {epoch+1}/5 | train_mse={total/len(X_train):.4f} | val_mse={val_loss:.4f}")

# Convert back from log1p for a quick, human-readable metric
with torch.no_grad():
    yhat_val = torch.expm1(model(X_val_t)).squeeze().numpy()
y_val_raw = np.expm1(y_val)
rmse = float(np.sqrt(np.mean((yhat_val - y_val_raw) ** 2)))
print(f"PyTorch demo RMSE (raw bookings): {rmse:.3f}")

PyTorch epoch 1/5 | train_mse=417.5935 | val_mse=104.5468
PyTorch epoch 2/5 | train_mse=25.1397 | val_mse=12.3923
PyTorch epoch 3/5 | train_mse=5.7534 | val_mse=3.6118
PyTorch epoch 4/5 | train_mse=2.0758 | val_mse=2.3189
PyTorch epoch 5/5 | train_mse=1.4751 | val_mse=2.5375
PyTorch demo RMSE (raw bookings): 5525.016
