In [0]:
import os

# Storage account key used by the blob log handler. Read it from the environment
# (e.g. populated from a Databricks secret scope) instead of writing it into the
# notebook; the variable name is a convention chosen here, adjust to your setup.
# When the variable is unset this yields "" exactly like the old placeholder.
access_key = os.environ.get("AZURE_STORAGE_ACCESS_KEY", "")

In [0]:

# Input table, addressed by its three-level (catalog.schema.table) name.
file_path = "fpnacopilot.data_engineering.final_dummy_data"

df = spark.read.table(file_path)
display(df)



In [0]:
import sys

# Make the shared project modules (config.py) importable. Guarded so that re-running
# this cell does not keep prepending duplicate entries to sys.path.
CONFIG_DIR = '/Workspace/Notebook&Files'
if CONFIG_DIR not in sys.path:
    sys.path.insert(0, CONFIG_DIR)

import config

print("Config imported successfully")

In [0]:
# Sanity check: the fully qualified model name that training registers to.
print(str(config.FULL_MODEL_NAME))

In [0]:
import logging
import time
import datetime
from io import BytesIO
from azure.storage.blob import BlobServiceClient

# Azure Storage account details
# Azure Storage account and the container that receives the notebook logs.
storage_account_name = "nistorage"
container_name = "dbnotebook-logs"  # Container for logs

# Connection string for Azure Blob Storage: semicolon-separated key=value pairs.
connection_string = ";".join([
    "DefaultEndpointsProtocol=https",
    f"AccountName={storage_account_name}",
    f"AccountKey={access_key}",
    "EndpointSuffix=core.windows.net",
])

# Custom logging handler for Azure Blob Storage
class AzureBlobHandler(logging.Handler):
    """
    Logging handler that appends formatted records to one Azure Blob Storage blob.

    Each record is appended by downloading the current blob content (when the blob
    exists), concatenating the new line and re-uploading the whole blob. The cost per
    record grows with the blob size, so this suits low-volume logs only. Within one
    process, logging.Handler.handle serialises emit() calls with the handler lock;
    writers in other processes can still overwrite each other.
    """

    def __init__(self, connection_string, container_name, blob_name):
        """
        Args:
            connection_string: Azure Storage connection string.
            container_name: Container holding the log blob.
            blob_name: Path of the log blob inside the container.
        """
        super().__init__()
        self.blob_service_client = BlobServiceClient.from_connection_string(connection_string)
        self.container_name = container_name
        self.blob_name = blob_name
        # The client is reusable; build it once instead of on every record.
        self.blob_client = self.blob_service_client.get_blob_client(
            container=self.container_name,
            blob=self.blob_name,
        )

    def emit(self, record):
        """
        Append one formatted record to the blob, creating the blob if it is missing.

        Any failure is routed to logging.Handler.handleError (standard logging
        behaviour). A failure to read an existing blob no longer falls through to
        "blob missing", which used to overwrite the whole log with a single line.
        """
        try:
            log_entry = self.format(record) + '\n'
            if self.blob_client.exists():
                existing_data = self.blob_client.download_blob().readall().decode('utf-8')
                new_data = existing_data + log_entry
            else:
                new_data = log_entry
            self.blob_client.upload_blob(new_data.encode('utf-8'), overwrite=True)
        except Exception:
            self.handleError(record)

# Create logger
file_date = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d')
blob_log_name = f'logs_model/custom_log_{file_date}.log'
print(f"Logging to: wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{blob_log_name}")

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# âœ… FIX: Clear all existing handlers to prevent duplicate logs
logger.handlers.clear()

handler = AzureBlobHandler(connection_string, container_name, blob_log_name)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)

print(f"Logger configured with {len(logger.handlers)} handler(s)")

# Test log
logger.info('Logger initialized and writing to Azure Blob Storage')

In [0]:
"""
Model Training Script for Revenue Forecasting - Databricks Unity Catalog
"""

#import logging
import traceback
from datetime import datetime
from typing import Dict, List, Tuple

import numpy as np
import pandas as pd
from sklearn.linear_model import RidgeCV
from sklearn.model_selection import TimeSeriesSplit
from sklearn.metrics import mean_squared_error, mean_absolute_error, mean_absolute_percentage_error
from sklearn.preprocessing import StandardScaler

import mlflow
import mlflow.sklearn
from mlflow.models.signature import infer_signature

# import config

# logging.basicConfig(
#     level=logging.INFO,
#     format='%(asctime)s | %(levelname)s | %(message)s',
#     datefmt='%Y-%m-%d %H:%M:%S'
# )
# logger = logging.getLogger(__name__)


