In [None]:
!pip install mlflow
!pip install hyperopt
!pip install holidays
!pip install pmdarima
!pip install xgboost

Collecting mlflow
  Downloading mlflow-3.6.0-py3-none-any.whl.metadata (31 kB)
Collecting mlflow-skinny==3.6.0 (from mlflow)
  Downloading mlflow_skinny-3.6.0-py3-none-any.whl.metadata (31 kB)
Collecting mlflow-tracing==3.6.0 (from mlflow)
  Downloading mlflow_tracing-3.6.0-py3-none-any.whl.metadata (19 kB)
Collecting Flask-CORS<7 (from mlflow)
  Downloading flask_cors-6.0.1-py3-none-any.whl.metadata (5.3 kB)
Collecting docker<8,>=4.0.0 (from mlflow)
  Downloading docker-7.1.0-py3-none-any.whl.metadata (3.8 kB)
Collecting graphene<4 (from mlflow)
  Downloading graphene-3.4.3-py2.py3-none-any.whl.metadata (6.9 kB)
Collecting gunicorn<24 (from mlflow)
  Downloading gunicorn-23.0.0-py3-none-any.whl.metadata (4.4 kB)
Collecting huey<3,>=2.5.0 (from mlflow)
  Downloading huey-2.5.4-py3-none-any.whl.metadata (4.6 kB)
Collecting databricks-sdk<1,>=0.20.0 (from mlflow-skinny==3.6.0->mlflow)
  Downloading databricks_sdk-0.73.0-py3-none-any.whl.metadata (40 kB)
