In [None]:
# =============================================================================
# User Activity Prediction - Feature Store & Experiment Tracking
# =============================================================================
# End-to-end ML workflow built only on Snowflake-native ML tooling:
#   Feature Store  ->  Experiment Tracking  ->  Model Registry
# =============================================================================

# --- Imports -----------------------------------------------------------------
import logging
from datetime import datetime, timedelta

import numpy as np
import pandas as pd

from snowflake.ml.feature_store import CreationMode, Entity, FeatureStore, FeatureView
from snowflake.snowpark.functions import (
    any_value, avg, coalesce, col, count, dateadd, datediff, iff, lit, when,
    max as max_, min as min_, sum as sum_,
)
from snowflake.snowpark.context import get_active_session

# --- Logging -----------------------------------------------------------------
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# --- Snowpark session (provided by the hosting notebook environment) ---------
session = get_active_session()

# --- Configuration: every tunable knob for this notebook lives here ----------
CONFIG = dict(
    database="dev",
    feature_store_name="user_activity_feature_store",
    warehouse="ds_wh_medium",
    feature_refresh_freq="24 hours",
    lookback_days=7,     # trailing window (days) for the rolling *_7d features
    test_size=0.2,       # fraction of rows for the test split
    random_state=42,     # seed for the split and stochastic estimators
)

logger.info(f"Configuration loaded: {CONFIG}")

## 1. Data Preparation

Features are computed at **daily granularity** (`date_utc`) to:
- Avoid `CURRENT_DATE()` dependencies (better for reproducibility)
- Enable point-in-time correct feature retrieval
- Support incremental updates efficiently

#### Call quality

In [None]:
Each feature query groups by `date_utc` (one row per user per day) and derives its rolling-window (`*_7d`) features from `CONFIG['lookback_days']`.


In [None]:
# Call Quality Features - one row per (user, day)
#   * daily features : aggregates over that day's calls
#   * *_7d features  : the same aggregates over the trailing
#                      CONFIG['lookback_days']-day window ending on (and
#                      including) that day
#
# Fix: the old filter `call_date >= DATEADD('day', -N, call_date)` compared each
# row with itself. It was always true, so every *_7d column was an exact copy
# of its daily counterpart. The window is now built by joining each user-day to
# the same user's days in [date - (N-1), date]. Additive per-day partials
# (sums/counts) are kept so window averages are weighted per call, not per day.
user_features_call_quality_df = session.sql(f"""
WITH daily AS (
    SELECT
        call_date AS date_utc,
        USER_ID_HEX,
        COUNT_IF(COALESCE(num_bad_mos_periods, 0) > 0) AS bad_mos_calls,
        SUM(computed_mos) AS mos_sum,
        COUNT(computed_mos) AS mos_cnt,
        MAX(RTP_SETUP_TIME) AS rtp_setup_time_max
    FROM dev.public.legacy_call_end
    WHERE USER_ID_HEX != '000-00-000-000000000'
        AND USER_ID_HEX IS NOT NULL
    GROUP BY call_date, USER_ID_HEX
)
SELECT
    d.date_utc,
    d.USER_ID_HEX,

    -- Daily features (this day only)
    CAST(d.bad_mos_calls AS FLOAT) AS calls_with_bad_mos,
    CAST(d.mos_sum AS FLOAT) / NULLIF(d.mos_cnt, 0) AS average_mos,
    CAST(d.rtp_setup_time_max AS FLOAT) AS max_rtp_setup_time,

    -- Rolling features (trailing lookback window, inclusive of this day)
    CAST(SUM(w.bad_mos_calls) AS FLOAT) AS calls_with_bad_mos_7d,
    CAST(SUM(w.mos_sum) AS FLOAT) / NULLIF(SUM(w.mos_cnt), 0) AS average_mos_7d,
    CAST(MAX(w.rtp_setup_time_max) AS FLOAT) AS max_rtp_setup_time_7d

FROM daily d
JOIN daily w
    ON w.USER_ID_HEX = d.USER_ID_HEX
    AND w.date_utc BETWEEN DATEADD('day', -{CONFIG['lookback_days'] - 1}, d.date_utc) AND d.date_utc
GROUP BY d.date_utc, d.USER_ID_HEX, d.bad_mos_calls, d.mos_sum, d.mos_cnt, d.rtp_setup_time_max
""")

logger.info(f"Call Quality features: {user_features_call_quality_df.count()} rows")
user_features_call_quality_df.show(5)

#### Call rating

