In [0]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
from datetime import datetime, timedelta
import mlflow
import mlflow.sklearn
from mlflow.models.signature import infer_signature
import warnings
warnings.filterwarnings('ignore')

# For Databricks
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Initialize Spark session (already available in Databricks)
# getOrCreate() reuses an active session if one exists, otherwise it builds a
# new one with the application name given here.
spark = SparkSession.builder.appName("MedicalOrderPrediction").getOrCreate()

def load_and_prepare_data(sample_fraction=None, max_rows=100000):
    """Read the medical orders table and return it as a pandas DataFrame.

    When the table holds more than ``max_rows`` rows it is randomly sampled
    (seed 42), ordered by ``order_date`` descending and truncated to
    ``max_rows`` rows before conversion to pandas.

    Args:
        sample_fraction: Fraction passed to Spark's ``sample``. When ``None``
            it is derived as ``max_rows / total_count``, capped at 0.5.
        max_rows: Row-count threshold that triggers sampling, and the upper
            bound on rows kept after sampling.

    Returns:
        A pandas DataFrame with the (possibly reduced) table contents.
    """
    orders = spark.table("mma_fe_innovation.mma.medical_orders_silver")

    # Counting first decides whether the table needs to be reduced.
    total_count = orders.count()
    print(f"Total records in medical_orders_silver: {total_count:,}")

    if total_count > max_rows:
        print(f"Dataset is large ({total_count:,} rows). Applying sampling/filtering...")

        fraction = sample_fraction
        if fraction is None:
            # Aim for roughly max_rows rows, never sampling more than half.
            fraction = min(0.5, max_rows / total_count)

        print(f"Sampling {fraction:.3f} of the data...")
        sampled = orders.sample(fraction=fraction, seed=42)

        # Keep the most recent orders of the sample, at most max_rows of them.
        newest_first = sampled.orderBy(F.desc("order_date"))
        orders = newest_first.limit(max_rows)

    # Everything downstream works on pandas.
    orders_pdf = orders.toPandas()

    order_dates = orders_pdf['order_date']
    print(f"Loaded {len(orders_pdf):,} records for analysis")
    print(f"Date range: {order_dates.min()} to {order_dates.max()}")
    print(f"Unique devices: {orders_pdf['device_name'].nunique()}")
    print(f"Unique retailers: {orders_pdf['retailer_name'].nunique()}")

    return orders_pdf

def engineer_features(df, target_device=None, min_orders=5):
    """Build the per-device feature table used by the regression model.

    The input DataFrame is left untouched; all derived columns are added to a
    filtered copy.

    Args:
        df: Orders DataFrame with at least ``order_date``, ``device_name``,
            ``retailer_name`` and ``quantity`` columns.
        target_device: Device name to model. When ``None`` the most frequent
            device is selected (preferring devices with at least
            ``min_orders`` orders).
        min_orders: Minimum number of orders considered enough for a device.

    Returns:
        Tuple ``(device_df, target_device, retailer_encoder)`` where
        ``device_df`` is sorted by ``order_date`` and has its first row (the
        one without a ``quantity_lag1`` value) removed.
    """
    # Parse dates into a separate Series so the caller's DataFrame is not
    # modified as a side effect.
    order_dates = pd.to_datetime(df['order_date'])

    # If no specific device is specified, find devices with enough data
    if target_device is None:
        # value_counts() is sorted descending, so index[0] is the most frequent.
        device_counts = df['device_name'].value_counts()
        valid_devices = device_counts[device_counts >= min_orders]

        if len(valid_devices) == 0:
            print(f"No devices found with at least {min_orders} orders. Using device with most orders.")
            target_device = device_counts.index[0]
        else:
            target_device = valid_devices.index[0]

        print(f"Selected device: '{target_device}' ({device_counts[target_device]} orders)")

    # Filter for the target device and attach the parsed dates to the copy.
    device_mask = df['device_name'] == target_device
    device_df = df[device_mask].copy()
    device_df['order_date'] = order_dates[device_mask]

    if len(device_df) < min_orders:
        print(f"Warning: Only {len(device_df)} orders found for device '{target_device}'. Model may not be reliable.")

    # Create time-based features
    dates = device_df['order_date']
    device_df['year'] = dates.dt.year
    device_df['month'] = dates.dt.month
    device_df['day_of_year'] = dates.dt.dayofyear
    device_df['days_since_start'] = (dates - dates.min()).dt.days
    device_df['quarter'] = dates.dt.quarter
    device_df['is_weekend'] = dates.dt.weekday >= 5

    # Encode categorical variables
    retailer_encoder = LabelEncoder()
    device_df['retailer_encoded'] = retailer_encoder.fit_transform(device_df['retailer_name'])

    # Lag features depend on row order. A stable sort keeps rows that share an
    # order_date in their original relative order, so results are reproducible.
    device_df = device_df.sort_values('order_date', kind='mergesort')
    device_df['quantity_lag1'] = device_df['quantity'].shift(1)
    device_df['quantity_lag2'] = device_df['quantity'].shift(2)
    # shift(1) makes the rolling mean cover only previous orders, not the
    # current row's own quantity.
    device_df['quantity_rolling_mean_3'] = (
        device_df['quantity'].rolling(window=3, min_periods=1).mean().shift(1)
    )

    # Calculate days since last order for this device
    device_df['days_since_last_order'] = device_df['order_date'].diff().dt.days

    # Only the first row lacks quantity_lag1; quantity_lag2 may still be NaN
    # on the next row and is left for the caller to handle.
    device_df = device_df.dropna(subset=['quantity_lag1'])

    return device_df, target_device, retailer_encoder

