# In [None]:
# <-------------------- CELL 1: IMPORTS -------------------->
print("Cell 1: Imports - Executing...")
import mlflow
import mlflow.pyfunc # For saving custom Python models

import pandas as pd
import numpy as np
import os
import joblib # For saving the pyfunc model's components IF NOT relying solely on mlflow pickling the instance
import time
from datetime import datetime

from sklearn.model_selection import train_test_split # Used for month-wise splits
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
import category_encoders as ce # For TargetEncoder

from pyspark.sql import SparkSession # Still useful for environment context

# Silence noisy library warnings so the notebook output stays readable.
import warnings
for _ignored_category in (FutureWarning, UserWarning, DeprecationWarning):
    warnings.filterwarnings('ignore', category=_ignored_category)
# `message` is a regular expression matched against the start of the warning text
# (per the original note, this one is emitted by category_encoders).
warnings.filterwarnings('ignore', message="Previously subsetted data...")

# Reuse a `spark` session that is already defined in this scope; otherwise build one.
if 'spark' in locals():
    print("SparkSession already exists.")
else:
    spark = SparkSession.builder.appName("Pandas_FE_Preprocessing_Full_MVP").getOrCreate()
    print("SparkSession created.")

print("Imports successful for Pandas Preprocessing Pipeline with Feature Engineering.")
print("-" * 50)

# <-------------------- CELL 2: INIT CELL - GLOBAL CONFIGURATIONS FOR PREPROCESSING -------------------->
print("\nCell 2: Global Configurations for Preprocessing - Executing...")

# --- MLflow Configuration ---
# !!! IMPORTANT: SET YOUR MLFLOW EXPERIMENT PATH !!!
PREPROCESSING_EXPERIMENT_PATH = "/Users/your_username@example.com/MVP_Classification_FE_Preprocessing_Full" # CHANGE THIS

# --- Data Paths (Unity Catalog Volumes) ---
# !!! IMPORTANT: SET YOUR UNITY CATALOG VOLUME BASE PATH !!!
# NOTE(review): volume paths normally look like /Volumes/<catalog>/<schema>/<volume>/ —
# confirm that this value points at an actual volume and not just a catalog.
UC_BASE_DATA_PATH = "/Volumes/delfos/" # As per your input

# --- Path to the FULL RAW input dataset ---
# !!! IMPORTANT: UPDATE THIS TO YOUR ACTUAL FULL RAW DATASET PATH IN UC VOLUMES !!!
FULL_RAW_DATA_PARQUET_PATH = f"{UC_BASE_DATA_PATH}raw_data_full/full_dataset_generic.parquet" # Example

# --- Date Column for Stratified Splitting ---
# !!! IMPORTANT: SET THE NAME OF YOUR DATE/TIMESTAMP COLUMN IN THE RAW DATA !!!
DATE_COLUMN_FOR_SPLIT = "date_col" # Example: 'order_date', 'policy_start_date'

# --- Output Paths for INTERMEDIATE RAW SPLIT Data (Optional, but good for traceability) ---
# Unity Catalog volumes are reached through their plain POSIX path (/Volumes/...), exactly
# like FULL_RAW_DATA_PARQUET_PATH above. The /dbfs FUSE prefix must NOT be prepended:
# /dbfs/Volumes is a reserved path that cannot be used to access volumes.
# The DBFS_* names are kept unchanged because other cells may reference them.
DBFS_RAW_SPLITS_DIR = f"{UC_BASE_DATA_PATH}raw_splits_v1_fe/" # Unique name
RAW_TRAIN_SPLIT_PATH = os.path.join(DBFS_RAW_SPLITS_DIR, "raw_train_split.parquet")
RAW_TEST_SPLIT_PATH = os.path.join(DBFS_RAW_SPLITS_DIR, "raw_test_split.parquet")

# --- Output Paths for FINAL PROCESSED Data (Parquet with Named Columns) ---
PROCESSED_DATA_VERSION_FE = "v1_pandas_fe_final" # Versioning for processed data
DBFS_PROCESSED_DATA_DIR_BASE_FE = f"{UC_BASE_DATA_PATH}processed_data/"
PROCESSED_DATA_DIR_VERSIONED_FE = os.path.join(DBFS_PROCESSED_DATA_DIR_BASE_FE, PROCESSED_DATA_VERSION_FE)