In [None]:
# Call Rating Features - one row per (user, day), ratings > 0 only
#   * daily features : count / avg / max / min of that day's ratings
#   * *_7d features  : the same statistics over the trailing
#                      CONFIG['lookback_days']-day window ending on that day
#
# Fix: the old filter `date_utc >= DATEADD('day', -N, date_utc)` compared each
# row with itself. It was always true, so every *_7d column duplicated the daily
# value. The window now comes from a self-join of the per-day partials (count
# and sum), so the rolling average is weighted per rating.
user_features_call_rating_df = session.sql(f"""
WITH daily AS (
    SELECT
        date_utc,
        user_id_hex,
        COUNT(call_rating) AS rating_cnt,
        SUM(call_rating) AS rating_sum,
        MAX(call_rating) AS rating_max,
        MIN(call_rating) AS rating_min
    FROM dev.public.call_ratings_combined_sources
    WHERE call_rating > 0
        AND user_id_hex != '000-00-000-000000000'
        AND user_id_hex IS NOT NULL
    GROUP BY date_utc, user_id_hex
)
SELECT
    d.date_utc,
    d.user_id_hex,

    -- Daily features (this day only)
    CAST(d.rating_cnt AS FLOAT) AS call_rating_count,
    CAST(d.rating_sum AS FLOAT) / NULLIF(d.rating_cnt, 0) AS avg_call_rating,
    CAST(d.rating_max AS FLOAT) AS max_call_rating,
    CAST(d.rating_min AS FLOAT) AS min_call_rating,

    -- Rolling features (trailing lookback window, inclusive of this day)
    CAST(SUM(w.rating_cnt) AS FLOAT) AS call_rating_count_7d,
    CAST(SUM(w.rating_sum) AS FLOAT) / NULLIF(SUM(w.rating_cnt), 0) AS avg_call_rating_7d,
    CAST(MAX(w.rating_max) AS FLOAT) AS max_call_rating_7d,
    CAST(MIN(w.rating_min) AS FLOAT) AS min_call_rating_7d

FROM daily d
JOIN daily w
    ON w.user_id_hex = d.user_id_hex
    AND w.date_utc BETWEEN DATEADD('day', -{CONFIG['lookback_days'] - 1}, d.date_utc) AND d.date_utc
GROUP BY d.date_utc, d.user_id_hex, d.rating_cnt, d.rating_sum, d.rating_max, d.rating_min
""")

logger.info(f"Call Rating features: {user_features_call_rating_df.count()} rows")
user_features_call_rating_df.show(5)

#### Data usage

In [None]:
# Data Usage Features - one row per (user, day)
#   * data_usage_mb    : MB used on that day
#   * data_usage_mb_7d : MB used over the trailing CONFIG['lookback_days'] days
#                        (window ends on, and includes, that day)
#
# Fix: the old filter `c.date_utc >= DATEADD('day', -N, c.date_utc)` compared
# each row with itself (always true), so data_usage_mb_7d duplicated the daily
# value. The window now comes from a self-join of the per-day totals.
user_features_data_usage_df = session.sql(f"""
WITH daily AS (
    SELECT
        c.date_utc,
        up.user_id_hex,
        SUM(c.mb_usage) AS mb_usage
    FROM dev.public.cost_user_daily_tmobile_cost c
    JOIN dev.public.user_profiles up ON c.username = up.latest_username
    WHERE up.user_id_hex IS NOT NULL
        AND up.user_id_hex != '000-00-000-000000000'
    GROUP BY c.date_utc, up.user_id_hex
)
SELECT
    d.date_utc,
    d.user_id_hex,

    -- Daily feature (this day only)
    CAST(d.mb_usage AS FLOAT) AS data_usage_mb,

    -- Rolling feature (trailing lookback window, inclusive of this day)
    CAST(SUM(w.mb_usage) AS FLOAT) AS data_usage_mb_7d

FROM daily d
JOIN daily w
    ON w.user_id_hex = d.user_id_hex
    AND w.date_utc BETWEEN DATEADD('day', -{CONFIG['lookback_days'] - 1}, d.date_utc) AND d.date_utc
GROUP BY d.date_utc, d.user_id_hex, d.mb_usage
""")

logger.info(f"Data Usage features: {user_features_data_usage_df.count()} rows")
user_features_data_usage_df.show(5)

#### Session