def prepare_features_target(df):
    """Build the numeric feature matrix ``X`` and target vector ``y``.

    The input DataFrame is not modified. Feature columns are cast to float so
    the boolean ``is_weekend`` column does not force an object-dtype matrix,
    and remaining NaN values are replaced by the column mean.

    Args:
        df: Feature table containing every name in ``feature_cols`` plus a
            ``quantity`` column.

    Returns:
        Tuple ``(X, y, feature_cols)``: a float64 array of shape
        ``(n_rows, n_features)``, the ``quantity`` values, and the list of
        feature names in column order.
    """
    feature_cols = [
        'year', 'month', 'day_of_year', 'days_since_start', 'quarter',
        'is_weekend', 'retailer_encoded', 'quantity_lag1', 'quantity_lag2',
        'quantity_rolling_mean_3', 'days_since_last_order'
    ]

    # Work on a float copy: mixing bool and numeric columns would otherwise
    # make `.values` return an object array, and filling in place would
    # silently change the caller's DataFrame.
    features = df[feature_cols].astype(float)

    # Handle any remaining NaN values
    features = features.fillna(features.mean())

    X = features.values
    y = df['quantity'].values

    return X, y, feature_cols

def train_model_with_mlflow(X, y, feature_names, target_device, test_size=0.2, random_state=42):
    """Fit a linear regression and record parameters, metrics and model in MLflow.

    Args:
        X: Feature matrix, one row per order.
        y: Target quantities aligned with ``X``.
        feature_names: Column names of ``X``, used for the coefficient table.
        target_device: Device name; logged as a parameter and used (truncated
            to 30 characters) in the run name.
        test_size: Fraction of rows held out for evaluation.
        random_state: Seed for the train/test split.

    Returns:
        Tuple ``(model, run_id)`` with the fitted ``LinearRegression`` and the
        id of the MLflow run that holds the logged results.
    """
    import os
    import tempfile

    # Start MLflow run
    with mlflow.start_run(run_name=f"medical_order_prediction_{target_device[:30]}") as run:

        # Log parameters
        mlflow.log_param("target_device", target_device)
        mlflow.log_param("test_size", test_size)
        mlflow.log_param("random_state", random_state)
        mlflow.log_param("n_features", len(feature_names))
        mlflow.log_param("n_samples", len(X))

        # Split data
        # NOTE(review): this is a shuffled split; if the rows are chronological
        # with lag features, a time-ordered split may be more appropriate.
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=random_state
        )

        # Train model
        model = LinearRegression()
        model.fit(X_train, y_train)

        # Make predictions
        y_pred_train = model.predict(X_train)
        y_pred_test = model.predict(X_test)

        # Calculate metrics
        train_rmse = np.sqrt(mean_squared_error(y_train, y_pred_train))
        test_rmse = np.sqrt(mean_squared_error(y_test, y_pred_test))
        train_r2 = r2_score(y_train, y_pred_train)
        test_r2 = r2_score(y_test, y_pred_test)
        train_mae = mean_absolute_error(y_train, y_pred_train)
        test_mae = mean_absolute_error(y_test, y_pred_test)

        # Log metrics
        mlflow.log_metrics({
            "train_rmse": train_rmse,
            "test_rmse": test_rmse,
            "train_r2": train_r2,
            "test_r2": test_r2,
            "train_mae": train_mae,
            "test_mae": test_mae
        })

        # Coefficients ranked by absolute size. The features are not scaled
        # here, so magnitudes are not directly comparable across features.
        feature_importance = pd.DataFrame({
            'feature': feature_names,
            'coefficient': model.coef_,
            'abs_coefficient': np.abs(model.coef_)
        }).sort_values('abs_coefficient', ascending=False)

        print("Feature Importance:")
        print(feature_importance)

        # Write the artifact inside a temporary directory so nothing is left
        # behind in the working directory and concurrent runs cannot clash.
        # The artifact keeps its original name, feature_importance.csv.
        with tempfile.TemporaryDirectory() as tmp_dir:
            importance_path = os.path.join(tmp_dir, "feature_importance.csv")
            feature_importance.to_csv(importance_path, index=False)
            mlflow.log_artifact(importance_path)

        # Create model signature
        signature = infer_signature(X_train, y_pred_train)

        # Log model
        mlflow.sklearn.log_model(
            sk_model=model,
            artifact_path="model",
            signature=signature,
            input_example=X_train[:5]
        )

        # Print results
        print(f"\nModel Performance for device: {target_device}")
        print(f"Training RMSE: {train_rmse:.3f}")
        print(f"Test RMSE: {test_rmse:.3f}")
        print(f"Training R²: {train_r2:.3f}")
        print(f"Test R²: {test_r2:.3f}")
        print(f"Training MAE: {train_mae:.3f}")
        print(f"Test MAE: {test_mae:.3f}")

        return model, run.info.run_id