# These paths will point to the Parquet files with named columns
SHARED_PROCESSED_TRAIN_PATH = os.path.join(PROCESSED_DATA_DIR_VERSIONED_FE, "train_processed_named_cols.parquet")
SHARED_PROCESSED_TEST_PATH = os.path.join(PROCESSED_DATA_DIR_VERSIONED_FE, "test_processed_named_cols.parquet")

# --- MLflow artifact path for the saved pyfunc preprocessor model ---
MLFLOW_PYFUNC_PREPROCESSOR_ARTIFACT_PATH = "pandas_full_preprocessor"

# !!! IMPORTANT: SET YOUR ACTUAL BINARY TARGET LABEL COLUMN NAME (must exist in raw data) !!!
YOUR_TARGET_COLUMN_NAME = "target_binary" # Example: 0 or 1

# --- Define your categorical and numerical columns from the RAW data ---
# These are columns read from FULL_RAW_DATA_PARQUET_PATH *before* any FE in the PyFunc model.
# The PyFunc model will then create new features based on these (e.g. from date_col, age_col, premium_col).
# !!! IMPORTANT: UPDATE THESE LISTS BASED ON YOUR ACTUAL RAW DATASET !!!
CATEGORICAL_COLUMNS_RAW = ["cat_feat_1", "cat_feat_2", "region_col"] # Example
NUMERICAL_COLUMNS_RAW = ["num_feat_1", "age_col", "premium_col", "interactions_col"] # Example

# --- Preprocessing Configuration ---
TEST_SET_SPLIT_RATIO = 0.20
TARGET_ENCODING_SMOOTHING = 20.0 # For category_encoders.TargetEncoder
CATEGORICAL_IMPUTE_CONSTANT = "__MISSING_CAT__" # For SimpleImputer
NUMERICAL_IMPUTE_STRATEGY = "median" # For SimpleImputer

# --- Reproducibility ---
GLOBAL_SEED = 117

# --- Feature Engineering Configuration (Example) ---
# These original column names are used as basis for FE. They must be in NUMERICAL_COLUMNS_RAW or DATE_COLUMN_FOR_SPLIT.
AGE_COL_FOR_FE = "age_col" # Must be in NUMERICAL_COLUMNS_RAW
PREMIUM_COL_FOR_FE = "premium_col" # Must be in NUMERICAL_COLUMNS_RAW
INTERACTIONS_COL_FOR_FE = "interactions_col" # Example, must be in NUMERICAL_COLUMNS_RAW

# For binning age (example): len(AGE_BIN_LABELS) must equal len(AGE_BINS) - 1
AGE_BINS = [0, 18, 30, 45, 60, 120]
AGE_BIN_LABELS = ['0-18', '19-30', '31-45', '46-60', '60+']

# Ensure output directories exist. Failure is reported but deliberately not fatal, so the
# configuration cell still completes when the volume is missing or read-only.
try:
    os.makedirs(DBFS_RAW_SPLITS_DIR, exist_ok=True)
    os.makedirs(PROCESSED_DATA_DIR_VERSIONED_FE, exist_ok=True)
    print(f"Checked/created raw splits directory: {DBFS_RAW_SPLITS_DIR}")
    print(f"Checked/created processed data directory: {PROCESSED_DATA_DIR_VERSIONED_FE}")
except OSError as e:
    print(f"Warning: Could not create directory. Ensure UC Volume '{UC_BASE_DATA_PATH}' exists and you have write permissions. Error: {e}")

print("--- Preprocessing Global Configurations (with FE) Initialized ---")
print(f"MLflow Experiment Path for Preprocessing: {PREPROCESSING_EXPERIMENT_PATH}")
print(f"Full Raw Data Input Path: {FULL_RAW_DATA_PARQUET_PATH}")
print(f"Date Column for Split: {DATE_COLUMN_FOR_SPLIT}")
print(f"Target Column: {YOUR_TARGET_COLUMN_NAME}")
print(f"  Output Processed Train Data Path (Parquet Named Cols): {SHARED_PROCESSED_TRAIN_PATH}")
print(f"  Output Processed Test Data Path (Parquet Named Cols): {SHARED_PROCESSED_TEST_PATH}")
print(f"  MLflow Pyfunc Preprocessor Artifact Path: {MLFLOW_PYFUNC_PREPROCESSOR_ARTIFACT_PATH}")
print(f"Categorical Columns (Raw): {CATEGORICAL_COLUMNS_RAW}")
print(f"Numerical Columns (Raw): {NUMERICAL_COLUMNS_RAW}")
print(f"Global Seed: {GLOBAL_SEED}")
print("-" * 50)