In [None]:
# Session Features - one row per (user, day)
#   * daily features : app minutes, tenure (days since registration) and
#                      session count for that day
#   * *_7d features  : app minutes / sessions summed over the trailing
#                      CONFIG['lookback_days']-day window ending on that day
#
# Fix: the old filter `m.date_utc >= DATEADD('day', -N, m.date_utc)` compared
# each row with itself (always true), so the *_7d columns duplicated the daily
# ones. The window now comes from a self-join of the per-day totals.
user_features_sessions_df = session.sql(f"""
WITH daily AS (
    SELECT
        m.date_utc,
        up.user_id_hex,
        SUM(m.time_in_app_mins_per_day) AS app_mins,
        DATEDIFF('day', ANY_VALUE(up.registered_at), m.date_utc) AS tenure_d,
        SUM(m.num_sessions) AS session_cnt
    FROM dev.public.metrics_daily_userlevel_app_time_sessions m
    JOIN dev.public.user_profiles up ON m.username = up.latest_username
    WHERE up.user_id_hex IS NOT NULL
        AND up.user_id_hex != '000-00-000-000000000'
    GROUP BY m.date_utc, up.user_id_hex
)
SELECT
    d.date_utc,
    d.user_id_hex,

    -- Daily features (this day only)
    CAST(d.app_mins AS FLOAT) AS time_in_app_mins,
    CAST(d.tenure_d AS FLOAT) AS tenure_days,
    CAST(d.session_cnt AS FLOAT) AS session_count,

    -- Rolling features (trailing lookback window, inclusive of this day)
    CAST(SUM(w.app_mins) AS FLOAT) AS time_in_app_mins_7d,
    CAST(SUM(w.session_cnt) AS FLOAT) AS session_count_7d

FROM daily d
JOIN daily w
    ON w.user_id_hex = d.user_id_hex
    AND w.date_utc BETWEEN DATEADD('day', -{CONFIG['lookback_days'] - 1}, d.date_utc) AND d.date_utc
GROUP BY d.date_utc, d.user_id_hex, d.app_mins, d.tenure_d, d.session_cnt
""")

logger.info(f"Session features: {user_features_sessions_df.count()} rows")
user_features_sessions_df.show(5)

#### NPS ratings

In [None]:
# NPS Rating Features - one row per (user, day)
#   * daily features : response count, average and max score for that day
#   * *_7d features  : the same statistics over the trailing
#                      CONFIG['lookback_days']-day window ending on that day
#
# Fix: the old filter `date_utc >= DATEADD('day', -N, date_utc)` compared each
# row with itself (always true), so every *_7d column duplicated the daily
# value. The window now comes from a self-join of the per-day partials; the
# rolling average divides summed scores by summed non-null score counts.
user_features_nps_rating_df = session.sql(f"""
WITH daily AS (
    SELECT
        date_utc,
        user_id_hex,
        COUNT(*) AS response_cnt,
        SUM(score) AS score_sum,
        COUNT(score) AS score_cnt,
        MAX(score) AS score_max
    FROM dev.public.nps_combined_sources
    WHERE user_id_hex != '000-00-000-000000000'
        AND user_id_hex IS NOT NULL
    GROUP BY date_utc, user_id_hex
)
SELECT
    d.date_utc,
    d.user_id_hex,

    -- Daily features (this day only)
    CAST(d.response_cnt AS FLOAT) AS nps_count,
    CAST(d.score_sum AS FLOAT) / NULLIF(d.score_cnt, 0) AS nps_avg_rating,
    CAST(d.score_max AS FLOAT) AS nps_max_rating,

    -- Rolling features (trailing lookback window, inclusive of this day)
    CAST(SUM(w.response_cnt) AS FLOAT) AS nps_count_7d,
    CAST(SUM(w.score_sum) AS FLOAT) / NULLIF(SUM(w.score_cnt), 0) AS nps_avg_rating_7d,
    CAST(MAX(w.score_max) AS FLOAT) AS nps_max_rating_7d

FROM daily d
JOIN daily w
    ON w.user_id_hex = d.user_id_hex
    AND w.date_utc BETWEEN DATEADD('day', -{CONFIG['lookback_days'] - 1}, d.date_utc) AND d.date_utc
GROUP BY d.date_utc, d.user_id_hex, d.response_cnt, d.score_sum, d.score_cnt, d.score_max
""")

logger.info(f"NPS Rating features: {user_features_nps_rating_df.count()} rows")
user_features_nps_rating_df.show(5)

## 2. Feature Store

Snowflake Feature Store provides:
- **Centralized feature management** with versioning
- **Point-in-time correct** feature retrieval for training
- **Automatic refresh** via dynamic tables