def predict_future_quantity(model, last_order_data, days_ahead=30, retailer_name=None):
    """Predict the order quantity ``days_ahead`` days after the latest order.

    Args:
        model: Fitted estimator exposing ``predict`` on a 2-D array whose
            columns follow the training feature order.
        last_order_data: Feature table for one device, with at least
            ``order_date``, ``retailer_name``, ``retailer_encoded`` and
            ``quantity``. If it also carries ``days_since_start``, that column
            is used to keep the time offset consistent with training.
        days_ahead: Number of days after the latest order date to predict for.
        retailer_name: Retailer to predict for. When ``None`` or not present
            in the history, the most common retailer is used.

    Returns:
        The prediction rounded to a non-negative integer.
    """
    # Lag values are read by position, so make sure rows are chronological.
    # The stable sort leaves already-sorted input unchanged.
    history = last_order_data.sort_values('order_date', kind='mergesort')

    # Create future date features
    latest_date = history['order_date'].max()
    future_date = latest_date + timedelta(days=days_ahead)

    # Handle retailer selection
    most_common_code = history['retailer_encoded'].mode()[0]
    if retailer_name is not None:
        matching_codes = history.loc[
            history['retailer_name'] == retailer_name, 'retailer_encoded'
        ]
        if len(matching_codes) == 0:
            print(f"Warning: Retailer '{retailer_name}' not found in historical data. Using most common retailer.")
            retailer_encoded = most_common_code
        else:
            retailer_encoded = matching_codes.iloc[0]
    else:
        # Use most common retailer as default
        retailer_encoded = most_common_code

    # The history may have had leading rows removed after its
    # days_since_start column was computed, so its earliest date is not
    # necessarily the training origin. Extending the stored offset of the
    # latest row keeps the feature on the same scale as during training.
    latest_offset = None
    if 'days_since_start' in history.columns:
        latest_offset = history['days_since_start'].iloc[-1]
    if latest_offset is not None and pd.notna(latest_offset):
        days_since_start = int(latest_offset) + (future_date - latest_date).days
    else:
        days_since_start = (future_date - history['order_date'].min()).days

    quantities = history['quantity']
    future_features = {
        'year': future_date.year,
        'month': future_date.month,
        'day_of_year': future_date.timetuple().tm_yday,
        'days_since_start': days_since_start,
        'quarter': (future_date.month - 1) // 3 + 1,
        'is_weekend': future_date.weekday() >= 5,
        'retailer_encoded': retailer_encoded,
        'quantity_lag1': quantities.iloc[-1],
        # With a single historical order, reuse it for the second lag.
        'quantity_lag2': quantities.iloc[-2] if len(history) > 1 else quantities.iloc[-1],
        'quantity_rolling_mean_3': quantities.tail(3).mean(),
        'days_since_last_order': days_ahead
    }

    # Column order must match the feature order used for training.
    feature_order = (
        'year', 'month', 'day_of_year', 'days_since_start', 'quarter',
        'is_weekend', 'retailer_encoded', 'quantity_lag1', 'quantity_lag2',
        'quantity_rolling_mean_3', 'days_since_last_order'
    )
    feature_array = np.array([[future_features[name] for name in feature_order]])

    prediction = model.predict(feature_array)[0]
    return max(0, round(prediction))  # Ensure non-negative integer