class FeatureEngineer:
    """Creates calendar, lag, year-to-date, quarter, trend and pipeline features for revenue prediction."""

    def __init__(self):
        # Feature names reported back to callers; taken from the project config.
        self.feature_list = config.ALL_FEATURES

    def create_features(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, List[str]]:
        """
        Derive model features from monthly revenue rows.

        Args:
            df: Rows with at least ``year``, ``month_num``, ``actual_revenue``,
                ``committed_sign_revenue``, ``committed_unsig_revenue`` and
                ``wtd_pipeline_revenue``. Lags are positional after sorting by
                (year, month_num), so one row per month is assumed
                (NOTE(review): confirm there are no duplicate/missing months).

        Returns:
            (df_feat, feature_list): a sorted copy of ``df`` with the feature
            columns added (float infinities replaced by NaN) and ``self.feature_list``.

        Raises:
            Re-raises any exception after logging it.
        """
        logger.info(f"[FEATURE] Creating features for {len(df)} rows")

        try:
            # sort_values returns a new frame, so the caller's frame is never mutated.
            df_feat = df.sort_values(['year', 'month_num']).reset_index(drop=True)

            if 'month_id' not in df_feat.columns:
                df_feat['month_id'] = df_feat['year'] * 100 + df_feat['month_num']

            month = df_feat['month_num']
            revenue = df_feat['actual_revenue']

            # Static/Calendar
            df_feat['remaining_months'] = 13 - month
            df_feat['quarter'] = ((month - 1) // 3) + 1
            df_feat['is_q4'] = (df_feat['quarter'] == 4).astype(int)
            df_feat['is_q2'] = (df_feat['quarter'] == 2).astype(int)
            df_feat['is_end_of_quarter'] = month.isin([3, 6, 9, 12]).astype(int)
            df_feat['is_quarter_start'] = month.isin([1, 4, 7, 10]).astype(int)
            df_feat['quarter_position'] = ((month - 1) % 3) + 1

            # Past Year
            df_feat['ly_same_month_revenue'] = df_feat.groupby('month_num')['actual_revenue'].shift(1)
            df_feat['ly_same_qtr_avg'] = df_feat.groupby(['quarter'])['actual_revenue'].transform(
                lambda x: x.shift(3).rolling(3, min_periods=1).mean()
            )

            # Lag Features (shift(k) = value k rows earlier)
            df_feat['revenue_lag_1'] = revenue.shift(1)
            df_feat['revenue_lag_2'] = revenue.shift(2)
            df_feat['revenue_lag_3'] = revenue.shift(3)
            df_feat['revenue_3mo_avg'] = revenue.shift(1).rolling(3, min_periods=1).mean()
            df_feat['revenue_velocity'] = df_feat['revenue_lag_1'] - df_feat['revenue_lag_2']
            df_feat['revenue_acceleration'] = df_feat['revenue_velocity'] - df_feat['revenue_velocity'].shift(1)

            # YTD: revenue booked earlier in the *same* year. Cumulate and shift inside each
            # year; shifting the whole series made January inherit the previous year's
            # full total (which pinned perf_vs_ytd to the -0.5 clip every January).
            df_feat['ytd_revenue'] = (
                df_feat.groupby('year')['actual_revenue']
                .transform(lambda s: s.cumsum().shift(1))
                .mask(month == 1, 0.0)  # nothing has been booked before January
            )
            df_feat['ytd_avg'] = df_feat['ytd_revenue'] / (month - 1).replace(0, 1)
            df_feat['perf_vs_ytd'] = (
                ((df_feat['revenue_lag_1'] - df_feat['ytd_avg']) / (df_feat['ytd_avg'] + 1e-10))
                .clip(-0.5, 0.5)
                .where(month > 1, 0.0)  # January has no YTD baseline: neutral value
            )

            # Quarter: revenue booked earlier in the same quarter (0 in a quarter's first month).
            df_feat['quarter_cumulative'] = (
                df_feat.groupby(['year', 'quarter'])['actual_revenue']
                .transform(lambda s: s.cumsum().shift(1))
                .mask(df_feat['quarter_position'] == 1, 0.0)
            )
            # NOTE(review): this is the mean of rows t-3..t-5, overlapping revenue_3mo_avg
            # (rows t-1..t-3) by one month; a non-overlapping previous window would use
            # .shift(3). Left unchanged pending confirmation of the intended definition.
            df_feat['prev_quarter_avg'] = revenue.shift(1).rolling(3, min_periods=1).mean().shift(2)
            # Revenue of the latest quarter-end month before this row, carried forward.
            df_feat['last_quarter_end_rev'] = revenue.shift(1).where(
                month.shift(1).isin([3, 6, 9, 12])
            ).ffill()
            df_feat['qoq_change'] = ((df_feat['revenue_3mo_avg'] - df_feat['prev_quarter_avg']) /
                                     (df_feat['prev_quarter_avg'] + 1e-10)).clip(-0.5, 0.5)

            # Trend: sign of (3-month average - 6-month average)
            df_feat['revenue_6mo_avg'] = revenue.shift(1).rolling(6, min_periods=1).mean()
            df_feat['trend_direction'] = np.sign(df_feat['revenue_3mo_avg'] - df_feat['revenue_6mo_avg'])

            # Forecast Remaining
            df_feat['fcst_total_rem'] = (df_feat['committed_sign_revenue'] +
                                         df_feat['committed_unsig_revenue'] +
                                         df_feat['wtd_pipeline_revenue'])
            df_feat['fcst_signed_rem'] = df_feat['committed_sign_revenue']
            df_feat['fcst_unsigned_rem'] = df_feat['committed_unsig_revenue']
            df_feat['fcst_pipeline_rem'] = df_feat['wtd_pipeline_revenue']
            df_feat['signed_per_month'] = df_feat['fcst_signed_rem'] / df_feat['remaining_months'].replace(0, 1)

            # Ratios (1e-10 guards the denominators against zero)
            df_feat['committed_ratio'] = df_feat['fcst_signed_rem'] / (df_feat['fcst_total_rem'] + 1e-10)
            df_feat['unsigned_ratio'] = df_feat['fcst_unsigned_rem'] / (df_feat['fcst_total_rem'] + 1e-10)
            df_feat['pipeline_quality'] = (
                df_feat['fcst_signed_rem'] * 1.0 +
                df_feat['fcst_unsigned_rem'] * 0.7 +
                df_feat['fcst_pipeline_rem'] * 0.3
            ) / (df_feat['fcst_total_rem'] + 1e-10)

            # Handle infinities
            for col in df_feat.columns:
                if df_feat[col].dtype in [np.float64, np.float32]:
                    df_feat[col] = df_feat[col].replace([np.inf, -np.inf], np.nan)

            logger.info(f"[FEATURE] Created {len(self.feature_list)} features")
            return df_feat, self.feature_list

        except Exception as e:
            logger.error(f"[FEATURE] Error: {str(e)}")
            logger.error(traceback.format_exc())
            raise


class RevenueModelTrainer:
    """
    Revenue forecasting model trainer.

    Fits a StandardScaler + RidgeCV model on engineered monthly features and
    registers it in Unity Catalog through MLflow.
    """

    def __init__(self, request_id: str = None):
        """
        Args:
            request_id: Prefix for this trainer's log lines; defaults to the current
                local time formatted as ``%Y%m%d_%H%M%S``.
        """
        self.request_id = request_id or datetime.now().strftime("%Y%m%d_%H%M%S")
        self.feature_engineer = FeatureEngineer()
        self.model = None              # RidgeCV fitted on *scaled* features
        self.scaler = None             # StandardScaler fitted on median-filled training features
        self.selected_features = config.FINAL_FEATURES
        self.feature_medians = None    # training medians used to fill missing feature values
        self.metrics = {}
        self.X_train = None            # median-filled, unscaled training features
        self.run_id = None

        logger.info(f"[{self.request_id}] Trainer initialized")

    def load_data(self, df: pd.DataFrame = None, file_path: str = None) -> pd.DataFrame:
        """
        Load monthly data from an in-memory frame or a Spark table.

        Args:
            df: In-memory data (copied); takes precedence over ``file_path``.
            file_path: Table name read with ``spark.read.table``.

        Returns:
            Data sorted by (year, month_num) with ``month_id`` = year * 100 + month_num.

        Raises:
            ValueError: if neither ``df`` nor ``file_path`` is provided.
        """
        logger.info(f"[{self.request_id}] Loading data")

        if df is not None:
            data = df.copy()
        elif file_path:
            data = spark.read.table(file_path).toPandas()
            logger.info(f"[{self.request_id}] File: {file_path}")
        else:
            raise ValueError("Provide df or file_path")

        data = data.sort_values(['year', 'month_num']).reset_index(drop=True)
        data['month_id'] = data['year'] * 100 + data['month_num']

        logger.info(f"[{self.request_id}] Rows: {len(data)}, Years: {sorted(data['year'].unique())}")
        return data

    def prepare_data(self, df: pd.DataFrame, train_years: List[int] = None, test_year: int = None):
        """
        Build features and split them by year.

        Args:
            df: Output of ``load_data``.
            train_years: Training years; defaults to every year except the last.
            test_year: Held-out year; defaults to the last year.

        Returns:
            (df_feat, train_df, test_df). Training rows with a missing target are
            dropped; the test frame keeps every row of ``test_year``.
        """
        logger.info(f"[{self.request_id}] Preparing data")

        df_feat, _ = self.feature_engineer.create_features(df)

        years = sorted(df_feat['year'].unique())
        train_years = train_years or years[:-1]
        test_year = test_year or years[-1]

        train_df = df_feat[df_feat['year'].isin(train_years)].dropna(subset=[config.TARGET])
        test_df = df_feat[df_feat['year'] == test_year].copy()

        logger.info(f"[{self.request_id}] Train: {len(train_df)} ({train_years}), Test: {len(test_df)} ({test_year})")
        return df_feat, train_df, test_df

    def train(self, df: pd.DataFrame = None, file_path: str = None) -> Dict[str, float]:
        """
        Fit the scaler and the RidgeCV model.

        Side effect: overwrites the ``fpnacopilot.machine_learning.{test,train,full}featuredata``
        tables with the raw (not yet imputed) feature frames.

        Returns:
            ``{'mape', 'mae', 'rmse'}`` computed in-sample on the training rows
            (MAPE in percent).
        """
        logger.info(f"[{self.request_id}] ========== TRAINING STARTED ==========")

        try:
            data = self.load_data(df, file_path)
            df_feat, train_df, test_df = self.prepare_data(data)

            X_test = test_df[self.selected_features].copy()
            X_train = train_df[self.selected_features].copy()
            y_train = train_df[config.TARGET].copy()
            full_data = df_feat[self.selected_features + [config.TARGET]].copy()

            # Persist the raw feature frames; later notebook cells read these tables back.
            spark.createDataFrame(X_test).write.mode("overwrite").saveAsTable("fpnacopilot.machine_learning.testfeaturedata")
            spark.createDataFrame(X_train).write.mode("overwrite").saveAsTable("fpnacopilot.machine_learning.trainfeaturedata")
            spark.createDataFrame(full_data).write.mode("overwrite").saveAsTable("fpnacopilot.machine_learning.fullfeaturedata")

            # Impute with training medians (logged as feature_medians.json at registration).
            self.feature_medians = X_train.median()
            X_train = X_train.fillna(self.feature_medians)
            self.X_train = X_train.copy()

            self.scaler = StandardScaler()
            X_train_scaled = self.scaler.fit_transform(X_train)

            # Time-ordered CV folds (rows are sorted by year/month) select the ridge alpha.
            tscv = TimeSeriesSplit(n_splits=config.CV_SPLITS)
            alphas = np.logspace(-1, 4, 50)

            self.model = RidgeCV(alphas=alphas, cv=tscv)
            self.model.fit(X_train_scaled, y_train)

            logger.info(f"[{self.request_id}] Model trained - Alpha: {self.model.alpha_:.2f}")

            # Metrics are in-sample (training rows only).
            train_pred = self.model.predict(X_train_scaled)
            self.metrics = {
                'mape': mean_absolute_percentage_error(y_train, train_pred) * 100,
                'mae': mean_absolute_error(y_train, train_pred),
                'rmse': np.sqrt(mean_squared_error(y_train, train_pred))
            }

            logger.info(f"[{self.request_id}] MAPE: {self.metrics['mape']:.2f}%")
            logger.info(f"[{self.request_id}] ========== TRAINING COMPLETED ==========")

            return self.metrics

        except Exception as e:
            logger.error(f"[{self.request_id}] Training error: {str(e)}")
            logger.error(traceback.format_exc())
            raise

    def register_to_unity_catalog(self) -> str:
        """
        Log the trained model to MLflow and register it in Unity Catalog.

        Uses: mlflow.set_registry_uri("databricks-uc")
        Model path: {catalog}.{schema}.{model_name}

        The registered ``model`` artifact is ``Pipeline(scaler -> RidgeCV)``, so it
        accepts the raw feature columns described by its signature and input example.
        Registering the bare RidgeCV with a raw-feature signature produced meaningless
        predictions for raw inputs, because the ridge was fitted on scaled features.

        Returns:
            The MLflow run id.

        Raises:
            RuntimeError: if called before ``train``.
        """
        logger.info(f"[{self.request_id}] ========== REGISTERING TO UNITY CATALOG ==========")
        logger.info(f"[{self.request_id}] Model: {config.FULL_MODEL_NAME}")

        try:
            if self.model is None or self.scaler is None or self.X_train is None:
                raise RuntimeError("train() must be called before register_to_unity_catalog()")

            from sklearn.pipeline import Pipeline

            # Set Unity Catalog registry
            mlflow.set_registry_uri("databricks-uc")

            # Bundle the fitted scaler so the served model takes unscaled features.
            serving_model = Pipeline([("scaler", self.scaler), ("model", self.model)])

            with mlflow.start_run() as run:
                signature = infer_signature(self.X_train, serving_model.predict(self.X_train))

                mlflow.sklearn.log_model(
                    sk_model=serving_model,
                    artifact_path="model",
                    signature=signature,
                    input_example=self.X_train.head(5)
                )

                # The scaler is still logged on its own for loaders of runs:/<id>/scaler.
                mlflow.sklearn.log_model(
                    sk_model=self.scaler,
                    artifact_path="scaler"
                )

                for key, val in self.metrics.items():
                    mlflow.log_metric(key, val)

                mlflow.log_param("alpha", self.model.alpha_)
                mlflow.log_param("n_features", len(self.selected_features))
                mlflow.log_param("features", str(self.selected_features))

                mlflow.log_dict({"features": self.selected_features}, "feature_names.json")
                mlflow.log_dict(self.feature_medians.to_dict(), "feature_medians.json")

                model_uri = f"runs:/{run.info.run_id}/model"
                mlflow.register_model(model_uri=model_uri, name=config.FULL_MODEL_NAME)

                self.run_id = run.info.run_id

            logger.info(f"[{self.request_id}] Run ID: {self.run_id}")
            logger.info(f"[{self.request_id}] Registered to: {config.FULL_MODEL_NAME}")
            logger.info(f"[{self.request_id}] ========== REGISTRATION COMPLETED ==========")

            return self.run_id

        except Exception as e:
            logger.error(f"[{self.request_id}] Registration error: {str(e)}")
            logger.error(traceback.format_exc())
            raise

    def train_and_register(self, df: pd.DataFrame = None, file_path: str = None) -> Dict:
        """
        Train, then register, in one step.

        Returns:
            ``{'run_id', 'model_name', 'metrics'}``.
        """
        self.train(df=df, file_path=file_path)
        run_id = self.register_to_unity_catalog()

        return {
            'run_id': run_id,
            'model_name': config.FULL_MODEL_NAME,
            'metrics': self.metrics
        }


In [0]:

# In Databricks notebook:

#from model_training import RevenueModelTrainer


# Option 1: Train and register in one step
# Train on the configured source table and register the model in one step.
trainer = RevenueModelTrainer(request_id="training_001")
result = trainer.train_and_register(file_path=config.file_path)

print(
    f"Run ID: {result['run_id']}",
    f"Model: {result['model_name']}",
    f"MAPE: {result['metrics']['mape']:.2f}%",
    sep="\n",
)


In [0]:
import pandas as pd

# `model` is only defined further down the notebook, so on a fresh kernel this cell raised
# NameError. The in-memory RidgeCV was fitted on standardised inputs, so raw feature values
# must pass through the fitted scaler first.
# NOTE(review): values are assumed to be in trainer.selected_features order — confirm.
sample_features = pd.DataFrame(
    [[2140805032.76, 12, 508515179.11, -0.5, 1, 127898772.13, -94471806.81, 222370578.94,
      -158596557.71, 71536376.66416667, -0.0747306702941992, 0, -1, 158245828.04, 0]],
    columns=trainer.selected_features,
)
print(trainer.model.predict(trainer.scaler.transform(sample_features)))

In [0]:
from datetime import datetime, timezone
import uuid
import json
import mlflow

# Example inference call plus an audit row appended to a Delta log table.
databricks_request_id = str(uuid.uuid4())
# Naive UTC ISO timestamp: same format as the former datetime.utcnow(), which is
# deprecated since Python 3.12.
request_time = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
request = {
    "features": [2140805032.76, 12, 508515179.11, -0.5, 1, 127898772.13, -94471806.81, 222370578.94, -158596557.71, 71536376.66416667, -0.0747306702941992, 0, -1, 158245828.04, 0]
}

# Do not depend on an earlier cell having switched MLflow to the Unity Catalog registry.
mlflow.set_registry_uri("databricks-uc")
# NOTE(review): version 1 presumably stores the bare RidgeCV, which was fitted on scaled
# features; the raw values above then need the run's scaler (runs:/<run_id>/scaler).
model = mlflow.sklearn.load_model("models:/fpnacopilot.ml_model.revenue_forecasting_model_v2/1")

try:
    prediction = model.predict([request["features"]]).tolist()
    response = {"prediction": prediction}
    status_code = 200
except Exception as e:
    response = {"error": str(e)}
    # Inference failures are recorded as 400 (bad request).
    status_code = 400

# One audit row; request/response are stored as JSON strings.
log_row = [(databricks_request_id, request_time, status_code, json.dumps(request), json.dumps(response))]
columns = ["databricks_request_id", "request_time", "status_code", "request", "response"]
log_df = spark.createDataFrame(log_row, columns)

# Append to Delta table (created on first write)
log_df.write.mode("append").option("mergeSchema", "true").saveAsTable("fpnacopilot.machine_learning.inference_log")

display(log_df)

In [0]:
# import mlflow
# import time, json, uuid
# from datetime import datetime
 
# def predict_with_mlflow_logging(model, df):
#     request_id = str(uuid.uuid4())
#     user = spark.sql("select current_user()").collect()[0][0]
 
#     with mlflow.start_run(run_name=f"inference_{request_id}", nested=True):
#         start = time.time()
#         preds = model.predict(df)
#         end = time.time()
 
#         # Log metadata
#         mlflow.set_tag("request_id", request_id)
#         mlflow.set_tag("requested_by", user)
 
#         # Log metrics
#         mlflow.log_metric("latency_ms", int((end - start) * 1000))
#         mlflow.log_metric("rows", len(df))
 
#         # Log payloads as artifacts
#         mlflow.log_text(json.dumps(df), "input.json")
#         mlflow.log_text(json.dumps(preds.tolist()), "output.json")
 
#     return preds
 
 
# model = mlflow.sklearn.load_model("models:/fpnacopilot.ml_model.revenue_forecasting_model_v2/1")
 
# preds = predict_with_mlflow_logging(model, [[2140805032.76, 12, 508515179.11, -0.5, 1, 127898772.13, -94471806.81, 222370578.94, -158596557.71, 71536376.66416667, -0.0747306702941992, 0, -1, 158245828.04, 0]])
# print(preds)


In [0]:
# Sample feature row (the same values used by the inference examples), shown as cell output.
[
    2140805032.76, 12, 508515179.11, -0.5, 1,
    127898772.13, -94471806.81, 222370578.94, -158596557.71, 71536376.66416667,
    -0.0747306702941992, 0, -1, 158245828.04, 0,
]

In [0]:
"""
Model Loading Utilities for Inference - Unity Catalog
"""

#import logging
import json

import mlflow
import mlflow.sklearn
from mlflow.tracking import MlflowClient

#import config

# logging.basicConfig(
#     level=logging.INFO,
#     format='%(asctime)s | %(levelname)s | %(message)s',
#     datefmt='%Y-%m-%d %H:%M:%S'
# )
# logger = logging.getLogger(__name__)


def load_model(version: str = None):
    """
    Load the registered sklearn model from Unity Catalog.

    Args:
        version: Model version to load. When falsy, the highest registered
            version number is resolved and loaded.

    Returns:
        Loaded sklearn model.

    Raises:
        ValueError: if no version of the model is registered.
    """
    mlflow.set_registry_uri("databricks-uc")

    if version:
        model_uri = f"models:/{config.FULL_MODEL_NAME}/{version}"
    else:
        # Unity Catalog has no implicit "latest" reference: "@latest" only resolves if an
        # alias literally named "latest" was created, so resolve the newest version here.
        versions = MlflowClient().search_model_versions(f"name='{config.FULL_MODEL_NAME}'")
        if not versions:
            raise ValueError(f"No registered versions found for {config.FULL_MODEL_NAME}")
        latest_version = max(int(v.version) for v in versions)
        model_uri = f"models:/{config.FULL_MODEL_NAME}/{latest_version}"

    logger.info(f"Loading model: {model_uri}")
    return mlflow.sklearn.load_model(model_uri)


def load_scaler(run_id: str):
    """
    Load the scaler logged under the ``scaler`` artifact path of an MLflow run.

    Args:
        run_id: Id of the training run.

    Returns:
        The deserialised scaler.
    """
    logger.info(f"Loading scaler from run: {run_id}")
    return mlflow.sklearn.load_model(f"runs:/{run_id}/scaler")


def load_artifacts(run_id: str):
    """
    Download and parse the feature metadata logged with a training run.

    Args:
        run_id: Id of the training run.

    Returns:
        (features, medians): the list under ``"features"`` in ``feature_names.json``
        and the dict stored in ``feature_medians.json``.
    """
    client = MlflowClient()

    def _read_json(artifact_name):
        # Download one run artifact locally and decode it as JSON.
        local_path = client.download_artifacts(run_id, artifact_name)
        with open(local_path, 'r') as handle:
            return json.load(handle)

    features = _read_json("feature_names.json")['features']
    medians = _read_json("feature_medians.json")

    logger.info(f"Loaded {len(features)} features")
    return features, medians


def list_versions():
    """
    Print and return every registered version of ``config.FULL_MODEL_NAME``.

    Returns:
        The model-version objects from ``MlflowClient.search_model_versions``.
    """
    mlflow.set_registry_uri("databricks-uc")
    versions = MlflowClient().search_model_versions(f"name='{config.FULL_MODEL_NAME}'")

    header = f"\nModel: {config.FULL_MODEL_NAME}"
    print(header)
    print("-" * 50)
    for entry in versions:
        print(f"Version {entry.version}: Run={entry.run_id[:8]}...")

    return versions


# =============================================================================
# DATABRICKS NOTEBOOK USAGE (FOR INFERENCE)
# =============================================================================


In [0]:
import os
import requests
import numpy as np
import pandas as pd
import json

# Feature names in the correct order (must match model training)
# Column order sent to the serving endpoint; must match the feature order used in training.
FEATURE_NAMES = [
    'ytd_revenue', 'remaining_months', 'quarter_cumulative', 'perf_vs_ytd', 'is_quarter_start',
    'revenue_lag_1', 'revenue_velocity', 'revenue_lag_2', 'revenue_acceleration', 'signed_per_month',
    'qoq_change', 'is_q2', 'trend_direction', 'revenue_lag_3', 'is_q4',
]


def create_dataframe_records(data):
    """
    Convert input rows to MLflow serving ``dataframe_records`` (one column->value dict per row).

    Args:
        data: A DataFrame (its FEATURE_NAMES columns are selected, in that order), or a
            numpy array / list of rows whose values are already in FEATURE_NAMES order.

    Returns:
        list[dict]: one dict per row.

    Raises:
        TypeError: for any other input type.
    """
    if isinstance(data, pd.DataFrame):
        frame = data[FEATURE_NAMES]
    elif isinstance(data, (np.ndarray, list)):
        frame = pd.DataFrame(data, columns=FEATURE_NAMES)
    else:
        raise TypeError(f"Unsupported data type: {type(data)}")
    return frame.to_dict(orient='records')


def score_model(dataset, url: str = None, token: str = None, timeout: float = 60):
    """
    Score data using the revenue_forecasting_model_v2 model serving endpoint.

    Args:
        dataset: DataFrame, numpy array or list of rows (values in FEATURE_NAMES order).
        url: Invocation URL; defaults to the endpoint URL this notebook used so far.
        token: Databricks access token; defaults to the DATABRICKS_TOKEN environment
            variable. Tokens must not be hardcoded in the notebook.
        timeout: Seconds before the HTTP request gives up (previously unbounded).

    Returns:
        Decoded JSON response from the endpoint.

    Raises:
        ValueError: if no token is passed or found in the environment.
        Exception: if the endpoint answers with a non-200 status.
    """
    url = url or 'https://adb-jdskd.15.azuredatabricks.net/serving-endpoints/revenue_forecasting_model_v2/invocations'
    token = token or os.environ.get('DATABRICKS_TOKEN')
    if not token:
        raise ValueError("No Databricks token: pass token=... or set DATABRICKS_TOKEN")
    headers = {'Authorization': f'Bearer {token}', 'Content-Type': 'application/json'}

    # dataframe_records carries column names alongside the values.
    ds_dict = {'dataframe_records': create_dataframe_records(dataset)}
    # allow_nan=True writes missing values as bare NaN tokens, as before.
    data_json = json.dumps(ds_dict, allow_nan=True)

    response = requests.request(method='POST', headers=headers, url=url, data=data_json, timeout=timeout)

    if response.status_code != 200:
        raise Exception(f'Request failed with status {response.status_code}, {response.text}')

    return response.json()


if __name__ == "__main__":
    # Smoke test: send one sample row (values in FEATURE_NAMES order) to the serving endpoint.
    test_values = [
        [2140805032.76, 12, 508515179.11, -0.5, 1, 127898772.13, -94471806.81, 222370578.94,
         -158596557.71, 71536376.66416667, -0.0747306702941992, 0, -1, 158245828.04, 0]
    ]
    test_df = pd.DataFrame(test_values, columns=FEATURE_NAMES)

    # dataframe_records keeps the column names alongside the values.
    test_data = {
        "dataframe_records": test_df.to_dict(orient='records')
    }
    data_json = json.dumps(test_data, allow_nan=True)

    url = 'https://adb-dshkds.15.azuredatabricks.net/serving-endpoints/revenue_forecasting_model_v2/invocations'
    # Read the token from the environment instead of hardcoding it in the notebook.
    token = os.environ.get('DATABRICKS_TOKEN')

    if not token:
        print('Set the DATABRICKS_TOKEN environment variable to call the serving endpoint')
    else:
        headers = {'Authorization': f'Bearer {token}', 'Content-Type': 'application/json'}
        response = requests.request(method='POST', headers=headers, url=url, data=data_json, timeout=60)

        if response.status_code != 200:
            print(f'Request failed with status {response.status_code}, {response.text}')
        else:
            predictions = response.json()
            print("Predictions:")
            print(json.dumps(predictions, indent=2))
 
 

In [0]:
import mlflow.sklearn

# Set the Unity Catalog registry explicitly: the three-level model name below is a UC name,
# and on a fresh kernel no earlier cell may have switched the registry URI yet.
mlflow.set_registry_uri("databricks-uc")
model = mlflow.sklearn.load_model("models:/fpnacopilot.ml_model.revenue_forecasting_model_v2/1")


In [0]:
# Inspect the loaded registry model. `train_df` only exists inside RevenueModelTrainer.train
# (NameError here), so show the trainer's median-filled training features instead.
print(getattr(model, 'coef_', 'Attribute not available'))
print(type(model).__name__)
display(trainer.X_train.head())
print(getattr(model, 'feature_names_in_', 'Attribute not available'))

In [0]:
# Reference output of `print(model.coef_)` for registered version 1 (RidgeCV coefficients
# on standardised features; presumably in FEATURE_NAMES order). Kept as a comment because
# the pasted space-separated numpy repr is a SyntaxError when run as code.
# [-56467.88336473   7575.74195926 -45804.68240135  60959.54863936
#  -14946.88452612   7270.59006503  17913.58622299 -17655.75863062
#   -6440.9282813   -8101.91594051  -2482.72487924   6990.42243696
#   12716.8593275  -46442.46818755  23188.67997799]

In [0]:

import json
# import logging
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
from dataclasses import dataclass

import numpy as np
import pandas as pd

# Configure logging
# logging.basicConfig(
#     level=logging.INFO,
#     format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
# )
# logger = logging.getLogger(__name__)

# =============================================================================
# Configuration
# =============================================================================

@dataclass
class InterpretabilityConfig:
    """
    Settings for the SHAP and LIME analysis.

    Attributes:
        shap_max_samples: Upper bound on SHAP background rows; larger backgrounds
            are randomly subsampled down to this size.
        shap_explainer_type: "auto", "tree", "kernel" or "linear".
        lime_num_features: LIME feature count.
        lime_num_samples: LIME sample count.
        top_n_features: Number of top-ranked features included in the summary.
    """
    shap_max_samples: int = 100
    shap_explainer_type: str = "auto"
    lime_num_features: int = 10
    lime_num_samples: int = 5000
    top_n_features: int = 10


# =============================================================================
# Model Loading
# =============================================================================

def load_model_and_data():
    """
    Gather the model, data and metadata used by the interpretability analysis.

    Relies on notebook state: the global ``model`` (loaded from the registry), the global
    ``result`` (returned by ``train_and_register``) and the feature tables written by
    ``RevenueModelTrainer.train``.

    Returns:
        Tuple ``(model, model_type, feature_names, preprocessor, feature_selector,
        X_train, X_explain, y_train, y_explain, df_data, target_col, model_metrics,
        months_lookback)`` where
          - X_train: training features, NaNs filled with the training medians
          - X_explain: first test-feature row, filled the same way
          - df_data: full feature + target table, NaNs filled with 0
          - model_metrics: training metrics dict (``mape``, ``mae``, ``rmse``)
          - preprocessor, feature_selector, y_train, y_explain, months_lookback: None
    """
    model_type = type(model).__name__
    feature_names = config.FINAL_FEATURES

    X_train_raw = spark.read.table("fpnacopilot.machine_learning.trainfeaturedata").toPandas()
    # Impute like training does (median fill); 0 is only a fallback for all-NaN columns.
    # Zero-filling everything fed the explainers inputs the model never saw in training.
    train_medians = X_train_raw.median(numeric_only=True)
    X_train = X_train_raw.fillna(train_medians).fillna(0)
    X_explain = (
        spark.read.table("fpnacopilot.machine_learning.testfeaturedata")
        .toPandas()
        .iloc[:1]
        .fillna(train_medians)
        .fillna(0)
    )

    df_data = spark.read.table("fpnacopilot.machine_learning.fullfeaturedata").toPandas().fillna(0)
    model_metrics = result['metrics']
    target_col = config.TARGET

    # NOTE(review): the trainer fits RidgeCV on *standardised* features; if `model` is the
    # bare RidgeCV (presumably true for version 1), X_train / X_explain here are unscaled
    # and the run's scaler should be supplied as `preprocessor`.
    preprocessor = None
    feature_selector = None
    y_train = None
    y_explain = None
    months_lookback = None

    return (model, model_type, feature_names, preprocessor, feature_selector,
            X_train, X_explain, y_train, y_explain, df_data, target_col,
            model_metrics, months_lookback)




In [0]:
# Keep the loaded inputs for later cells and print a compact summary instead of echoing
# the whole tuple (which dumps every DataFrame into the cell output).
interp_inputs = load_model_and_data()
print(
    f"model_type={interp_inputs[1]}, n_features={len(interp_inputs[2])}, "
    f"train_rows={len(interp_inputs[5])}, explain_rows={len(interp_inputs[6])}"
)

In [0]:

# =============================================================================
# Feature Importance Extraction
# =============================================================================

class FeatureImportanceExtractor:
    """Extracts and summarises feature importance from a trained model."""

    def __init__(self, model: Any, feature_names: List[str]):
        """
        Initialize with model and feature names.

        Args:
            model: Trained model. Supported: ``feature_importances_`` (tree models),
                ``coef_`` (linear models), an ``estimator_`` exposing
                ``feature_importances_``, and sklearn ``Pipeline`` objects (the final
                step is inspected).
            feature_names: Feature names in the model's input-column order.
        """
        self.model = model
        self.feature_names = feature_names

    def get_feature_importance(self) -> pd.DataFrame:
        """
        Extract feature importance from the model.

        Returns:
            DataFrame with ``feature``, ``importance`` and ``rank`` (1 = most
            important), sorted by descending importance; empty if the model
            exposes no importance.
        """
        importance_values = self._extract_importance()

        if importance_values is None:
            logger.warning("Could not extract feature importance from model")
            return pd.DataFrame()

        importance_values = np.asarray(importance_values).ravel()
        names = list(self.feature_names[:len(importance_values)])
        if len(names) < len(importance_values):
            # A name/value length mismatch used to make the DataFrame constructor raise;
            # label the extra values so the table can still be built.
            logger.warning(
                f"{len(importance_values)} importance values but only {len(names)} feature names"
            )
            names += [f"feature_{i}" for i in range(len(names), len(importance_values))]

        df = pd.DataFrame({'feature': names, 'importance': importance_values})
        df = df.sort_values('importance', ascending=False).reset_index(drop=True)
        df['rank'] = range(1, len(df) + 1)
        return df

    def _extract_importance(self) -> Optional[np.ndarray]:
        """Return raw importance values of the (final) estimator, or None if unsupported."""
        model = self.model

        # sklearn Pipeline: the importance lives on the final estimator.
        steps = getattr(model, 'steps', None)
        if isinstance(steps, list) and steps:
            model = steps[-1][1]

        # Tree-based models (RF, GBR, XGBoost, LightGBM, CatBoost)
        if hasattr(model, 'feature_importances_'):
            return model.feature_importances_

        # Linear models: coefficient magnitude (comparable across features only when
        # the inputs were standardised).
        if hasattr(model, 'coef_'):
            return np.abs(model.coef_).flatten()

        # Nested estimator
        if hasattr(model, 'estimator_') and hasattr(model.estimator_, 'feature_importances_'):
            return model.estimator_.feature_importances_

        return None

    def get_top_features(self, n: int = 10) -> List[Tuple[str, float]]:
        """Get the top ``n`` (feature, importance) pairs; empty list if unavailable."""
        importance_df = self.get_feature_importance()

        if importance_df.empty:
            return []

        top = importance_df.head(n)
        return list(zip(top['feature'], top['importance']))

    def importance_summary(self) -> Dict[str, Any]:
        """
        Summarise feature importance.

        Returns:
            Dict with ``total_features``, ``top_10_features``, ``importance_concentration``
            (share of total importance held by the top 5 / top 10, in percent; 0.0 when
            every importance is zero) and ``full_importance_table``; or ``{"error": ...}``.
        """
        importance_df = self.get_feature_importance()

        if importance_df.empty:
            return {"error": "Could not extract feature importance"}

        total = importance_df['importance'].sum()

        def share(n: int) -> float:
            # Percentage of total importance held by the top-n features.
            if not total:
                return 0.0
            return float(importance_df.head(n)['importance'].sum() / total * 100)

        top10 = importance_df.head(10)
        return {
            "total_features": len(importance_df),
            "top_10_features": list(zip(top10['feature'], top10['importance'])),
            "importance_concentration": {
                "top_5_pct": share(5),
                "top_10_pct": share(10),
            },
            "full_importance_table": importance_df.to_dict('records')
        }


In [0]:

# =============================================================================
# SHAP Analysis Module
# =============================================================================

class SHAPAnalyzer:
    """
    Performs SHAP (SHapley Additive exPlanations) analysis.

    Usage: call ``compute_shap_values(X_train, X_explain)`` first, then
    ``get_global_importance()`` or ``generate_summary()``.

    Config attributes read:
      - ``shap_explainer_type``: "auto" picks an explainer from the model class
        name; "tree" / "linear" force one; any other value uses the kernel
        explainer.
      - ``shap_max_samples``: maximum number of background rows kept.
      - ``top_n_features``: number of rows listed in the summary.
      - ``random_state`` (optional): if present and not ``None``, seeds the
        background subsampling; otherwise NumPy's global random state is used.
    """

    # Model class names routed to shap.TreeExplainer when the explainer type is "auto".
    _TREE_MODEL_NAMES = frozenset({
        'RandomForestRegressor', 'GradientBoostingRegressor',
        'XGBRegressor', 'LGBMRegressor', 'CatBoostRegressor',
        'ExtraTreesRegressor', 'DecisionTreeRegressor',
    })
    # Model class names routed to shap.LinearExplainer when the explainer type is "auto".
    _LINEAR_MODEL_NAMES = frozenset({
        'LinearRegression', 'Ridge', 'Lasso',
        'ElasticNet', 'BayesianRidge', 'HuberRegressor',
    })

    def __init__(
        self, 
        model: Any, 
        feature_names: List[str],
        config: InterpretabilityConfig
    ):
        self.model = model
        self.feature_names = feature_names
        self.config = config
        self._explainer = None        # built lazily by _create_explainer
        self._shap_values = None      # result of the last compute_shap_values call
        self._background_data = None  # (possibly subsampled) background rows

    def _create_explainer(self, X_background: np.ndarray):
        """
        Build, cache and return a SHAP explainer suited to ``self.model``.

        ``X_background`` is subsampled without replacement to at most
        ``config.shap_max_samples`` rows and stored in ``self._background_data``.
        The linear and kernel explainers use it. The tree explainer is built
        from the model alone.

        Raises:
            ImportError: if the ``shap`` package is not installed.
        """
        try:
            import shap
        except ImportError as exc:
            raise ImportError(
                "shap package not installed. Install with: pip install shap"
            ) from exc

        model_name = type(self.model).__name__
        explainer_type = self.config.shap_explainer_type

        # Sample background data if too large
        max_samples = self.config.shap_max_samples
        if len(X_background) > max_samples:
            seed = getattr(self.config, 'random_state', None)
            # Without a seed keep using the global RNG, so an np.random.seed() set
            # by the caller still makes runs reproducible.
            sampler = np.random if seed is None else np.random.default_rng(seed)
            indices = sampler.choice(len(X_background), max_samples, replace=False)
            X_background = X_background[indices]

        self._background_data = X_background

        logger.info(f"Creating SHAP explainer for {model_name}")

        # Auto-select explainer type
        if explainer_type == "auto":
            if model_name in self._TREE_MODEL_NAMES:
                explainer_type = "tree"
            elif model_name in self._LINEAR_MODEL_NAMES:
                explainer_type = "linear"
            else:
                explainer_type = "kernel"

        if explainer_type == "tree":
            self._explainer = shap.TreeExplainer(self.model)
        elif explainer_type == "linear":
            self._explainer = shap.LinearExplainer(self.model, X_background)
        else:
            # Kernel explainer as fallback (slower but model-agnostic); it only
            # needs predict() and a small background summary.
            self._explainer = shap.KernelExplainer(
                self.model.predict, 
                shap.sample(X_background, min(100, len(X_background)))
            )

        return self._explainer

    def compute_shap_values(
        self, 
        X_train: np.ndarray, 
        X_explain: np.ndarray
    ) -> np.ndarray:
        """
        Compute SHAP values for explanation data.

        The explainer is built from ``X_train`` on the first call only and reused
        afterwards.

        Args:
            X_train: Training data for background
            X_explain: Data to explain

        Returns:
            SHAP values array (also stored in ``self._shap_values``).
        """
        if self._explainer is None:
            self._create_explainer(X_train)

        logger.info(f"Computing SHAP values for {len(X_explain)} samples")

        self._shap_values = self._explainer.shap_values(X_explain)

        # Some explainers return a list of arrays (presumably one per model
        # output); keep the first one.
        if isinstance(self._shap_values, list):
            self._shap_values = self._shap_values[0]

        return self._shap_values

    def get_global_importance(self) -> pd.DataFrame:
        """
        Rank features by mean absolute SHAP value across the explained rows.

        Returns:
            DataFrame with columns ``feature`` and ``shap_importance``, sorted
            descending. If fewer names than SHAP columns were supplied, the
            missing ones are labelled ``feature_<index>``.

        Raises:
            ValueError: if ``compute_shap_values`` has not been called.
        """
        if self._shap_values is None:
            raise ValueError("SHAP values not computed. Call compute_shap_values first.")

        mean_abs_shap = np.abs(self._shap_values).mean(axis=0)
        n_columns = len(mean_abs_shap)

        names = list(self.feature_names[:n_columns])
        # Pad with positional placeholders; otherwise pd.DataFrame raises on
        # columns of different lengths.
        names += [f"feature_{i}" for i in range(len(names), n_columns)]

        df = pd.DataFrame({
            'feature': names,
            'shap_importance': mean_abs_shap
        })

        return df.sort_values('shap_importance', ascending=False).reset_index(drop=True)

    def generate_summary(self) -> Dict[str, Any]:
        """Generate a SHAP analysis summary (top features, importance distribution, interpretation)."""
        global_importance = self.get_global_importance()

        return {
            "method": "SHAP",
            "samples_analyzed": len(self._shap_values) if self._shap_values is not None else 0,
            "top_features": global_importance.head(self.config.top_n_features).to_dict('records'),
            "feature_importance_distribution": {
                "mean": float(global_importance['shap_importance'].mean()),
                "std": float(global_importance['shap_importance'].std()),
                "max": float(global_importance['shap_importance'].max()),
                "min": float(global_importance['shap_importance'].min()),
            },
            "interpretation": self._generate_interpretation(global_importance)
        }

    def _generate_interpretation(self, importance_df: pd.DataFrame) -> str:
        """
        Generate a human-readable interpretation of the importance ranking.

        The concentration text reports the share of total importance held by the
        first 5 rows and calls it "high" above 70%.
        """
        features = [str(name) for name in importance_df.head(3)['feature']]

        interpretation = f"The top {len(features)} most influential features are: {', '.join(features)}. "

        total = importance_df['shap_importance'].sum()
        # `not total > 0` also catches NaN, which would otherwise print "nan%".
        if not total > 0:
            interpretation += "All mean absolute SHAP values are zero, so no feature concentration can be computed."
            return interpretation

        top_5_pct = importance_df.head(5)['shap_importance'].sum() / total * 100

        if top_5_pct > 70:
            interpretation += f"These features show high concentration ({top_5_pct:.1f}% in top 5), suggesting the model relies heavily on a few key predictors."
        else:
            interpretation += f"Feature importance is distributed across multiple features ({top_5_pct:.1f}% in top 5)."

        return interpretation



In [0]:

# =============================================================================
# LIME Analysis Module
# =============================================================================

class LIMEAnalyzer:
    """
    Performs LIME (Local Interpretable Model-agnostic Explanations) analysis.

    Usage: call ``explain_instances(X_train, X_explain)`` first, then
    ``aggregate_explanations()`` or ``generate_summary()``.

    Config attributes read: ``lime_num_features`` and ``lime_num_samples``
    (forwarded to ``explain_instance``) and ``top_n_features`` (rows listed in
    the summary).
    """

    # Cap on explained instances when explain_instances() receives num_samples=None.
    _DEFAULT_MAX_INSTANCES = 50

    def __init__(
        self, 
        model: Any, 
        feature_names: List[str],
        config: InterpretabilityConfig
    ):
        self.model = model
        self.feature_names = feature_names
        self.config = config
        self._explainer = None
        self._explanations = []  # one dict per successfully explained instance

    def _create_explainer(self, X_train: np.ndarray):
        """
        Create and cache a regression-mode ``LimeTabularExplainer`` over ``X_train``.

        Raises:
            ImportError: if the ``lime`` package is not installed.
        """
        try:
            from lime import lime_tabular
        except ImportError as exc:
            raise ImportError(
                "lime package not installed. Install with: pip install lime"
            ) from exc

        self._explainer = lime_tabular.LimeTabularExplainer(
            training_data=X_train,
            feature_names=self.feature_names,
            mode='regression',
            verbose=False
        )

        return self._explainer

    @staticmethod
    def _per_label_value(value: Any, label: int = 0) -> Optional[float]:
        """
        Reduce a LIME per-label attribute (``intercept`` / ``score``) to one float.

        Depending on the ``lime`` version these may be dicts keyed by label or
        plain numbers/sequences. For a dict, ``label`` is used when present,
        otherwise any value. For a sequence, the first element is used. Anything
        that cannot be converted to float yields ``None``.
        """
        if value is None:
            return None
        if isinstance(value, dict):
            if not value:
                return None
            value = value[label] if label in value else next(iter(value.values()))
        elif np.ndim(value) > 0:
            flat = np.ravel(value)
            if flat.size == 0:
                return None
            value = flat[0]
        try:
            return float(value)
        except (TypeError, ValueError):
            return None

    def _feature_contributions(self, exp: Any) -> Dict[str, float]:
        """
        Map an explanation's weights from column index to feature name.

        ``exp.as_list()`` returns discretised condition strings (e.g.
        "x <= 1.5") that differ between instances. ``as_map()`` keys weights by
        column index, so they can be aggregated per feature. Label 1 is
        preferred because it is the default label of ``as_list()``. Returns an
        empty dict when no map is available.
        """
        local_map = exp.as_map() if hasattr(exp, 'as_map') else {}
        if not local_map:
            return {}
        pairs = local_map[1] if 1 in local_map else next(iter(local_map.values()))

        contributions: Dict[str, float] = {}
        for idx, weight in pairs:
            idx = int(idx)
            name = self.feature_names[idx] if idx < len(self.feature_names) else f"feature_{idx}"
            contributions[str(name)] = float(weight)
        return contributions

    def explain_instances(
        self, 
        X_train: np.ndarray,
        X_explain: np.ndarray,
        num_samples: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """
        Generate LIME explanations for the first rows of ``X_explain``.

        Args:
            X_train: Training data (used to build the explainer on first call)
            X_explain: Instances to explain
            num_samples: Number of instances to explain. ``None`` means up to
                50; larger values are capped at ``len(X_explain)``.

        Returns:
            List of explanation dicts. Instances that fail are logged and skipped.
        """
        if self._explainer is None:
            self._create_explainer(X_train)

        available = len(X_explain)
        if num_samples is None:
            num_samples = min(available, self._DEFAULT_MAX_INSTANCES)  # bound LIME runtime
        else:
            num_samples = min(num_samples, available)

        logger.info(f"Generating LIME explanations for {num_samples} instances")

        self._explanations = []

        for i in range(num_samples):
            try:
                exp = self._explainer.explain_instance(
                    X_explain[i],
                    self.model.predict,
                    num_features=self.config.lime_num_features,
                    num_samples=self.config.lime_num_samples
                )
                prediction = np.ravel(self.model.predict(X_explain[i:i+1]))[0]

                self._explanations.append({
                    'instance_idx': i,
                    'prediction': float(prediction),
                    'local_explanation': dict(exp.as_list()),
                    'feature_contributions': self._feature_contributions(exp),
                    'intercept': self._per_label_value(getattr(exp, 'intercept', None)),
                    'score': self._per_label_value(getattr(exp, 'score', None))
                })
            except Exception as e:
                # Best effort: one failing instance should not abort the batch.
                logger.warning(f"Failed to explain instance {i}: {e}")

        return self._explanations

    def aggregate_explanations(self) -> pd.DataFrame:
        """
        Aggregate local explanations into a global ranking.

        Uses per-feature contributions when available and falls back to LIME's
        condition strings otherwise.

        Returns:
            DataFrame with ``feature``, ``lime_importance`` (mean absolute
            contribution) and ``lime_std``, sorted by importance descending.

        Raises:
            ValueError: if no explanations have been computed.
        """
        if not self._explanations:
            raise ValueError("No explanations computed. Call explain_instances first.")

        all_contributions: Dict[str, List[float]] = {}
        for exp in self._explanations:
            local = exp.get('feature_contributions') or exp['local_explanation']
            for feature, contribution in local.items():
                all_contributions.setdefault(feature, []).append(abs(contribution))

        df = pd.DataFrame({
            'feature': list(all_contributions.keys()),
            'lime_importance': [float(np.mean(v)) for v in all_contributions.values()],
            'lime_std': [float(np.std(v)) for v in all_contributions.values()]
        }).astype({'lime_importance': float, 'lime_std': float})
        return df.sort_values('lime_importance', ascending=False).reset_index(drop=True)

    def generate_summary(self) -> Dict[str, Any]:
        """
        Generate a LIME analysis summary.

        A feature is "consistent" when std/mean of its contribution is below
        0.5, and "variable" otherwise. Up to 5 of each are listed.
        ``mean_score`` is ``None`` when no explanation reported a score.
        """
        aggregated = self.aggregate_explanations()

        ratio = aggregated['lime_std'] / aggregated['lime_importance']
        consistent_features = aggregated.loc[ratio < 0.5, 'feature'].tolist()[:5]
        variable_features = aggregated.loc[ratio >= 0.5, 'feature'].tolist()[:5]

        # `is not None` keeps legitimate 0.0 scores, which a truthiness test would drop.
        scores = [e['score'] for e in self._explanations if e.get('score') is not None]

        return {
            "method": "LIME",
            "instances_explained": len(self._explanations),
            "top_features": aggregated.head(self.config.top_n_features).to_dict('records'),
            "consistent_features": consistent_features,
            "variable_features": variable_features,
            "model_fidelity": {
                "mean_score": float(np.mean(scores)) if scores else None,
            },
            "interpretation": self._generate_interpretation(aggregated)
        }

    def _generate_interpretation(self, aggregated: pd.DataFrame) -> str:
        """Generate a human-readable interpretation of the aggregated ranking."""
        if aggregated.empty:
            return "LIME produced no feature contributions to interpret."

        top_feature = aggregated['feature'].iloc[0]
        top_importance = aggregated['lime_importance'].iloc[0]

        interpretation = f"LIME analysis reveals that {top_feature} has the highest local importance "
        interpretation += f"(avg contribution: {top_importance:.4f}). "

        high_variability = aggregated[
            aggregated['lime_std'] > aggregated['lime_importance'] * 0.5
        ]
        if len(high_variability) > 0:
            names = ', '.join(str(name) for name in high_variability['feature'].head(3))
            interpretation += f"Features with high variability across instances: {names}."

        return interpretation


In [0]:

# =============================================================================
# Data Statistics Helper
# =============================================================================

def analyze_data_patterns(df: pd.DataFrame, target_col: str) -> Dict[str, Any]:
    """
    Compute basic dataset statistics (matching reference implementation).

    Args:
        df: DataFrame with data.
        target_col: Name of the target column.

    Returns:
        ``{}`` for a ``None`` or empty frame. Otherwise a dict with
        ``total_records``, ``num_features`` (column count, target included)
        and ``missing_values`` (total null cells), plus ``target_stats`` when
        ``target_col`` is a column.

        ``target_stats`` holds mean/std/min/max as floats and ``trend``.
        ``trend`` is "Increasing" when the mean of the last 10 rows exceeds the
        overall mean, else "Decreasing". Ties, e.g. frames with at most 10 rows,
        therefore report "Decreasing". The trend assumes rows are in time order,
        which is not checked here.
    """
    if df is None or df.empty:
        return {}

    summary: Dict[str, Any] = {
        "total_records": len(df),
        "num_features": len(df.columns),
        "missing_values": int(df.isnull().sum().sum()),
    }

    if target_col not in df.columns:
        return summary

    series = df[target_col]
    overall_mean = series.mean()
    recent_mean = series.tail(10).mean()

    summary["target_stats"] = {
        "mean": float(overall_mean),
        "std": float(series.std()),
        "min": float(series.min()),
        "max": float(series.max()),
        "trend": "Increasing" if recent_mean > overall_mean else "Decreasing",
    }
    return summary


In [0]:
# =============================================================================
# Blob Storage Helper (Databricks)
# =============================================================================

def save_to_blob_storage(
    container_name: str,
    blob_name: str,
    data: Dict[str, Any],
    storage_account: str = None,
    account_key: str = None
):
    """
    Serialise ``data`` as indented JSON and upload it to Azure Blob Storage,
    overwriting any existing blob of the same name (content type
    ``application/json``).

    Args:
        container_name: Name of the blob container
        blob_name: Name of the blob file
        data: Dictionary to save as JSON
        storage_account: Azure Storage account name (required)
        account_key: Azure Storage account key (required)

    Raises:
        ValueError: if ``storage_account`` or ``account_key`` is missing. This
            is checked before any serialisation or network work and is not
            logged as a storage error.
        Exception: any serialisation or Azure SDK error is logged with its
            traceback and re-raised.
    """
    # A missing credential is a caller/configuration error, not a storage
    # failure. Reject it before doing any work.
    if not storage_account or not account_key:
        raise ValueError("storage_account and account_key are required")

    try:
        # default=str stringifies values json cannot encode natively.
        json_str = json.dumps(data, indent=2, default=str)

        # Use Azure Storage SDK with BlobServiceClient
        from azure.storage.blob import BlobServiceClient, ContentSettings

        connection_string = (
            f"DefaultEndpointsProtocol=https;"
            f"AccountName={storage_account};"
            f"AccountKey={account_key};"
            f"EndpointSuffix=core.windows.net"
        )

        blob_service_client = BlobServiceClient.from_connection_string(connection_string)
        container_client = blob_service_client.get_container_client(container_name)
        blob_client = container_client.get_blob_client(blob_name)

        blob_client.upload_blob(
            json_str,
            overwrite=True,
            content_settings=ContentSettings(content_type="application/json")
        )
        logger.info(f"Saved to blob storage: {container_name}/{blob_name}")

    except Exception as e:
        logger.error(f"Error saving to blob storage: {e}", exc_info=True)
        raise

In [0]:

# =============================================================================
# Main Pipeline
# =============================================================================

def _transform_if_supported(transformer: Any, X: Any) -> Any:
    """Return ``transformer.transform(X)`` when the transformer exists and has ``transform``; else ``X`` unchanged."""
    if transformer is not None and hasattr(transformer, 'transform'):
        return transformer.transform(X)
    return X


def _names_after_selection(feature_selector: Any, feature_names: List[str]) -> List[str]:
    """
    Align ``feature_names`` with the columns kept by ``feature_selector``.

    Names are filtered only when the selector exposes a scikit-learn style
    boolean ``get_support()`` mask whose length equals ``len(feature_names)``.
    In every other case ``feature_names`` is returned unchanged; presumably it
    already describes the selected columns (confirm against
    load_model_and_data).
    """
    if feature_selector is None or not hasattr(feature_selector, 'get_support'):
        return feature_names
    support = np.asarray(feature_selector.get_support())
    if support.dtype != bool or support.ndim != 1 or len(support) != len(feature_names):
        return feature_names
    return [name for name, keep in zip(feature_names, support) if keep]


def run_shap_lime_analysis(
    year: int,
    month: int,
    shap_container: str = "shap-results",
    lime_container: str = "lime-results",
    storage_account: str = None,
    account_key: str = None,
    config: InterpretabilityConfig = None
):
    """
    Run SHAP and LIME analysis and save both results to blob storage.

    Blobs are named ``shap_<year>_<MM>.json`` / ``lime_<year>_<MM>.json``.

    Args:
        year: Year for analysis (e.g., 2024)
        month: Month for analysis (1-12)
        shap_container: Azure Blob Storage container for SHAP results
        lime_container: Azure Blob Storage container for LIME results
        storage_account: Azure Storage account name (required)
        account_key: Azure Storage account key (required)
        config: InterpretabilityConfig object (uses defaults if None)

    Returns:
        ``(shap_result, lime_result)``: the two dicts that were uploaded.

    Raises:
        ValueError: for a month outside 1-12, or when ``storage_account`` /
            ``account_key`` is missing. Both are checked before any model work.
    """
    if not (1 <= month <= 12):
        raise ValueError(f"Invalid month: {month}. Must be between 1 and 12")
    # save_to_blob_storage requires both credentials. Fail here instead of
    # after the (expensive) SHAP/LIME computation.
    if not storage_account or not account_key:
        raise ValueError("storage_account and account_key are required")

    if config is None:
        config = InterpretabilityConfig()

    logger.info(f"Starting SHAP and LIME analysis for {year}-{month:02d}")

    # Step 1: Load model and data (PLACEHOLDER - replace with your code)
    logger.info("Loading model and data...")
    (
        model, model_type, feature_names, preprocessor, feature_selector,
        X_train, X_explain, y_train, y_explain,
        df_data, target_col, model_metrics, months_lookback
    ) = load_model_and_data()

    # Ensure X_train and X_explain are numpy arrays
    if isinstance(X_train, pd.DataFrame):
        X_train = X_train.values
    if isinstance(X_explain, pd.DataFrame):
        X_explain = X_explain.values

    # Apply preprocessing if needed
    X_train = _transform_if_supported(preprocessor, X_train)
    X_explain = _transform_if_supported(preprocessor, X_explain)

    # Apply feature selection if needed, keeping the names in step with the
    # surviving columns so SHAP/LIME/importance labels are not misaligned.
    if feature_selector is not None:
        X_train = _transform_if_supported(feature_selector, X_train)
        X_explain = _transform_if_supported(feature_selector, X_explain)
        feature_names = _names_after_selection(feature_selector, feature_names)

    n_columns = X_train.shape[1] if getattr(X_train, 'ndim', None) == 2 else None
    if n_columns is not None and len(feature_names) != n_columns:
        logger.warning(
            f"feature_names has {len(feature_names)} entries but the model input has "
            f"{n_columns} columns; reported feature labels may be misaligned"
        )

    # Step 2: Extract feature importance
    logger.info("Extracting feature importance from model...")
    feature_extractor = FeatureImportanceExtractor(model, feature_names)
    feature_importance_summary = feature_extractor.importance_summary()
    top_10_features = feature_importance_summary.get("top_10_features", [])

    # Step 3: Analyze data patterns
    logger.info("Analyzing data patterns...")
    data_stats = analyze_data_patterns(df_data, target_col)

    # Step 4: Run SHAP analysis
    logger.info("Running SHAP analysis...")
    shap_analyzer = SHAPAnalyzer(model, feature_names, config)
    shap_analyzer.compute_shap_values(X_train, X_explain)
    shap_summary = shap_analyzer.generate_summary()

    # Step 5: Run LIME analysis
    logger.info("Running LIME analysis...")
    lime_analyzer = LIMEAnalyzer(model, feature_names, config)
    lime_analyzer.explain_instances(X_train, X_explain)
    lime_summary = lime_analyzer.generate_summary()

    # Step 6: Prepare metadata
    metadata = {
        "model_type": model_type,
        "target_variable": target_col,
        "months_lookback": months_lookback,
        "total_records": len(df_data) if df_data is not None else 0,
        "num_features": len(feature_names),
        "feature_importance": {
            "top_10_features": top_10_features
        },
        "data_stats": data_stats
    }

    # Steps 7-8: Build complete SHAP / LIME results with shared metadata
    shap_result = {
        **shap_summary,
        "metadata": metadata,
        "model_metrics": model_metrics,
        "feature_importance": {
            "top_10_features": top_10_features
        },
        "data_stats": data_stats
    }

    lime_result = {
        **lime_summary,
        "metadata": metadata,
        "model_metrics": model_metrics,
        "feature_importance": {
            "top_10_features": top_10_features
        },
        "data_stats": data_stats
    }

    # Step 9: Save to blob storage
    shap_blob_name = f"shap_{year}_{month:02d}.json"
    lime_blob_name = f"lime_{year}_{month:02d}.json"

    logger.info(f"Saving SHAP results to {shap_container}/{shap_blob_name}...")
    save_to_blob_storage(
        shap_container,
        shap_blob_name,
        shap_result,
        storage_account,
        account_key
    )

    logger.info(f"Saving LIME results to {lime_container}/{lime_blob_name}...")
    save_to_blob_storage(
        lime_container,
        lime_blob_name,
        lime_result,
        storage_account,
        account_key
    )

    logger.info(f"Successfully completed SHAP and LIME analysis for {year}-{month:02d}")
    logger.info(f"SHAP results saved to: {shap_container}/{shap_blob_name}")
    logger.info(f"LIME results saved to: {lime_container}/{lime_blob_name}")

    return shap_result, lime_result


# =============================================================================
# Example Usage
# =============================================================================

if __name__ == "__main__":
    # Example run configuration. These stay module-level constants so later
    # notebook cells can reuse them.
    YEAR, MONTH = 2024, 1
    SHAP_CONTAINER, LIME_CONTAINER = "shap-results", "lime-results"
    STORAGE_ACCOUNT = "gtairnistorage"  # Replace with actual storage account
    # Required: run_shap_lime_analysis rejects an empty key. Prefer a secret
    # scope (e.g. dbutils secrets) over the `access_key` notebook variable.
    ACCOUNT_KEY = access_key

    try:
        shap_result, lime_result = run_shap_lime_analysis(
            year=YEAR,
            month=MONTH,
            shap_container=SHAP_CONTAINER,
            lime_container=LIME_CONTAINER,
            storage_account=STORAGE_ACCOUNT,
            account_key=ACCOUNT_KEY,
        )
    except NotImplementedError as err:
        # Raised while load_model_and_data() is still the placeholder.
        print(f"ERROR: {err}")
        print("Please implement the load_model_and_data() function with your model loading code.")
    except Exception as err:
        print(f"ERROR: {err}")
        raise
    else:
        print("Analysis completed successfully!")