#### Create FS

In [None]:
# One-time setup: create the Feature Store. Left commented out because the
# following cell connects to an already-existing store; uncomment only when
# bootstrapping a fresh environment. The values mirror CONFIG so this cell and
# the connect cell point at the same store.
# fs = FeatureStore(
#     session=session,
#     database=CONFIG["database"],
#     name=CONFIG["feature_store_name"],
#     default_warehouse=CONFIG["warehouse"],
#     creation_mode=CreationMode.CREATE_IF_NOT_EXIST
# )

#### Connect to FS

In [None]:
# Attach to the Feature Store named in CONFIG. No creation_mode is passed, so
# the store is expected to exist already (it is created by the commented-out
# one-time setup cell above).
fs = FeatureStore(
    session=session,
    name=CONFIG["feature_store_name"],
    database=CONFIG["database"],
    default_warehouse=CONFIG["warehouse"],
)
logger.info(f"Connected to Feature Store: {CONFIG['feature_store_name']}")

#### Create and register entities

In [None]:
# One-time setup: define and register the `user` entity, joined on user_id_hex.
# Left commented out because the entity is already registered; the next cell
# fetches it with fs.get_entity("user"). Uncomment only for a fresh store.
# entity = Entity(
#     name="user",
#     join_keys=["user_id_hex"],
#     desc="user entity"
# )
# fs.register_entity(entity)

#### Get existing entities

In [None]:
# Look up the registered `user` entity; every feature view below is keyed on it.
entity = fs.get_entity(name="user")

#### Create and register feature views

In [None]:
# Call Quality Feature View
# Wraps the daily call-quality query as a managed feature view on the `user`
# entity. DATE_UTC is the event-time column used for point-in-time lookups;
# refresh cadence comes from CONFIG.
user_features_call_quality_fv = FeatureView(
    name="user_features_call_quality",
    desc="User call quality metrics including MOS scores and bad call counts",
    entities=[entity],
    feature_df=user_features_call_quality_df,
    timestamp_col="DATE_UTC",
    refresh_freq=CONFIG["feature_refresh_freq"],
)

# overwrite=True: lets this development notebook re-register version "1" on re-runs.
fs.register_feature_view(
    overwrite=True,
    version="1",
    feature_view=user_features_call_quality_fv,
)
logger.info("Registered: user_features_call_quality")

In [None]:
# Call Rating Feature View
# Daily call-rating statistics on the `user` entity, time-indexed by DATE_UTC.
user_features_call_rating_fv = FeatureView(
    name="user_features_call_rating",
    desc="User call rating statistics and trends",
    entities=[entity],
    feature_df=user_features_call_rating_df,
    timestamp_col="DATE_UTC",
    refresh_freq=CONFIG["feature_refresh_freq"],
)

# overwrite=True: lets this development notebook re-register version "1" on re-runs.
fs.register_feature_view(
    overwrite=True,
    version="1",
    feature_view=user_features_call_rating_fv,
)
logger.info("Registered: user_features_call_rating")

In [None]:
# Data Usage Feature View
# Daily mobile-data usage on the `user` entity, time-indexed by DATE_UTC.
user_features_data_usage_fv = FeatureView(
    name="user_features_data_usage",
    desc="User mobile data consumption patterns",
    entities=[entity],
    feature_df=user_features_data_usage_df,
    timestamp_col="DATE_UTC",
    refresh_freq=CONFIG["feature_refresh_freq"],
)

# overwrite=True: lets this development notebook re-register version "1" on re-runs.
fs.register_feature_view(
    overwrite=True,
    version="1",
    feature_view=user_features_data_usage_fv,
)
logger.info("Registered: user_features_data_usage")

In [None]:
# Sessions Feature View
# Daily app-engagement metrics on the `user` entity, time-indexed by DATE_UTC.
user_features_sessions_fv = FeatureView(
    name="user_features_sessions",
    desc="User app session and engagement metrics",
    entities=[entity],
    feature_df=user_features_sessions_df,
    timestamp_col="DATE_UTC",
    refresh_freq=CONFIG["feature_refresh_freq"],
)

# overwrite=True: lets this development notebook re-register version "1" on re-runs.
fs.register_feature_view(
    overwrite=True,
    version="1",
    feature_view=user_features_sessions_fv,
)
logger.info("Registered: user_features_sessions")