def predict_for_multiple_retailers(model, last_order_data, retailer_encoder, days_ahead=30, top_n=5):
    """Print and return per-retailer predictions for the busiest retailers.

    Args:
        model: Fitted estimator forwarded to ``predict_future_quantity``.
        last_order_data: Feature table for one device.
        retailer_encoder: Accepted for interface compatibility; not read here.
        days_ahead: Forecast horizon in days.
        top_n: Number of retailers, ranked by order count, to predict for.

    Returns:
        Dict mapping retailer name to its predicted quantity. A retailer whose
        prediction call raised is reported on stdout and left out.
    """
    # value_counts() is already ordered by frequency, so head() gives the top.
    busiest_retailers = last_order_data['retailer_name'].value_counts().head(top_n).index

    print(f"\nPredictions for top {top_n} retailers in {days_ahead} days:")
    print("-" * 60)

    results = {}

    for retailer_name in busiest_retailers:
        try:
            forecast = predict_future_quantity(
                model, last_order_data, days_ahead, retailer_name
            )
            results[retailer_name] = forecast

            # Mean historical quantity for this retailer, shown for context.
            retailer_rows = last_order_data['retailer_name'] == retailer_name
            mean_quantity = last_order_data.loc[retailer_rows, 'quantity'].mean()

            print(f"{retailer_name[:40]:<40} | Predicted: {forecast:>3} | Historical Avg: {mean_quantity:>5.1f}")

        except Exception as exc:
            # Best effort: one failing retailer must not stop the others.
            print(f"Error predicting for {retailer_name}: {exc}")

    return results

def main():
    """Run the full pipeline: load data, build features, train, predict, log.

    Loads up to 50,000 rows, auto-selects the most frequent device, prints a
    per-retailer summary, trains the regression inside an MLflow run, and then
    reopens that run to log the 30-day predictions as metrics. Returns early
    without training when fewer than 5 feature rows are available.
    """
    import re

    print("Using default MLflow experiment")

    # Load data with size management
    df = load_and_prepare_data(max_rows=50000)

    # You can specify a particular device here, or let it auto-select the most frequent one
    target_device = None  # Will auto-select most frequent device

    # Engineer features and get device_df before analysis
    device_df, target_device, retailer_encoder = engineer_features(df, target_device)

    print(f"\n{'='*60}")
    print(f"RETAILER ANALYSIS FOR: {target_device}")
    print(f"{'='*60}")

    retailer_analysis = device_df.groupby('retailer_name').agg({
        'quantity': ['count', 'mean', 'sum', 'std'],
        'order_date': ['min', 'max']
    }).round(2)

    # Flatten the two-level column index produced by the dict-style agg.
    retailer_analysis.columns = ['Order_Count', 'Avg_Quantity', 'Total_Quantity', 'Std_Quantity', 'First_Order', 'Last_Order']
    retailer_analysis = retailer_analysis.sort_values('Order_Count', ascending=False)

    print("Top retailers for this device:")
    print(retailer_analysis.head(10))

    if len(device_df) < 5:
        print("Not enough data to build a reliable model. Try selecting a different device.")
        return

    X, y, feature_names = prepare_features_target(device_df)
    model, run_id = train_model_with_mlflow(X, y, feature_names, target_device)

    print(f"\n{'='*60}")
    print("PREDICTION SCENARIOS")
    print(f"{'='*60}")

    future_quantity = predict_future_quantity(model, device_df, days_ahead=30)
    most_common_retailer = device_df['retailer_name'].mode()[0]
    print(f"\nDefault prediction (using most common retailer '{most_common_retailer}'):")
    print(f"Predicted quantity for {target_device} in 30 days: {future_quantity}")

    retailer_predictions = predict_for_multiple_retailers(
        model, device_df, retailer_encoder, days_ahead=30, top_n=5
    )

    # Reopen the training run so the predictions sit next to the model metrics.
    with mlflow.start_run(run_id=run_id):
        mlflow.log_param("future_prediction_days", 30)
        mlflow.log_metric("predicted_future_quantity_default", future_quantity)
        for retailer, prediction in retailer_predictions.items():
            safe_retailer_name = retailer.replace(" ", "_").replace(",", "")
            # MLflow rejects metric names containing characters such as
            # '&', quotes or parentheses. Map anything outside word
            # characters, '.', '-' and '/' to '_' so an unusual retailer name
            # cannot fail the run at its very last step.
            safe_retailer_name = re.sub(r"[^\w.\-/]", "_", safe_retailer_name)[:20]
            mlflow.log_metric(f"prediction_{safe_retailer_name}", prediction)

    print(f"\nMLflow run ID: {run_id}")
    print("Check the MLflow UI in Databricks for detailed results and model artifacts.")