# <-------------------- CELL 3: PREPROCESSING LOGIC & MLFLOW PYFUNC MODEL CLASS -------------------->
print("\nCell 3: Preprocessing Logic & Pyfunc Model Class - Defining (with Custom Initial Formatting & Feature Engineering)...")

# --- MLflow Utility ---
def get_or_create_experiment(experiment_name_param, spark_session_param=None): # spark_session_param optional
    """Return the ID of the named MLflow experiment, creating it when it does not exist.

    Args:
        experiment_name_param: Name/path of the MLflow experiment to look up.
        spark_session_param: Accepted for call-site compatibility; not used here.

    Returns:
        The experiment ID, or ``None`` when the lookup or creation raised any
        exception (the error is printed, not re-raised).
    """
    try:
        existing = mlflow.get_experiment_by_name(experiment_name_param)
        if not existing:
            # Nothing registered under this name yet: create it and hand back the new ID.
            print(f"MLflow experiment '{experiment_name_param}' not found. Attempting to create.")
            created_id = mlflow.create_experiment(name=experiment_name_param)
            print(f"MLflow experiment '{experiment_name_param}' created with ID: {created_id}")
            return created_id
        print(f"MLflow experiment '{experiment_name_param}' found with ID: {existing.experiment_id}")
        return existing.experiment_id
    except Exception as e:
        # Best-effort by design: callers receive None instead of an exception.
        print(f"Error in get_or_create_experiment for '{experiment_name_param}'. Error: {e}")
        return None