[2K     [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚î

In [None]:
import os
#os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
import mlflow
import mlflow.spark
from hyperopt import fmin, tpe, hp, Trials, STATUS_OK
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
import numpy as np
import pandas as pd
import holidays
import xgboost as xgb
from sklearn.metrics import r2_score, mean_absolute_error, mean_squared_error
from statsmodels.tsa.statespace.sarimax import SARIMAX
import pmdarima as pm
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import GRU, Dense, Dropout, Input
from sklearn.preprocessing import MinMaxScaler
import warnings

warnings.filterwarnings('ignore')


In [None]:
# --- 1. Spark Session ---
spark = (SparkSession.builder
         .appName("Advanced_Modeling_Spark")
         .master("local[*]")
         .config("spark.driver.memory", "8g")
         .getOrCreate())
spark.sparkContext.setLogLevel("ERROR")
print(f"‚úÖ Spark Session initiated: {spark.version}")


‚úÖ Spark Session initiated: 3.5.1


In [None]:
# --- 2. Data Loading and Feature Engineering ---
@F.udf(IntegerType())
def is_holiday(date):
    mx_holidays = holidays.Mexico()
    return 1 if date in mx_holidays else 0

parquet_path = "/content/drive/MyDrive/Big Data/delitos_cdmx.parquet"
df = spark.read.parquet(parquet_path)
df_filtered = df.filter(F.col("anio_hecho") >= 2016)
df_ts = df_filtered.select("fecha_hecho").withColumn("fecha", F.to_date(F.col("fecha_hecho")))
daily_counts = df_ts.groupBy("fecha").agg(F.count("*").alias("total_delitos")).orderBy("fecha")

windowSpec = Window.orderBy("fecha")
lags = [1, 2, 3, 4, 5, 6, 7, 14, 30]
for lag in lags:
    daily_counts = daily_counts.withColumn(f"lag_{lag}", F.lag("total_delitos", lag).over(windowSpec))

windowRolling7 = Window.orderBy("fecha").rowsBetween(-7, -1)
daily_counts = daily_counts.withColumn("rolling_mean_7", F.avg("total_delitos").over(windowRolling7))
daily_counts = daily_counts.withColumn("rolling_std_7", F.stddev("total_delitos").over(windowRolling7))
windowRolling30 = Window.orderBy("fecha").rowsBetween(-30, -1)
daily_counts = daily_counts.withColumn("rolling_mean_30", F.avg("total_delitos").over(windowRolling30))

daily_counts = daily_counts.withColumn("day_of_week", F.dayofweek("fecha")) \
                           .withColumn("day_of_month", F.dayofmonth("fecha")) \
                           .withColumn("month", F.month("fecha")) \
                           .withColumn("is_weekend", F.when(F.col("day_of_week").isin([1, 7]), 1).otherwise(0)) \
                           .withColumn("is_holiday", is_holiday(F.col("fecha")))
df_model = daily_counts.na.drop()


In [None]:
# --- 3. Train/Test Split ---
split_date = "2024-01-01"
train_df_spark = df_model.filter(F.col("fecha") < split_date)
test_df_spark = df_model.filter(F.col("fecha") >= split_date)
print(f"Train set: {train_df_spark.count()} records. Test set: {test_df_spark.count()} records.")

train_df_pd = train_df_spark.toPandas().set_index("fecha")
test_df_pd = test_df_spark.toPandas().set_index("fecha")

feature_cols = [f"lag_{lag}" for lag in lags] + \
               ["rolling_mean_7", "rolling_std_7", "rolling_mean_30"] + \
               ["day_of_week", "day_of_month", "month", "is_weekend", "is_holiday"]

X_train = train_df_pd[feature_cols]
y_train = train_df_pd['total_delitos']
X_test = test_df_pd[feature_cols]
y_test = test_df_pd['total_delitos']


Train set: 2892 records. Test set: 398 records.


In [None]:
# --- 4. Model Implementations ---

def evaluate_model(name, y_true, y_pred):
    r2 = r2_score(y_true, y_pred)
    mae = mean_absolute_error(y_true, y_pred)
    rmse = np.sqrt(mean_squared_error(y_true, y_pred))
    mape = np.mean(np.abs((y_true - y_pred) / y_true)) * 100
    print(f"‚úÖ {name} - R¬≤: {r2:.4f}, MAE: {mae:.2f}, RMSE: {rmse:.2f}, MAPE: {mape:.2f}%")
    return {"R2": r2, "MAE": mae, "RMSE": rmse, "MAPE": mape}

def train_gru():
    print("\n--- Training GRU Model ---")
    #os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
    scaler_X = MinMaxScaler()
    scaler_y = MinMaxScaler()

    X_train_scaled = scaler_X.fit_transform(X_train)
    y_train_scaled = scaler_y.fit_transform(y_train.values.reshape(-1, 1))
    X_test_scaled = scaler_X.transform(X_test)

    X_train_reshaped = X_train_scaled.reshape((X_train_scaled.shape[0], 1, X_train_scaled.shape[1]))
    X_test_reshaped = X_test_scaled.reshape((X_test_scaled.shape[0], 1, X_test_scaled.shape[1]))

    model = Sequential([
        Input(shape=(X_train_reshaped.shape[1], X_train_reshaped.shape[2])),
        GRU(128, return_sequences=True), Dropout(0.2),
        GRU(64), Dropout(0.2),
        Dense(32, activation='relu'),
        Dense(1)
    ])
    model.compile(optimizer='adam', loss='mse')

    model.fit(X_train_reshaped, y_train_scaled, epochs=300, batch_size=16, verbose=1)

    y_pred_scaled = model.predict(X_test_reshaped)
    y_pred = scaler_y.inverse_transform(y_pred_scaled)

    return evaluate_model("GRU", y_test, y_pred.flatten())

def train_xgboost():
    print("\n--- Tuning XGBoost Model ---")
    mlflow.set_experiment("XGBoost_Tuning")

    def objective(params):
        with mlflow.start_run(nested=True):
            model = xgb.XGBRegressor(
                objective='reg:squarederror',
                seed=42,
                **params
            )
            model.fit(X_train, y_train)
            y_pred = model.predict(X_test)
            r2 = r2_score(y_test, y_pred)
            mlflow.log_params(params)
            mlflow.log_metric("R2_Test", r2)
            return {'loss': -r2, 'status': STATUS_OK}

    search_space = {
        'n_estimators': hp.choice('n_estimators', range(100, 1000, 20)),
        'max_depth': hp.choice('max_depth', range(3, 10)),
        'learning_rate': hp.loguniform('learning_rate', np.log(0.005), np.log(0.3)),
        'subsample': hp.uniform('subsample', 0.6, 1.0),
        'colsample_bytree': hp.uniform('colsample_bytree', 0.6, 1.0)
    }

    with mlflow.start_run(run_name="XGBoost_Hyperopt"):
        trials = Trials()
        best = fmin(fn=objective, space=search_space, algo=tpe.suggest, max_evals=100, trials=trials)
        best_run = trials.best_trial

        # Train final model with best params
        best_params = {k: v[0] if isinstance(v, list) else v for k, v in best.items()}
        from hyperopt import space_eval
        best_params_values = space_eval(search_space, best)

        final_model = xgb.XGBRegressor(objective='reg:squarederror', seed=42, **best_params_values)
        final_model.fit(X_train, y_train)
        y_pred = final_model.predict(X_test)

        print("\nüèÜ Best XGBoost Hyperparameters:")
        print(best_params_values)

        return evaluate_model("XGBoost", y_test, y_pred)

def train_sarima():
    print("\n--- Finding best SARIMA order ---")
    exog_cols = ['is_holiday', 'is_weekend']
    # auto_arima can be slow, so we use a smaller sample to find the order
    auto_arima_model = pm.auto_arima(
        train_df_pd['total_delitos'],
        exogenous=train_df_pd[exog_cols],
        start_p=1, start_q=1,
        test='adf',
        max_p=3, max_q=3,
        m=7, # Weekly seasonality
        start_P=0, seasonal=True,
        d=1, D=1,
        trace=True,
        error_action='ignore',
        suppress_warnings=True,
        stepwise=True
    )
    print(f"Best SARIMA order: {auto_arima_model.order} and seasonal order: {auto_arima_model.seasonal_order}")

    print("\n--- Training SARIMA Model ---")
    model = SARIMAX(
        train_df_pd['total_delitos'],
        exog=train_df_pd[exog_cols],
        order=auto_arima_model.order,
        seasonal_order=auto_arima_model.seasonal_order
    )
    results = model.fit(disp=False)

    y_pred = results.get_forecast(steps=len(test_df_pd), exog=test_df_pd[exog_cols]).predicted_mean

    return evaluate_model("SARIMA", y_test, y_pred)


In [None]:
# --- 5. Main Execution ---
if __name__ == "__main__":
    results = {}
    results["XGBoost"] = train_xgboost()
    results["SARIMA"] = train_sarima()
    results["GRU"] = train_gru()

    print("\n\n--- Final Model Comparison ---")
    results_df = pd.DataFrame(results).T
    print(results_df)

    spark.stop()



--- Tuning XGBoost Model ---
100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 100/100 [05:18<00:00,  3.18s/trial, best loss: -0.6460247039794922]

üèÜ Best XGBoost Hyperparameters:
{'colsample_bytree': 0.9168547678893675, 'learning_rate': 0.021059153017535113, 'max_depth': 7, 'n_estimators': 400, 'subsample': 0.6000826333288183}
‚úÖ XGBoost - R¬≤: 0.6460, MAE: 37.46, RMSE: 58.22, MAPE: 10.24%

--- Finding best SARIMA order ---
Performing stepwise search to minimize aic
 ARIMA(1,1,1)(0,1,1)[7]             : AIC=inf, Time=3.86 sec
 ARIMA(0,1,0)(0,1,0)[7]             : AIC=35160.487, Time=0.09 sec
 ARIMA(1,1,0)(1,1,0)[7]             : AIC=33427.006, Time=0.83 sec
 ARIMA(0,1,1)(0,1,1)[7]             : AIC=inf, Time=2.61 sec
 ARIMA(1,1,0)(0,1,0)[7]             : AIC=34496.497, Time=0.11 sec
 ARIMA(1,1,0)(2,1,0)[7]             : AIC=33109.899, Time=2.33 sec
 ARIMA(1,1,0)(2,1,1)[7]             : AIC=inf, Time=5.50 sec
 ARIMA(1,1,0)(1,1,1)[7]             : AIC=inf, Time=2.72 sec
 ARIMA(0,1,0)(2,1,0)[7] 