# Run the full pipeline when this file is executed directly rather than imported.
if __name__ == "__main__":
    main()

# Additional helper function to get top devices by order frequency
def get_top_devices(limit=10, max_rows=10000):
    """Show and return the devices with the most orders.

    When the table has more than ``max_rows`` rows, counts are computed on a
    random sample (seed 42) of roughly ``max_rows`` rows.

    Args:
        limit: Number of devices to return.
        max_rows: Row-count threshold above which the table is sampled.

    Returns:
        A Spark DataFrame with ``device_name`` and ``count`` columns, ordered
        by ``count`` descending and limited to ``limit`` rows.
    """
    orders = spark.table("mma_fe_innovation.mma.medical_orders_silver")

    # Sample data if too large
    row_total = orders.count()
    if row_total > max_rows:
        fraction = max_rows / row_total
        orders = orders.sample(fraction=fraction, seed=42)
        print(f"Sampled {fraction:.3f} of data for device analysis")

    orders_per_device = orders.groupBy("device_name").count()
    ranked_devices = orders_per_device.orderBy(F.desc("count")).limit(limit)

    print("Top devices by order frequency:")
    ranked_devices.show(truncate=False)
    return ranked_devices

# Alternative approach: Work entirely in Spark without converting to Pandas
def analyze_device_spark_only(device_name, limit_records=1000):
    """Summarise one device's most recent orders using Spark aggregations.

    Args:
        device_name: Value matched against the ``device_name`` column.
        limit_records: Maximum number of most-recent orders to analyse.

    Returns:
        A pandas DataFrame with the selected orders; empty when the device
        has no orders.
    """
    df_spark = spark.table("mma_fe_innovation.mma.medical_orders_silver")

    device_orders = (df_spark
                    .filter(F.col("device_name") == device_name)
                    .orderBy(F.desc("order_date"))
                    .limit(limit_records))  # Limit records for memory safety

    order_count = device_orders.count()

    print(f"\nAnalysis for device: {device_name}")
    print(f"Total orders (limited to {limit_records}): {order_count}")

    if order_count == 0:
        # Aggregates over zero rows are all None; formatting them with ':.1f'
        # would raise TypeError, so report and return the empty result.
        print(f"No orders found for device '{device_name}'.")
        return device_orders.toPandas()

    # Basic statistics
    stats = device_orders.agg(
        F.avg("quantity").alias("avg_quantity"),
        F.min("quantity").alias("min_quantity"),
        F.max("quantity").alias("max_quantity"),
        F.min("order_date").alias("earliest_order"),
        F.max("order_date").alias("latest_order")
    ).collect()[0]

    avg_quantity = stats['avg_quantity']
    if avg_quantity is None:
        # Every quantity in the selection is null, so there is nothing to average.
        print("Quantity stats - no non-null quantities available")
    else:
        print(f"Quantity stats - Avg: {avg_quantity:.1f}, Min: {stats['min_quantity']}, Max: {stats['max_quantity']}")
    print(f"Date range: {stats['earliest_order']} to {stats['latest_order']}")

    return device_orders.toPandas()  # Only convert small filtered dataset

# Uncomment the line below to see top devices before running the model
# get_top_devices()