In [None]:
# NPS Rating Feature View
# Daily NPS feedback statistics on the `user` entity, time-indexed by DATE_UTC.
user_features_nps_rating_fv = FeatureView(
    name="user_features_nps_rating",
    desc="User NPS (Net Promoter Score) feedback metrics",
    entities=[entity],
    feature_df=user_features_nps_rating_df,
    timestamp_col="DATE_UTC",
    refresh_freq=CONFIG["feature_refresh_freq"],
)

# overwrite=True: lets this development notebook re-register version "1" on re-runs.
fs.register_feature_view(
    overwrite=True,
    version="1",
    feature_view=user_features_nps_rating_fv,
)
logger.info("Registered: user_features_nps_rating")

# Closing banner once all feature views in this section have been registered
banner = "=" * 50
logger.info(banner)
logger.info("All 5 feature views registered successfully!")
logger.info(banner)

## 3. Model Training

Train baseline models with Snowflake Experiment Tracking to:
- Compare multiple algorithms
- Log metrics, parameters, and model artifacts
- Track experiment lineage

In [None]:
# ML libraries: Snowflake ML estimators, preprocessing and metrics run inside
# Snowflake on Snowpark DataFrames (no sklearn / local pandas round-trip).
import numpy as np

from snowflake.ml.experiment import ExperimentTracking
from snowflake.ml.modeling.ensemble import GradientBoostingRegressor, RandomForestRegressor
from snowflake.ml.modeling.lightgbm import LGBMRegressor
from snowflake.ml.modeling.linear_model import Lasso, LinearRegression, Ridge
from snowflake.ml.modeling.metrics import mean_absolute_error, mean_squared_error, r2_score
from snowflake.ml.modeling.pipeline import Pipeline
from snowflake.ml.modeling.preprocessing import StandardScaler
from snowflake.ml.modeling.xgboost import XGBRegressor

# All runs logged in the training loop are grouped under this experiment.
EXPERIMENT_NAME = "user_activity_forecasting_baseline"

exp = ExperimentTracking(session=session)
exp.set_experiment(EXPERIMENT_NAME)
logger.info(f"Experiment initialized: {EXPERIMENT_NAME}")

#### Spine df

In [None]:
# Point-in-time training setup (fixed dates instead of CURRENT_DATE() for
# reproducibility; in production, pass these as parameters).
#
#   FEATURE_CUTOFF_DATE : spine timestamp; features are looked up as of this day
#   label window        : the LABEL_WINDOW_DAYS days strictly AFTER the cutoff,
#                         i.e. TRAINING_START_DATE .. TRAINING_END_DATE (inclusive)
#
# Fix: the spine timestamp used to be the LAST day of the label window, so the
# as-of feature join could use activity from inside the period being predicted
# (target leakage). The window was also 15 days inclusive, not a week.
# NOTE(review): assumes generate_training_set joins feature rows with
# DATE_UTC <= spine DATE_UTC - confirm against the Feature Store docs.
LABEL_WINDOW_DAYS = 7
TRAINING_END_DATE = "2024-12-01"  # Last day of the label window - adjust to your data
_label_end = datetime.strptime(TRAINING_END_DATE, "%Y-%m-%d").date()
TRAINING_START_DATE = (_label_end - timedelta(days=LABEL_WINDOW_DAYS - 1)).isoformat()
FEATURE_CUTOFF_DATE = (_label_end - timedelta(days=LABEL_WINDOW_DAYS)).isoformat()

# Spine DataFrame: target = number of distinct days in the label window on which
# the user's total app time exceeded 1 minute. Minutes are summed per user-day
# first (as in the session features), so a day is counted at most once.
# NOTE(review): only users with at least one row in the label window appear;
# users who went fully inactive (label 0) are not represented.
spine_df = session.sql(f"""
WITH daily AS (
    SELECT
        up.user_id_hex,
        m.date_utc,
        SUM(m.time_in_app_mins_per_day) AS app_mins
    FROM dev.public.metrics_daily_userlevel_app_time_sessions m
    JOIN dev.public.user_profiles up ON m.username = up.latest_username
    WHERE m.date_utc BETWEEN '{TRAINING_START_DATE}' AND '{TRAINING_END_DATE}'
        AND up.user_id_hex IS NOT NULL
        AND up.user_id_hex != '000-00-000-000000000'
    GROUP BY up.user_id_hex, m.date_utc
)
SELECT
    '{FEATURE_CUTOFF_DATE}'::DATE AS date_utc,  -- Reference date for feature lookup
    user_id_hex,
    COUNT_IF(app_mins > 1) AS active_days_in_week
FROM daily
GROUP BY user_id_hex
""")

