# Notebook Purpose
This notebook demonstrates a robust machine learning inference pipeline connecting a trained model to data stored in Google BigQuery.

**It covers:**

1. Fetching and cleaning data from BigQuery.

2. Mapping data columns to the model's expected feature names.

3. Handling missing values, unknown columns, and datatype mismatches.

4. Running inference using a pre-trained model.

5. Generating a health report and writing predictions back to BigQuery.

The pipeline is designed to be robust and fail-safe, preventing runtime errors due to evolving data schemas or missing values.

**Imports & Configuration**

In [None]:
# Colab-only cell: `google.colab` is importable only inside a Colab runtime.
# NOTE(review): presumably this is how the model file named by
# Config.MODEL_PATH gets into the runtime's working directory — confirm.
from google.colab import files
# `uploaded` is not read anywhere else in the visible code; the call appears
# to be made for its upload side effect only.
uploaded = files.upload()

In [26]:
import logging
import re
import sys
from dataclasses import dataclass, field
from typing import List, Dict, Optional

import joblib
import numpy as np
import pandas as pd
from google.cloud import bigquery
from google.api_core.exceptions import GoogleAPIError

# Logging Setup
# basicConfig() silently does nothing when the root logger already has
# handlers, which is typically the case in notebook runtimes and on cell
# re-runs; the INFO messages emitted by this pipeline are then never shown.
# force=True (Python 3.8+) removes any pre-existing root handlers first so the
# level, format and stdout handler below always take effect.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
    force=True,
)
# Module-level logger shared by every helper in this notebook.
logger = logging.getLogger(__name__)


# Configuration
@dataclass
class Config:
    """Settings for one BigQuery -> model -> BigQuery inference run.

    Every field has a default, so ``Config()`` yields the configuration used
    by ``run_pipeline``; individual fields can be overridden by keyword.
    """

    # BigQuery coordinates: project, dataset, input table and output table.
    PROJECT_ID: str = "crown-486620"
    DATASET_ID: str = "crown1"
    SOURCE_TABLE: str = "Machine Failure Data"
    PRED_TABLE: str = "machine_failure_predictions"

    # Path of the serialized model file handed to load_model().
    MODEL_PATH: str = "final_machine_failure_model.pkl"

    # Probability cut-off: run_pipeline() flags a row as a predicted failure
    # when its probability is strictly greater than this value.
    OPTIMAL_THRESHOLD: float = 0.78

    # Columns removed from the raw data before any feature handling.
    COLUMNS_TO_DROP: List[str] = field(
        default_factory=lambda: [
            'Machine failure',
            'logit',
            'HDF',
            'OSF',
            'PWF',
            'TWF',
            'RNF',
        ]
    )

    # Mapping: { cleaned column name in BigQuery : column name the model expects }
    COLUMN_MAPPING: Dict[str, str] = field(
        default_factory=lambda: {
            "Process temperature": "Process temperature [K]",
            "Rotational speed": "Rotational speed [rpm]",
            "Torque": "Torque [Nm]",
            "Tool wear": "Tool wear [min]",
            "Temp_diff": "Temp_diff",
            "Power_norm": "Power_norm",
            "Tool_wear_norm": "Tool_wear_norm",
            "Type_encoded": "Type_encoded",
        }
    )

    @property
    def source_full_path(self) -> str:
        """Return ``project.dataset.table`` for the input table."""
        return "{}.{}.{}".format(self.PROJECT_ID, self.DATASET_ID, self.SOURCE_TABLE)

    @property
    def dest_full_path(self) -> str:
        """Return ``project.dataset.table`` for the predictions table."""
        return "{}.{}.{}".format(self.PROJECT_ID, self.DATASET_ID, self.PRED_TABLE)

    @property
    def expected_features(self) -> List[str]:
        """Return the model-side feature names, in COLUMN_MAPPING order."""
        return [*self.COLUMN_MAPPING.values()]


# Helper Functions
def load_model(path: str):
    """Deserialize the model stored at *path* and return it.

    The function does not raise on failure: it logs the problem and ends the
    process with exit status 1, both when the file is missing and when any
    other error occurs while loading.
    """
    # NOTE(review): joblib.load unpickles the file, which can execute code
    # embedded in it — only point this at model files from a trusted source.
    try:
        estimator = joblib.load(path)
    except FileNotFoundError:
        logger.error(f"Model file not found at {path}")
        sys.exit(1)
    except Exception as exc:
        logger.error(f"Failed to load model: {exc}")
        sys.exit(1)
    else:
        logger.info(f"Model loaded successfully from {path}")
        return estimator

def fetch_data(client: bigquery.Client, query: str) -> pd.DataFrame:
    """Run *query* through *client* and return the result as a DataFrame.

    Instead of returning, the process is terminated with ``sys.exit`` when
    there is nothing usable: status 1 if BigQuery raises ``GoogleAPIError``,
    status 0 if the query succeeds but yields no rows.
    """
    try:
        result = client.query(query).to_dataframe()
    except GoogleAPIError as err:
        logger.error(f"BigQuery fetch failed: {err}")
        sys.exit(1)

    # An empty result is treated as a clean "nothing to do", not an error.
    if result.empty:
        logger.warning("Check: Source table is empty. Exiting.")
        sys.exit(0)

    logger.info(f"Data fetched. Shape: {result.shape}")
    return result

