In [None]:
import pandas as pd
import numpy as np
from collections import Counter
from scipy.stats import zscore, median_abs_deviation
from sklearn.ensemble import IsolationForest
import joblib

# 1. File Paths
# Serialized artifacts that main() passes to predict_anomalies(), which reads
# both of them with joblib.load().
MODEL_PATH = 'V:/SimbaChen/xgb_raw_noimpute_removal.joblib'
PIPELINE_PATH = 'V:/SimbaChen/pipeline_noimpute_removal.joblib'

# 2. Key Column Names
# main() splits the input on these columns and processes one group at a time.
GROUPBY_COLS = ['Lot', 'Wafer Number']
# Grouping key for the per-site statistics in preprocess_data(); also the name
# of the site column in the anomaly report built by detect_anomalies().
SITE_COL = 'site'

# 3. Preprocessing Parameters
# NOTE(review): confirm preprocess_data() reads this constant rather than a
# numeric literal, so the two cannot drift apart.
Z_SCORE_THRESHOLD = 5.0 # Threshold for replacing outliers with NaN in preprocess_data

# 4. Anomaly Detection Parameters
# detect_anomalies() flags a site when its IsolationForest score_samples()
# value is strictly below this number.
ANOMALY_SCORE_THRESHOLD = -0.65 # Threshold for Isolation Forest in detect_anomalies