logger.info(f"Spine DataFrame: {spine_df.count()} users")
logger.info(f"Feature cutoff: {FEATURE_CUTOFF_DATE}")
logger.info(f"Label window: {TRAINING_START_DATE} to {TRAINING_END_DATE}")
spine_df.show(5)

#### Get training dataset from FS

In [None]:
# Fetch version "1" of every feature view registered above, in a fixed order.
fv_call_quality, fv_call_rating, fv_data_usage, fv_sessions, fv_nps_rating = (
    fs.get_feature_view(name=fv_name, version="1")
    for fv_name in (
        "user_features_call_quality",
        "user_features_call_rating",
        "user_features_data_usage",
        "user_features_sessions",
        "user_features_nps_rating",
    )
)
all_features = [fv_call_quality, fv_call_rating, fv_data_usage, fv_sessions, fv_nps_rating]

# Point-in-time join of the spine against every feature view, keyed on the
# spine's DATE_UTC; ACTIVE_DAYS_IN_WEEK is carried through as the label.
training_df = fs.generate_training_set(
    spine_df=spine_df,
    features=all_features,
    spine_timestamp_col="DATE_UTC",
    spine_label_cols=["ACTIVE_DAYS_IN_WEEK"],
)

# Stays a Snowpark DataFrame: Snowflake ML trains on it directly, no download.
logger.info(f"Training dataset: {training_df.count()} rows")
logger.info(f"Features: {training_df.columns}")
training_df.show(5)

#### Split training and testing dataset

In [None]:
# Define feature and label columns for Snowflake ML, then split train/test.
from snowflake.snowpark.types import FloatType, DoubleType, IntegerType, LongType, DecimalType

LABEL_COL = "ACTIVE_DAYS_IN_WEEK"
ID_COLS = ["USER_ID_HEX", "DATE_UTC"]

# Snowpark types accepted as model inputs
NUMERICAL_TYPES = (FloatType, DoubleType, IntegerType, LongType, DecimalType)

# Candidate features: every training-set column except the IDs and the label
ALL_FEATURE_COLS = [c for c in training_df.columns if c not in ID_COLS + [LABEL_COL]]

# Keep only candidates whose schema type is numerical
NUMERICAL_COLS = [
    field.name for field in training_df.schema.fields
    if field.name in ALL_FEATURE_COLS and isinstance(field.datatype, NUMERICAL_TYPES)
]

# Regressors and StandardScaler only accept numerical inputs. Previously any
# non-numerical column was still passed to the models (and would fail at fit
# time); drop those explicitly and report them.
NON_NUMERICAL_COLS = [c for c in ALL_FEATURE_COLS if c not in NUMERICAL_COLS]
if NON_NUMERICAL_COLS:
    logger.warning(f"Excluding non-numerical feature columns: {NON_NUMERICAL_COLS}")

FEATURE_COLS = NUMERICAL_COLS  # Model inputs
SCALE_COLS = NUMERICAL_COLS    # StandardScaler inputs
assert FEATURE_COLS, "No numerical feature columns found in training_df"

logger.info(f"Total features: {len(FEATURE_COLS)} columns")
logger.info(f"Numerical features (for scaling): {len(SCALE_COLS)} columns")
logger.info(f"Label: {LABEL_COL}")

# Missing feature values (e.g. users absent from a feature view) become 0
training_df = training_df.fillna(0)

# Train/test split driven by CONFIG (previously hard-coded to 0.8/0.2 and
# silently ignoring CONFIG["test_size"])
test_fraction = CONFIG["test_size"]
assert 0 < test_fraction < 1, f"CONFIG['test_size'] must be in (0, 1), got {test_fraction}"
train_df, test_df = training_df.random_split(
    weights=[1 - test_fraction, test_fraction],
    seed=CONFIG["random_state"],
)

logger.info(f"Training set: {train_df.count()} samples")
logger.info(f"Test set: {test_df.count()} samples")
logger.info(f"Features used: {len(FEATURE_COLS)}")

#### Preprocessing

In [None]:
# Reference StandardScaler restricted to the numerical columns (SCALE_COLS).
# input_cols == output_cols, so scaled values replace the originals in place
# and every other column passes through untouched.
# NOTE: the training loop builds its own StandardScaler inside each Pipeline;
# this instance documents the configuration and is not fitted there.
scaler = StandardScaler(output_cols=SCALE_COLS, input_cols=SCALE_COLS)

logger.info(f"Scaler configured for {len(SCALE_COLS)} numerical features")

#### Base training