# --- Data Splitting Function ---
def split_by_month_and_stratify(full_pdf: pd.DataFrame, 
                                date_col: str, 
                                target_col: str, 
                                test_size: float, 
                                random_state: int) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Split a DataFrame into train/test parts month by month, stratifying on the target.

    Rows are grouped by the calendar month of ``date_col``. Each group is split with
    ``train_test_split``; the split is stratified on ``target_col`` when the group has
    at least two classes with two or more rows each, otherwise (or if stratification
    raises ``ValueError``) a plain shuffled split is used. Groups with a single row go
    to the training set.

    Rows whose date is missing or NaT form one extra group that is split the same
    way, so no input row is lost (a plain ``groupby`` would silently drop them).

    Args:
        full_pdf: Input data; it is not modified.
        date_col: Column convertible by ``pd.to_datetime``.
        target_col: Column holding the class label used for stratification.
        test_size: Passed through to ``train_test_split`` for every group.
        random_state: Seed passed through to ``train_test_split`` for every group.

    Returns:
        ``(train_df, test_df)`` with the original columns. ``date_col`` comes back
        converted to datetime. A side without rows is an empty frame that still
        has the input's columns.

    Raises:
        ValueError: If either column is missing or ``date_col`` cannot be converted.
    """
    print(f"  Starting stratified split by month from column '{date_col}' and target '{target_col}'...")
    if date_col not in full_pdf.columns:
        raise ValueError(f"Date column '{date_col}' not found in DataFrame.")
    if target_col not in full_pdf.columns:
        raise ValueError(f"Target column '{target_col}' not found in DataFrame.")

    try:
        temp_df = full_pdf.copy()
        temp_df[date_col] = pd.to_datetime(temp_df[date_col])
        temp_df['year_month_group'] = temp_df[date_col].dt.to_period('M')
    except Exception as e:
        raise ValueError(f"Could not convert column '{date_col}' to datetime or extract year_month. Error: {e}") from e

    # groupby skips rows whose key is NaT, so collect those rows explicitly and
    # treat them as one additional group instead of losing them.
    month_groups = list(temp_df.groupby('year_month_group'))
    undated_rows = temp_df[temp_df['year_month_group'].isna()]
    if not undated_rows.empty:
        print(f"    {len(undated_rows)} row(s) have a missing date in '{date_col}'; splitting them as their own group.")
        month_groups.append(("missing-date", undated_rows))

    train_dfs_list = []
    test_dfs_list = []
    # train_test_split needs at least two rows per class to stratify.
    min_samples_per_class_needed = 2

    for month_period, group_data in month_groups:
        print(f"    Splitting for month-year: {month_period}, group size: {len(group_data)}")
        if len(group_data) < 2:
            # A group is never empty, so this is always the single-row case.
            print(f"      Group for {month_period} is too small ({len(group_data)}) to split. Assigning to train.")
            train_dfs_list.append(group_data)
            continue

        target_counts = group_data[target_col].value_counts()
        can_stratify = len(target_counts) >= 2 and target_counts.min() >= min_samples_per_class_needed

        month_split = None
        if can_stratify:
            try:
                month_split = train_test_split(
                    group_data, test_size=test_size, random_state=random_state,
                    stratify=group_data[target_col], shuffle=True
                )
            except ValueError as ve: # Handles "The least populated class in y has only X members"
                print(f"      Stratification failed for {month_period} (Error: {ve}). Performing random split.")
        else:
            print(f"      Not enough class diversity or samples in {month_period} for stratification (counts: {target_counts.to_dict()}). Performing random split.")

        if month_split is None:
            month_split = train_test_split(
                group_data, test_size=test_size, random_state=random_state, shuffle=True
            )

        month_train_df, month_test_df = month_split
        train_dfs_list.append(month_train_df)
        if not month_test_df.empty: # Only append if test_df is not empty
            test_dfs_list.append(month_test_df)

    final_train_df = pd.concat(train_dfs_list).drop(columns=['year_month_group']) if train_dfs_list else pd.DataFrame(columns=full_pdf.columns)
    final_test_df = pd.concat(test_dfs_list).drop(columns=['year_month_group']) if test_dfs_list else pd.DataFrame(columns=full_pdf.columns)

    print(f"  Splitting complete. Train shape: {final_train_df.shape}, Test shape: {final_test_df.shape}")
    if not final_train_df.empty:
        print(f"  Train target distribution:\n{final_train_df[target_col].value_counts(normalize=True, dropna=False)}")
    if not final_test_df.empty:
        print(f"  Test target distribution:\n{final_test_df[target_col].value_counts(normalize=True, dropna=False)}")
    return final_train_df, final_test_df


# --- Pandas Preprocessor as an mlflow.pyfunc.PythonModel (with Feature Engineering) ---
class PandasFeatureEngineeringPreprocessor(mlflow.pyfunc.PythonModel):
    
    def __init__(self,
                 raw_categorical_cols,
                 raw_numerical_cols,
                 date_col_for_fe,
                 age_col_for_fe,
                 premium_col_for_fe,
                 interactions_col_for_fe,
                 age_bins_for_fe,
                 age_bin_labels_for_fe,
                 label_col_for_te_fitting,
                 te_smoothing_factor,
                 cat_impute_constant,
                 num_impute_strategy,
                 global_seed):
        """Store the preprocessing configuration and create empty containers for fitted state.

        The two raw column collections are copied into new lists; every other
        argument is stored as given. Nothing is fitted here.
        """
        # Raw column configuration (copied so later edits to the caller's lists don't leak in).
        self.raw_categorical_cols_config = list(raw_categorical_cols)
        self.raw_numerical_cols_config = list(raw_numerical_cols)

        # Source columns and binning settings used by the feature-engineering steps.
        self.date_col_for_fe = date_col_for_fe
        self.age_col_for_fe = age_col_for_fe
        self.premium_col_for_fe = premium_col_for_fe
        self.interactions_col_for_fe = interactions_col_for_fe
        self.age_bins_for_fe = age_bins_for_fe
        self.age_bin_labels_for_fe = age_bin_labels_for_fe

        # Label column: needed only while fitting the target encoder.
        self.label_col_for_te_fitting = label_col_for_te_fitting

        # Encoder / imputer hyper-parameters and the seed.
        self.te_smoothing_factor = te_smoothing_factor
        self.cat_impute_constant = cat_impute_constant
        self.num_impute_strategy = num_impute_strategy
        self.global_seed = global_seed

        # State populated by `fit`:
        self.fitted_components = {}  # imputers, encoders, scalers
        self.feature_engineering_details = {}  # names of the engineered features
        self.final_feature_columns_in_order = []  # output columns and their order

    def _get_valid_cols(self, df, col_list):
        """Return the entries of ``col_list`` that are columns of ``df``, excluding the label column.

        The order (and any duplicates) of ``col_list`` is preserved.
        """
        def _is_usable(name):
            return name in df.columns and name != self.label_col_for_te_fitting

        return list(filter(_is_usable, col_list))

    def _apply_initial_custom_formatting(self, df_input: pd.DataFrame) -> pd.DataFrame:
        """Hook for project-specific raw-data formatting; currently returns an unchanged copy.

        Args:
            df_input: Raw DataFrame. It is copied, never mutated.

        Returns:
            A copy of ``df_input``.
        """
        formatted_df = df_input.copy()
        print("    Custom Preprocessing Step 1: Applying initial custom data formatting...")
        # !!! PLACEHOLDER: PUT YOUR ACTUAL CUSTOM FORMATTING LOGIC HERE !!!
        # Work on `formatted_df` (the raw frame, without the label if it was
        # separated earlier) and return it. Every column named in
        # raw_categorical_cols_config / raw_numerical_cols_config must exist once
        # this step has run — either already present or created here.
        #
        # Illustration only (make sure it does not clash with real column names):
        #   lower-case and strip a free-text column such as 'some_text_column'
        #   when it is present, and report when it is not.
        print("    Custom Preprocessing Step 1: Initial custom data formatting complete.")
        return formatted_df

    def _engineer_features(self, df_input: pd.DataFrame, is_fitting_phase: bool) -> pd.DataFrame:
        """Add date, numeric-transform and age-bin features to a copy of ``df_input``.

        Created columns (each only when its source column is configured and present):
          * ``fe_month``, ``fe_day_of_week``, ``fe_is_weekend`` — string-valued, from the
            date column; unparsable dates yield "-1" / "-1" / "0".
          * ``<prefix>_log1p`` and ``<prefix>_sq`` for the premium, age and interactions
            columns (missing values are treated as 0; negatives are clipped to 0 for log1p).
          * ``fe_<age_col>_binned`` — string bin label; ages outside every bin (or
            missing) receive ``cat_impute_constant``.

        Args:
            df_input: Frame to derive features from. It is not mutated.
            is_fitting_phase: When True, the names of the created categorical and
                numerical features are recorded in ``feature_engineering_details``.

        Returns:
            The copied frame with the engineered columns added. The numeric source
            columns are also coerced with ``pd.to_numeric(errors='coerce')``.
        """
        df = df_input.copy()
        print("    Feature Engineering Step 2: Creating new features...")
        
        engineered_categoricals_temp = []
        engineered_numericals_temp = []

        # Date/Time Features
        date_feature_names = ['fe_month', 'fe_day_of_week', 'fe_is_weekend']
        if self.date_col_for_fe and self.date_col_for_fe in df.columns:
            try:
                s_date = pd.to_datetime(df[self.date_col_for_fe], errors='coerce')
                # At transform time the features must exist whenever fit recorded them,
                # even if this particular batch contains no valid date at all.
                expected_from_fit = False
                if not is_fitting_phase:
                    fitted_categoricals = self.feature_engineering_details.get('engineered_categorical_cols', [])
                    expected_from_fit = all(name in fitted_categoricals for name in date_feature_names)
                if expected_from_fit or not s_date.isnull().all(): # Proceed if some dates are valid
                    df['fe_month'] = s_date.dt.month.fillna(-1).astype(int).astype(str) # Impute NaT with -1 then to str
                    df['fe_day_of_week'] = s_date.dt.dayofweek.fillna(-1).astype(int).astype(str)
                    df['fe_is_weekend'] = s_date.dt.dayofweek.isin([5,6]).astype(int).astype(str)
                    engineered_categoricals_temp.extend(date_feature_names)
                    print(f"      FE: Created date features: month, day_of_week, is_weekend from {self.date_col_for_fe}")
            except Exception as e_date:
                # Best-effort: date features are optional, so report and carry on.
                print(f"      Warning: Could not create date features from {self.date_col_for_fe}. Error: {e_date}")
        
        # Numerical Transformations
        for col_name, new_col_prefix in [
            (self.premium_col_for_fe, "fe_premium"), 
            (self.age_col_for_fe, "fe_age"), 
            (self.interactions_col_for_fe, "fe_interactions") # Example
        ]:
            if col_name and col_name in df.columns:
                df[col_name] = pd.to_numeric(df[col_name], errors='coerce') # Ensure numeric
                df[f'{new_col_prefix}_log1p'] = np.log1p(df[col_name].fillna(0).clip(lower=0))
                df[f'{new_col_prefix}_sq'] = df[col_name].fillna(0)**2
                engineered_numericals_temp.extend([f'{new_col_prefix}_log1p', f'{new_col_prefix}_sq'])
                print(f"      FE: Created log1p and squared features for {col_name}")

        # Binning Age
        if self.age_col_for_fe and self.age_col_for_fe in df.columns and self.age_bins_for_fe and self.age_bin_labels_for_fe:
            age_col_binned_name = f'fe_{self.age_col_for_fe}_binned'
            # Ensure age column is numeric before binning
            df[self.age_col_for_fe] = pd.to_numeric(df[self.age_col_for_fe], errors='coerce')
            # NOTE(review): right=False makes the bins left-closed ([0, 18), [18, 30), ...);
            # confirm that this matches the meaning of the configured labels.
            binned_age = pd.cut(df[self.age_col_for_fe], 
                                bins=self.age_bins_for_fe, 
                                labels=self.age_bin_labels_for_fe, 
                                right=False, include_lowest=True)
            # Fill BEFORE casting to str: astype(str) would turn NaN into the literal
            # 'nan', after which fillna can no longer find anything to replace.
            df[age_col_binned_name] = (
                binned_age.astype(object)
                .where(binned_age.notna(), self.cat_impute_constant)
                .astype(str)
            )
            engineered_categoricals_temp.append(age_col_binned_name)
            print(f"      FE: Created binned age feature: {age_col_binned_name}")

        if is_fitting_phase:
            # dict.fromkeys de-duplicates while keeping creation order; list(set(...))
            # would make the column order depend on the interpreter's hash seed.
            self.feature_engineering_details['engineered_categorical_cols'] = list(dict.fromkeys(engineered_categoricals_temp))
            self.feature_engineering_details['engineered_numerical_cols'] = list(dict.fromkeys(engineered_numericals_temp))
        return df

    def _create_interaction_features_post_te(self, df_input: pd.DataFrame, is_fitting_phase: bool) -> pd.DataFrame:
        """Add premium-based interaction features to a copy of ``df_input``.

        The premium operand is the log-transformed premium feature when present
        (``fe_premium_log1p``, or the legacy name ``fe_log_<premium_col>``), otherwise
        the raw premium column. It is multiplied with the binned-age column and with
        ``customer_segment`` when those columns are available.

        Assumes the categorical operands are already numeric, i.e. this runs after
        target encoding — TODO confirm against the caller.

        Args:
            df_input: Frame to extend. It is not mutated.
            is_fitting_phase: When True, the names of the created interaction columns
                are recorded in ``feature_engineering_details``.

        Returns:
            The copied frame with interaction columns added. If no premium column is
            available, the copy is returned without additions.
        """
        df = df_input.copy()
        print("    Feature Engineering Step 4: Creating interaction features (post-target encoding)...")
        newly_created_interactions_temp = []

        # Prefer the log-transformed premium. 'fe_premium_log1p' is the name produced by
        # _engineer_features; the 'fe_log_<col>' spelling is still accepted as a fallback.
        log_premium_candidates = ['fe_premium_log1p', f'fe_log_{self.premium_col_for_fe}']
        premium_col_for_interact = next(
            (name for name in log_premium_candidates if name in df.columns),
            self.premium_col_for_fe,
        )
        if premium_col_for_interact not in df.columns:
            print(f"      FE Interaction: Base premium column '{premium_col_for_interact}' for interactions not found. Skipping.")
            if is_fitting_phase:
                self.feature_engineering_details['engineered_interaction_cols'] = []
            return df
        
        # Ensure premium column for interaction is numeric
        df[premium_col_for_interact] = pd.to_numeric(df[premium_col_for_interact], errors='coerce').fillna(0)

        # Example: Interaction with target-encoded binned age
        # The original binned age (e.g., 'fe_age_col_binned') was target encoded.
        # Its name remains the same after TE by category_encoders.TargetEncoder.
        age_binned_col_name = f'fe_{self.age_col_for_fe}_binned'
        if age_binned_col_name in df.columns: # This column is now numeric (target-encoded)
            interaction_col_name = f'fe_inter_{premium_col_for_interact}_x_{age_binned_col_name}'
            df[interaction_col_name] = df[premium_col_for_interact] * df[age_binned_col_name]
            newly_created_interactions_temp.append(interaction_col_name)
            print(f"      FE Interaction: Created {interaction_col_name}")
        
        # Example: Interaction with target-encoded raw categorical 'customer_segment'
        # (only applies when 'customer_segment' is listed in self.raw_categorical_cols_config)
        customer_segment_col_name = "customer_segment" # Example from raw_categorical_cols_config
        if customer_segment_col_name in self.raw_categorical_cols_config and customer_segment_col_name in df.columns:
            interaction_col_name_seg = f'fe_inter_{premium_col_for_interact}_x_{customer_segment_col_name}'
            df[interaction_col_name_seg] = df[premium_col_for_interact] * df[customer_segment_col_name] # Assumes customer_segment is now numeric post-TE
            newly_created_interactions_temp.append(interaction_col_name_seg)
            print(f"      FE Interaction: Created {interaction_col_name_seg}")

        if is_fitting_phase:
            # Order-preserving de-duplication keeps the column order reproducible across runs.
            self.feature_engineering_details['engineered_interaction_cols'] = list(dict.fromkeys(newly_created_interactions_temp))
        return df


    def fit(self, train_pdf: pd.DataFrame):
        print("    Fitting PandasFeatureEngineeringPreprocessor...")
        if self.label_col_for_te_fitting not in train_pdf.columns:
            raise ValueError(f"Label column '{self.label_col_for_te_fitting}' for Target Encoder fitting not found in training DataFrame.")
        
        # --- Step 0: Initial Custom Formatting ---
        X_fit_custom_formatted = self._apply_initial_custom_formatting(
            train_pdf.drop(columns=[self.label_col_for_te_fitting], errors='ignore')
        )
        y_fit_series = train_pdf[self.label_col_for_te_fitting].astype(float).copy() # Used by TargetEncoder

        # --- Step 1: Create Base Engineered Features ---
        X_fit_fe_engineered = self._engineer_features(X_fit_custom_formatted, is_fitting_phase=True)
        
        # --- Define columns for imputation based on raw and engineered ---
        # Valid columns present after initial FE
        current_cols_in_X = X_fit_fe_engineered.columns.tolist()
        active_numerical_cols = self._get_valid_cols(X_fit_fe_engineered, self.raw_numerical_cols_config + self.feature_engineering_details.get('engineered_numerical_cols', []))
        active_categorical_cols = self._get_valid_cols(X_fit_fe_engineered, self.raw_categorical_cols_config + self.feature_engineering_details.get('engineered_categorical_cols', []))
        
        self.fitted_components['active_numerical_cols_for_impute'] = active_numerical_cols
        self.fitted_components['active_categorical_cols_for_impute'] = active_categorical_cols

        # --- Step 2: Impute Numerical Features ---
        if active_numerical_cols:
            num_imputer = SimpleImputer(strategy=self.num_impute_strategy)
            X_fit_fe_engineered[active_numerical_cols] = num_imputer.fit_transform(X_fit_fe_engineered[active_numerical_cols])
            self.fitted_components['numerical_imputer'] = num_imputer
            print(f"      Fitted Numerical Imputer for: {active_numerical_cols}")

        # --- Step 3: Impute Categorical Features ---
        if active_categorical_cols:
            cat_imputer = SimpleImputer(strategy="constant", fill_value=self.cat_impute_constant)
            X_fit_fe_engineered[active_categorical_cols] = cat_imputer.fit_transform(X_fit_fe_engineered[active_categorical_cols])
            self.fitted_components['categorical_imputer'] = cat_imputer
            print(f"      Fitted Categorical Imputer for: {active_categorical_cols}")
            
        # --- Step 4: Target Encoding ---
        # Target encode all active categorical columns (original raw + newly engineered categoricals)
        cols_for_te_fit = list(active_categorical_cols) # Use the imputed ones
        target_encoded_output_names = [] # Will be same as input names for category_encoders.TargetEncoder
        if cols_for_te_fit:
            for col in cols_