def preprocess_data(
    df: pd.DataFrame,
    z_threshold: "float | None" = None,
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Cleans the input DataFrame by dropping irrelevant columns and handling outliers.

    Columns whose name starts with a known metadata prefix or ends with one of
    the excluded suffixes are dropped. Every remaining column except the site
    column is coerced to numeric (unparseable values become NaN). Values whose
    per-site Z-score magnitude exceeds the threshold are then replaced by NaN.

    Args:
        df: The raw input DataFrame for a single Lot/Wafer group. It is not
            modified; a new DataFrame is returned.
        z_threshold: Absolute Z-score above which a value is treated as an
            outlier. Defaults to the module-level ``Z_SCORE_THRESHOLD``.

    Returns:
        A tuple containing:
        - The preprocessed DataFrame.
        - A DataFrame of median values per test, with sites as columns.
        - A DataFrame of standard deviation values per test, with sites as columns.

    Raises:
        KeyError: If the site column is missing from ``df``.
    """
    if z_threshold is None:
        # Single source of truth for the cut-off instead of a literal in the body.
        z_threshold = Z_SCORE_THRESHOLD

    if SITE_COL not in df.columns:
        raise KeyError(f"Required column {SITE_COL!r} not found in input data.")

    # Tuples so that str.startswith / str.endswith can test every option in one call.
    prefixes_to_drop = (
        'ECID', 'failtest_name', 'failtest_number', 'hardbin_name', 'hardbin_number',
        'lastfailtest_name', 'lastfailtest_number', 'Part_Id', 'softbin_name', 'softbin_number',
        'Lot', 'DieID', 'CodedDieID', 'OCR', 'Source Lot', 'rework_flag', 'Program', 'Wafer Number',
        'start_time', 'temperature', 'subid', 'Wafer', 'die_x', 'die_y', 'device_nr', 'rom_code',
        'hardbin', 'lib_info', 'BinName', 'BinState', 'cnt_alarmed_tests', 'cnt_failed_tests',
        'cnt_passed_tests', 'cnt_recorded_tests', 'Device_Test_Time', 'lot_key', 'wf_key', 'Fab',
        'Probecard', 'Testcenter', 'Tester', 'program revision', 'Stage',
    )
    suffixes_to_drop = (',0,0', ',0,-')

    # Non-string labels cannot match a textual pattern, and the site column is
    # always kept because everything below groups on it.
    cols_to_drop = [
        col for col in df.columns
        if isinstance(col, str)
        and col != SITE_COL
        and (col.startswith(prefixes_to_drop) or col.endswith(suffixes_to_drop))
    ]
    df = df.drop(columns=cols_to_drop, errors='ignore')

    # Everything that is left, apart from the site column, is a measurement.
    numeric_cols = df.columns.drop(SITE_COL)
    if numeric_cols.empty:
        # Nothing to analyse; callers treat empty statistics as "skip this group".
        return df, pd.DataFrame(), pd.DataFrame()

    df[numeric_cols] = df[numeric_cols].apply(pd.to_numeric, errors='coerce')

    # --- Vectorized outlier removal ---
    # transform() broadcasts each site's mean/std back onto that site's rows.
    per_site = df.groupby(SITE_COL)[numeric_cols]
    means = per_site.transform('mean')
    stds = per_site.transform('std')

    # A zero std is turned into NaN to avoid dividing by zero; the resulting NaN
    # Z-scores are filled with 0 because a value with no deviation is not an outlier.
    z_scores = ((df[numeric_cols] - means) / stds.replace(0, np.nan)).fillna(0)
    outliers = z_scores.abs() > z_threshold

    # mask() writes NaN wherever the condition is True and keeps the rest.
    df[numeric_cols] = df[numeric_cols].mask(outliers)

    # Per-site statistics after outlier handling, transposed to tests x sites.
    by_site = df.groupby(SITE_COL)
    med = by_site.median().T
    std = by_site.std().T

    return df, med, std


def standardize(df: pd.DataFrame) -> pd.DataFrame:
    """
    Applies row-wise Z-score standardization and always returns a DataFrame.

    Each row is standardized with its own mean and population standard
    deviation (ddof=0), ignoring missing values. The computation is done with
    pandas only, so the result type does not depend on what
    ``scipy.stats.zscore`` returns for pandas input.

    Args:
        df: Values to standardize. Non-numeric entries are coerced to NaN.

    Returns:
        A float DataFrame with the same index and columns as ``df``:
        - rows with at least two distinct valid values hold their Z-scores;
        - rows with at most one distinct valid value hold 0.0 in every
          non-missing cell (no variation means no deviation);
        - cells that were missing or non-numeric stay NaN.
    """
    # Sanitize first so that stray strings/objects cannot raise a TypeError later.
    numeric = df.apply(pd.to_numeric, errors='coerce').astype(float)

    row_mean = numeric.mean(axis=1)
    row_std = numeric.std(axis=1, ddof=0)

    # Detect constant rows by counting distinct values rather than testing
    # std == 0: floating-point round-off can leave a tiny non-zero std.
    constant_rows = (numeric.nunique(axis=1) <= 1).to_numpy()

    z = numeric.sub(row_mean, axis=0).div(row_std, axis=0)
    z.loc[constant_rows, :] = 0.0

    # Restore NaN wherever the input had no valid value.
    return z.where(numeric.notna())

    

def generate_features(med: pd.DataFrame, std: pd.DataFrame, z_med: pd.DataFrame, z_std: pd.DataFrame) -> pd.DataFrame:
    """
    Builds the per-test feature table consumed by the machine learning model.

    Spread statistics (range, inter-quartile range, median absolute deviation)
    are computed across the columns of the standardized inputs; shape
    statistics (skewness, kurtosis) are computed across the columns of the
    raw inputs.

    Args:
        med: DataFrame of median values (tests x sites).
        std: DataFrame of standard deviation values (tests x sites).
        z_med: Standardized median values.
        z_std: Standardized standard deviation values.

    Returns:
        A DataFrame with one row per test parameter and ten feature columns.
    """
    def _row_range(frame: pd.DataFrame) -> pd.Series:
        # Largest minus smallest value in each row.
        return frame.max(axis=1) - frame.min(axis=1)

    def _row_iqr(frame: pd.DataFrame) -> pd.Series:
        # Distance between the 75th and 25th percentile of each row.
        upper = frame.quantile(0.75, axis=1)
        lower = frame.quantile(0.25, axis=1)
        return upper - lower

    def _row_mad(frame: pd.DataFrame) -> pd.Series:
        # Median absolute deviation of each row (scipy defaults).
        return frame.apply(median_abs_deviation, axis=1)

    # Insertion order fixes the column order of the resulting frame.
    columns = {}
    columns["z_med_range"] = _row_range(z_med)
    columns["z_std_range"] = _row_range(z_std)
    columns["z_med_iqr"] = _row_iqr(z_med)
    columns["z_std_iqr"] = _row_iqr(z_std)
    columns["z_med_mad"] = _row_mad(z_med)
    columns["z_std_mad"] = _row_mad(z_std)
    columns["med_skewness"] = med.skew(axis=1)
    columns["std_skewness"] = std.skew(axis=1)
    columns["med_kurtosis"] = med.kurt(axis=1)
    columns["std_kurtosis"] = std.kurt(axis=1)

    return pd.DataFrame(columns)


# Artifacts already read from disk, keyed by path. main() calls
# predict_anomalies() once per Lot/Wafer group with the same two paths, so
# without this cache both files would be re-read for every group.
# Re-executing this definition resets the cache.
_ARTIFACT_CACHE = {}


def _load_artifact(path: str):
    """Returns the joblib artifact stored at ``path``, reading it from disk at most once."""
    try:
        return _ARTIFACT_CACHE[path]
    except KeyError:
        # NOTE: joblib.load() unpickles arbitrary objects; only point it at trusted files.
        artifact = _ARTIFACT_CACHE[path] = joblib.load(path)
        return artifact


def predict_anomalies(features: pd.DataFrame, model_path: str, pipeline_path: str) -> np.ndarray:
    """
    Loads a model and pipeline to predict anomalies from features.

    Args:
        features: Feature table with one row per test parameter.
        model_path: Path of the joblib file holding the fitted model.
        pipeline_path: Path of the joblib file holding the fitted
            preprocessing pipeline.

    Returns:
        The output of ``model.predict`` on the pipeline-transformed features.

    Raises:
        Whatever ``joblib.load`` raises if either file cannot be read.
    """
    # Model first, then pipeline, so a missing model file is reported first.
    model = _load_artifact(model_path)
    pipeline = _load_artifact(pipeline_path)
    x_transformed = pipeline.transform(features)
    return model.predict(x_transformed)
    

def detect_anomalies(z_med: pd.DataFrame, z_std: pd.DataFrame, predictions: np.ndarray, features_index: pd.Index) -> pd.DataFrame:
    """
    Selects the tests whose model prediction equals 0 and, for each of them,
    flags the sites whose Isolation Forest score falls below the fixed threshold.

    NOTE(review): the prediction column is named 'XGB_is_anomaly' while the
    rows kept are those labelled 0 - confirm the label convention of the model.

    Args:
        z_med: Standardized median values (tests x sites).
        z_std: Standardized standard deviation values (tests x sites).
        predictions: Per-test predictions from the model, aligned with ``features_index``.
        features_index: The index of the features DataFrame (test names).

    Returns:
        A DataFrame with one row per flagged (site, test parameter) pair,
        sorted by site; an empty DataFrame with the same two columns when
        nothing is flagged.
    """
    verdicts = pd.DataFrame({'XGB_is_anomaly': predictions}, index=features_index)
    selected_tests = verdicts[verdicts['XGB_is_anomaly'] == 0].index

    med_subset = z_med.loc[selected_tests]
    std_subset = z_std.loc[selected_tests]

    score_inputs = ['med', 'std']
    records = []

    for test_name in med_subset.index:
        # One row per site; sites missing either statistic are left out.
        site_stats = pd.DataFrame({
            'med': med_subset.loc[test_name],
            'std': std_subset.loc[test_name],
        }).dropna()

        if site_stats.empty:
            continue

        # A fresh forest per test; the fixed seed keeps the scores reproducible.
        forest = IsolationForest(contamination='auto', random_state=42)
        forest.fit(site_stats[score_inputs])
        site_stats['anomaly_score'] = forest.score_samples(site_stats[score_inputs])

        below_threshold = site_stats['anomaly_score'] < ANOMALY_SCORE_THRESHOLD
        flagged_sites = site_stats[below_threshold].index.tolist()
        records.extend(
            {SITE_COL: site, 'test_param': test_name} for site in flagged_sites
        )

    if records:
        return pd.DataFrame(records).sort_values(by=SITE_COL, ascending=True)

    return pd.DataFrame(columns=[SITE_COL, 'test_param'])


def main(df_in: pd.DataFrame):
    """
    Runs the complete anomaly detection pipeline over every Lot/Wafer group.

    Args:
        df_in: The raw input DataFrame containing data for multiple lots and wafers.

    Returns:
        A DataFrame with the columns 'Lot', 'Wafer Number', the site column and
        'test_param', one row per flagged site/test pair. When nothing is
        flagged, a single all-NaN row with those columns is returned instead.
    """
    output_columns = ['Lot', 'Wafer Number', SITE_COL, 'test_param']
    per_group_results = []

    for (lot, wafer), group_df in df_in.groupby(GROUPBY_COLS):
        print(f"Processing Lot: {lot}, Wafer: {wafer}...")

        # Step 1: clean the group and compute per-site statistics.
        _, med, std = preprocess_data(group_df)
        if med.empty or std.empty:
            print("  -> Skipping group due to empty data after preprocessing.")
            continue

        # Step 2: standardize the statistics and derive the model features.
        z_med = standardize(med)
        z_std = standardize(std)
        features = generate_features(med, std, z_med, z_std)

        # Step 3: per-test predictions from the persisted model.
        predictions = predict_anomalies(features, MODEL_PATH, PIPELINE_PATH)

        # Step 4: site-level check on the tests selected by the predictions.
        group_hits = detect_anomalies(z_med, z_std, predictions, features.index)
        if group_hits.empty:
            continue

        # Tag the hits with their group identifiers before collecting them.
        group_hits['Lot'] = lot
        group_hits['Wafer Number'] = wafer
        per_group_results.append(group_hits)

    if not per_group_results:
        print("\n--- Anomaly Detection Complete: No anomalies found. ---")
        # Placeholder row so that the result always has the expected columns.
        return pd.DataFrame([[np.nan] * len(output_columns)], columns=output_columns)

    combined = pd.concat(per_group_results, ignore_index=True)
    combined = combined[output_columns]
    print("\n--- Anomaly Detection Complete ---")
    print(combined)
    return combined

# Build the input frame.
# NOTE(review): `input` is not assigned anywhere in this file; in plain Python
# it resolves to the builtin input() function, so this presumably relies on a
# host environment injecting a table-like object under that name - TODO confirm.
df_in = pd.DataFrame(data=input)

# Run the main pipeline
anomalies_df = main(df_in)

# Generating count per test parameter table
# value_counts() skips NaN by default, so the all-NaN placeholder row that
# main() returns when nothing is found produces an empty count table.
test_param_counts = anomalies_df['test_param'].value_counts().reset_index()
# Name both columns explicitly instead of relying on reset_index() defaults.
test_param_counts.columns = ['test_param', 'count']