def clean_column_names(df: pd.DataFrame) -> pd.DataFrame:
    """Strip unit suffixes and stray whitespace from the column labels.

    The labels are rewritten on *df* itself (the frame is modified in place)
    and the same object is returned for convenience.
    """
    normalized = (
        df.columns.str.strip()
        # Drop anything enclosed in [] or (), e.g. "[K]" or "(rpm)".
        .str.replace(r"\[.*?\]", "", regex=True)
        .str.replace(r"\(.*?\)", "", regex=True)
        # Squeeze whitespace runs down to a single space ...
        .str.replace(r"\s+", " ", regex=True)
        # ... and trim what the removals left dangling at either end.
        .str.strip()
    )
    df.columns = normalized
    return df

def _pick_source_column(columns, clean_name: str, claimed) -> Optional[str]:
    """Return the data column that should feed *clean_name*, or None.

    Columns already present in *claimed* are skipped. Among the rest, a
    case-insensitive exact match wins; only when there is none does the first
    column that merely contains *clean_name* get used.
    """
    target = clean_name.lower()
    available = [col for col in columns if col not in claimed]

    for col in available:
        if col.lower() == target:
            return col
    for col in available:
        if target in col.lower():
            return col
    return None


def preprocess_features(df: pd.DataFrame, config: Config) -> pd.DataFrame:
    """Prepares the feature matrix X matching model requirements.

    Args:
        df: Raw rows as fetched from the source table. It is not modified;
            all work happens on a copy.
        config: Supplies ``COLUMNS_TO_DROP``, ``COLUMN_MAPPING`` and
            ``expected_features``.

    Returns:
        A DataFrame indexed like *df* whose columns are exactly
        ``config.expected_features``, in that order. Values are coerced to
        numeric; a feature with no matching input column is filled with 0.0,
        and remaining NaNs are replaced by the column median of this batch
        (or 0 when the median itself is NaN).
    """

    # 1. Drop known unnecessary columns
    df_clean = df.drop(columns=config.COLUMNS_TO_DROP, errors='ignore').copy()

    # 2. Normalize raw column names (remove units like [K] so we can match keys)
    df_clean = clean_column_names(df_clean)

    # 3. Rename columns to match Model Expectations specifically.
    #    Exact (case-insensitive) matches are preferred over substring matches
    #    so that e.g. "Tool wear rate" cannot shadow a real "Tool wear" column,
    #    and a column claimed by one feature is never reassigned to another.
    renamed_cols = {}
    for clean_name, model_name in config.COLUMN_MAPPING.items():
        source_col = _pick_source_column(df_clean.columns, clean_name, renamed_cols)
        if source_col is None:
            logger.warning(f"Expected feature base '{clean_name}' not found in data.")
        else:
            renamed_cols[source_col] = model_name

    df_clean = df_clean.rename(columns=renamed_cols)

    # 4. Create Final Feature Matrix (X)
    X = pd.DataFrame(index=df_clean.index)

    for feature in config.expected_features:
        if feature in df_clean.columns:
            # Non-numeric entries become NaN and are imputed in step 5.
            X[feature] = pd.to_numeric(df_clean[feature], errors='coerce')
        else:
            logger.warning(f"Missing feature '{feature}'. Filling with 0.")
            X[feature] = 0.0

    # 5. Handle NaNs
    if X.isnull().values.any():
        logger.info("Imputing missing values with batch median.")
        X = X.fillna(X.median())
        X = X.fillna(0)  # Fallback if median fails (e.g. all NaNs)

    return X[config.expected_features]  # Enforce order


# Health Report of model
def _resolve_id_columns(df: pd.DataFrame, wanted) -> list:
    """Return the columns of *df* that correspond to the names in *wanted*.

    An exact name match is used when present. Otherwise names are compared
    after lower-casing and dropping every non-alphanumeric character, so that
    e.g. "Product ID", "Product_ID" and "product id" are treated alike.
    """
    def _squash(name) -> str:
        return "".join(ch for ch in str(name).lower() if ch.isalnum())

    by_squashed = {}
    for col in df.columns:
        # Keep the first column for each normalized form.
        by_squashed.setdefault(_squash(col), col)

    resolved = []
    for name in wanted:
        col = name if name in df.columns else by_squashed.get(_squash(name))
        if col is not None and col not in resolved:
            resolved.append(col)
    return resolved