In [None]:
# Baseline estimators as (display name, unfitted Snowflake ML estimator) pairs.
# Snowflake ML estimators name their columns explicitly: every model reads
# FEATURE_COLS, learns LABEL_COL and writes its output to PREDICTION.
# Each spec is (name, estimator class, model-specific keyword arguments).
baseline_models = [
    (
        model_name,
        estimator_cls(
            input_cols=FEATURE_COLS,
            label_cols=[LABEL_COL],
            output_cols=["PREDICTION"],
            **extra_kwargs,
        ),
    )
    for model_name, estimator_cls, extra_kwargs in [
        # Linear models
        ("LinearRegression", LinearRegression, {}),
        ("Ridge", Ridge, {}),
        ("Lasso", Lasso, {}),
        # Tree-based models
        ("RandomForest", RandomForestRegressor,
         {"n_estimators": 100, "random_state": CONFIG["random_state"]}),
        ("GradientBoosting", GradientBoostingRegressor,
         {"random_state": CONFIG["random_state"]}),
        # Boosting models
        ("XGBoost", XGBRegressor, {"random_state": CONFIG["random_state"]}),
        ("LightGBM", LGBMRegressor, {"random_state": CONFIG["random_state"]}),
    ]
]

logger.info(f"Prepared {len(baseline_models)} baseline models for training (Snowflake ML)")

In [None]:
# Training loop: for every baseline estimator, fit a Pipeline
# (StandardScaler on SCALE_COLS -> estimator) on train_df, score test_df, and
# record metrics both locally (`results`) and in Experiment Tracking.
results = []

# Split sizes do not change inside the loop; each .count() is a Snowflake
# query, so compute them once instead of twice per model.
n_train_samples = train_df.count()
n_test_samples = test_df.count()

logger.info("=" * 60)
logger.info("TRAINING BASELINE MODELS (Snowflake ML with Pipeline)")
logger.info("=" * 60)

for name, model in baseline_models:
    logger.info(f"\nTraining: {name}")

    # A new StandardScaler per pipeline keeps each model's preprocessing independent.
    pipeline = Pipeline(steps=[
        ("scaler", StandardScaler(input_cols=SCALE_COLS, output_cols=SCALE_COLS)),
        ("model", model),
    ])
    pipeline.fit(train_df)

    # Snowpark DataFrame with the test columns (including the label) plus PREDICTION
    predictions_df = pipeline.predict(test_df)

    mse = mean_squared_error(
        df=predictions_df,
        y_true_col_names=[LABEL_COL],
        y_pred_col_names=["PREDICTION"],
    )
    mae = mean_absolute_error(
        df=predictions_df,
        y_true_col_names=[LABEL_COL],
        y_pred_col_names=["PREDICTION"],
    )
    # Unlike the two metrics above, r2_score takes SINGLE column names
    # (y_true_col_name / y_pred_col_name); the plural keywords raise TypeError.
    r2 = r2_score(
        df=predictions_df,
        y_true_col_name=LABEL_COL,
        y_pred_col_name="PREDICTION",
    )
    rmse = float(np.sqrt(mse))

    results.append({
        "name": name,
        "mse": mse,
        "rmse": rmse,
        "mae": mae,
        "r2": r2,
        "pipeline": pipeline,
    })

    logger.info(f"  MSE: {mse:.4f} | RMSE: {rmse:.4f} | MAE: {mae:.4f} | R²: {r2:.4f}")

    # One Experiment Tracking run per model
    with exp.start_run():
        exp.log_param("model_type", name)
        exp.log_param("n_features", len(FEATURE_COLS))
        exp.log_param("train_samples", n_train_samples)
        exp.log_param("test_samples", n_test_samples)

        exp.log_metric("mse", mse)
        exp.log_metric("rmse", rmse)
        exp.log_metric("mae", mae)
        exp.log_metric("r2", r2)

        # Log the fitted pipeline with a small Snowpark sample of its inputs
        exp.log_model(
            model=pipeline,
            model_name=f"{name}_model",
            sample_input_data=train_df.select(FEATURE_COLS).limit(5),
        )

logger.info("\n" + "=" * 60)
logger.info("TRAINING COMPLETE")
logger.info("=" * 60)
    

## 4. Model Selection & Results

Compare model performance and identify the best model for production.

In [None]:
# Compare the baseline models on the held-out split, lowest RMSE first.
# (Display strings previously contained mis-encoded characters such as "R¬≤"
# for "R²"; they are restored here.)
if not results:
    raise RuntimeError("No trained models in `results` - run the training loop first")