def generate_health_report(df: pd.DataFrame, proba_col: str, pred_col: str):
    """Print a summary of a scored batch to stdout.

    Args:
        df: Frame containing at least *proba_col* and *pred_col*.
        proba_col: Name of the column holding the predicted probability.
        pred_col: Name of the column holding the 0/1 prediction.

    The report lists the row count, number and percentage of predicted
    failures, mean probability, the ``describe()`` summary of the
    probabilities, an alert line, and — when at least one failure is
    predicted — the five rows with the highest probability. For an empty
    frame only the header and a short notice are printed, because a failure
    rate cannot be computed from zero rows.

    Raises:
        KeyError: If *proba_col* or *pred_col* is not a column of *df*.
    """
    banner = "=" * 50
    rule = "-" * 50

    total = len(df)
    failures = df[pred_col].sum()
    avg_confidence = df[proba_col].mean()

    print("\n" + banner)
    print("MODEL INFERENCE HEALTH REPORT")
    print(banner)
    print(f"Total Machines Scanned : {total}")

    if total == 0:
        # Avoid 0/0 and a misleading "within normal bounds" verdict.
        print("No rows were scored; nothing to report.")
        print(banner + "\n")
        return

    failure_rate = (failures / total) * 100

    print(f"Predicted Failures     : {failures}")
    print(f"Failure Rate           : {failure_rate:.2f}%")
    print(f"Avg Failure Prob       : {avg_confidence:.4f}")
    print(rule)

    # Distribution Check
    print("Probability Distribution:")
    print(df[proba_col].describe().to_string())
    print(rule)

    if failure_rate > 20:
        print("ALERT: High failure rate detected (>20%). Check input features.")
    elif failure_rate == 0:
        print("ALERT: Zero failures predicted. Model might be too conservative.")
    else:
        print("Prediction rate looks within normal operational bounds.")

    print(banner + "\n")

    if failures > 0:
        print("TOP 5 MACHINES AT RISK:")
        # Identifier columns are matched leniently so the table still names
        # the machines when the source uses e.g. "Product_ID".
        id_cols = [c for c in _resolve_id_columns(df, ['Product ID', 'UDI']) if c != proba_col]
        cols_to_show = id_cols + [proba_col]
        print(df.sort_values(by=proba_col, ascending=False)[cols_to_show].head(5).to_string(index=False))


# Main Pipeline
def run_pipeline():
    """Run the end-to-end job: load model, fetch, score, report, export.

    Steps, in order: build a ``Config`` and a BigQuery client, load the model,
    read every row of the source table, build the feature matrix, score it,
    attach the two prediction columns to the fetched frame, print the health
    report, and overwrite the predictions table with the result.

    Raises:
        Exception: Whatever ``model.predict_proba`` raises is logged and
            re-raised unchanged.
        SystemExit: Raised by the helpers on load/fetch problems, and here
            (status 1) when the BigQuery write fails.
    """
    cfg = Config()
    bq_client = bigquery.Client(project=cfg.PROJECT_ID)

    # 1. Load Resources
    print("Loading model...")
    model = load_model(cfg.MODEL_PATH)

    # 2. Fetch Data
    print(f"Fetching data from {cfg.source_full_path}...")
    query = f"SELECT * FROM `{cfg.source_full_path}`"
    df_raw = fetch_data(bq_client, query)

    # 3. Preprocess
    X = preprocess_features(df_raw, cfg)

    # 4. Inference
    print("Running inference...")
    # Only the model call is guarded, so that a failure in the reporting code
    # below is not mislabelled as an inference failure in the log.
    try:
        # assumes column 1 of predict_proba is the "failure" class — TODO confirm
        y_proba = model.predict_proba(X)[:, 1]
    except Exception as e:
        logger.error(f"Inference failed: {e}")
        raise

    y_pred = (y_proba > cfg.OPTIMAL_THRESHOLD).astype(int)

    # Attach predictions to dataframe
    df_raw['failure_probability'] = y_proba
    df_raw['predicted_machine_failure'] = y_pred

    # ---PRINT THE REPORT TO TERMINAL ---
    generate_health_report(df_raw, 'failure_probability', 'predicted_machine_failure')

    # 5. Export
    print(f"Saving predictions to {cfg.dest_full_path}...")
    try:
        # WRITE_TRUNCATE: the destination table is replaced on every run.
        job_config = bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE")
        job = bq_client.load_table_from_dataframe(
            df_raw,
            cfg.dest_full_path,
            job_config=job_config
        )
        job.result()  # Block until the load job has finished.
    except Exception as e:
        logger.error(f"BigQuery write failed: {e}")
        sys.exit(1)
    else:
        print("Pipeline Finished Successfully.")

# Kick off the whole pipeline as soon as this cell is executed; the text that
# follows in the notebook export is the captured output of this call.
run_pipeline()

Loading model...
Fetching data from crown-486620.crown1.Machine Failure Data...
Running inference...

MODEL INFERENCE HEALTH REPORT
Total Machines Scanned : 10000
Predicted Failures     : 294
Failure Rate           : 2.94%
Avg Failure Prob       : 0.1080
--------------------------------------------------
Probability Distribution:
count    10000.000000
mean         0.107981
std          0.211855
min          0.000124
25%          0.004091
50%          0.014629
75%          0.083163
max          0.995519
--------------------------------------------------
Prediction rate looks within normal operational bounds.

TOP 5 MACHINES AT RISK:
 failure_probability
            0.995519
            0.994964
            0.992644
            0.991783
            0.990709
Saving predictions to crown-486620.crown1.machine_failure_predictions...
Pipeline Finished Successfully.