results_df = pd.DataFrame([
    {
        "Model": r["name"],
        "MSE": r["mse"],
        "RMSE": r["rmse"],
        "MAE": r["mae"],
        "R²": r["r2"],
    }
    for r in results
]).sort_values("RMSE")

logger.info("\n📊 MODEL COMPARISON (sorted by RMSE):")
print(results_df.to_string(index=False))

# Best model = lowest test RMSE (same criterion as the table ordering)
best_model_result = min(results, key=lambda r: r["rmse"])
best_model_name = best_model_result["name"]
best_model_pipeline = best_model_result["pipeline"]

logger.info(f"\n🏆 BEST MODEL: {best_model_name}")
logger.info(f"   RMSE: {best_model_result['rmse']:.4f}")
logger.info(f"   R²: {best_model_result['r2']:.4f}")

In [None]:
## 5. Model Registry
# Open the Model Registry in the CONFIG database, `data_science` schema; the
# best baseline pipeline is registered there in the next cell.
from snowflake.ml.registry import Registry

registry = Registry(
    session=session,
    schema_name="data_science",
    database_name=CONFIG["database"],
)

logger.info("Connected to Snowflake Model Registry")

In [None]:
# Register the best baseline pipeline as the champion model.
MODEL_NAME = "user_activity_predictor"
# Version names must be valid Snowflake identifiers; "v1.0" contains a '.',
# which an unquoted identifier cannot, so use an underscore instead.
MODEL_VERSION = "v1_0"

model_version = registry.log_model(
    model=best_model_pipeline,
    model_name=MODEL_NAME,
    version_name=MODEL_VERSION,
    comment=f"Best baseline model: {best_model_name} | RMSE: {best_model_result['rmse']:.4f} | R²: {best_model_result['r2']:.4f}",
    # Previously `X_train.head()`, but X_train is never defined in this notebook
    # (NameError). Use a small Snowpark sample of the model's input columns,
    # matching what the training loop logs to Experiment Tracking.
    sample_input_data=train_df.select(FEATURE_COLS).limit(5),
    metrics={
        "rmse": best_model_result["rmse"],
        "mse": best_model_result["mse"],
        "mae": best_model_result["mae"],
        "r2": best_model_result["r2"],
    },
)

logger.info(f"✅ Model '{model_version.model_name}' version '{model_version.version_name}' registered successfully!")
logger.info(f"   Algorithm: {best_model_name}")
logger.info(f"   Metrics: RMSE={best_model_result['rmse']:.4f}, R²={best_model_result['r2']:.4f}")

## 6. Inference (Example)

Load the registered model and make predictions on new data.

In [None]:
# Load the registered model version back from the registry and score the test split.
loaded_model = registry.get_model(MODEL_NAME).version(MODEL_VERSION)

# Inference runs in Snowflake on a Snowpark DataFrame of the model's input columns.
predictions_df = loaded_model.run(test_df.select(FEATURE_COLS))

# Prediction summary computed in Snowflake. The dict form of DataFrame.agg maps
# each column to ONE function name, so a list of functions is not accepted;
# build explicit aggregate expressions instead.
pred_stats = predictions_df.agg(
    count(col("PREDICTION")),
    min_(col("PREDICTION")),
    max_(col("PREDICTION")),
).collect()[0]
n_predictions, pred_min, pred_max = pred_stats[0], pred_stats[1], pred_stats[2]

logger.info(f"Generated {n_predictions} predictions")
if n_predictions:
    logger.info(f"Prediction range: {pred_min:.2f} - {pred_max:.2f}")
else:
    # min/max are NULL when there are no rows; formatting None would raise
    logger.warning("No predictions generated - prediction range unavailable")

# Summary (status icons restored from mis-encoded "‚úÖ" to "✅")
logger.info("\n" + "=" * 60)
logger.info("PIPELINE COMPLETE! (Using Snowflake ML)")
logger.info("=" * 60)
logger.info("✅ Feature Store: 5 feature views registered")
logger.info(f"✅ Experiments: {len(baseline_models)} models trained and logged")
logger.info(f"✅ Best Model: {best_model_name} (RMSE: {best_model_result['rmse']:.4f})")
# MODEL_VERSION already carries its own "v" prefix, so don't prepend another
logger.info(f"✅ Model Registry: {MODEL_NAME} version {MODEL_VERSION} registered")
logger.info("✅ All training done IN Snowflake - no data downloaded!")
logger.info("=" * 60)
