# In [None]:
import sys
import os
import pickle
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import tensorflow as tf
from tensorflow.keras import layers, models, callbacks
from scipy import ndimage
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report

# ==========================================
# 1. REPRODUCIBILITY & COMPATIBILITY
# ==========================================
# Seed NumPy's and TensorFlow's global random generators with a fixed value so
# that repeated runs start from the same random state (NumPy's generator is used
# below for the synthetic fallback data).
# NOTE(review): seeding alone may not give bit-identical training on every
# backend (e.g. some GPU kernels) -- confirm if exact reproducibility matters.
np.random.seed(42)
tf.random.set_seed(42)

# Register the current `pandas.core.indexes` package under the legacy module
# name `pandas.indexes`, so that `pickle` can resolve references to that path.
# Presumably the dataset pickle was written by an older pandas release that
# used the old module layout -- verify against the actual file.
import pandas.core.indexes as indexes
sys.modules['pandas.indexes'] = indexes

# ==========================================
# 2. DATA LOADING & ENHANCED PREPROCESSING
# ==========================================
def _extract_failure_label(raw):
    """Return the failure-type name stored in one raw ``failureType`` cell.

    Nested list/array cells (e.g. ``[['Center']]``) yield their first inner
    element, plain strings are passed through unchanged, and anything else
    (including empty containers) is treated as the unlabeled class ``'none'``.
    """
    if isinstance(raw, str):
        # Already-cleaned datasets store the label directly as a string.
        return raw
    if isinstance(raw, (list, np.ndarray)) and len(raw) > 0:
        return raw[0][0]
    return 'none'


def load_and_preprocess(file_path='LSWMD.pkl', target_size=(28, 28)):
    """Load wafer maps, keep the labeled defects and turn them into model inputs.

    If ``file_path`` exists it is unpickled and rows whose failure type is
    ``'none'`` are discarded. Otherwise a warning is issued and 1000 random
    26x26 maps with three made-up classes are generated so the rest of the
    pipeline can still be exercised.

    Each wafer map is median-filtered (3x3), resized to ``target_size`` and
    divided by 2.0.

    Args:
        file_path: Path of the pickled dataset. Defaults to ``'LSWMD.pkl'``.
        target_size: ``(height, width)`` every wafer map is resized to.
            Defaults to ``(28, 28)``.

    Returns:
        A tuple ``(X, y, label_map)`` where ``X`` is a float32 array of shape
        ``(n, height, width, 1)``, ``y`` is the one-hot encoded label matrix
        and ``label_map`` maps each failure-type name to its integer class id
        (ids follow the order of first appearance in the data).

    Raises:
        ValueError: If no labeled defect samples remain after filtering.
    """
    if os.path.exists(file_path):
        # NOTE(security): pickle.load can execute arbitrary code embedded in
        # the file. Only load dataset files obtained from a trusted source.
        with open(file_path, 'rb') as f:
            data = pickle.load(f, encoding='latin1')
        df = pd.DataFrame(data)
        df['failureType'] = df['failureType'].apply(_extract_failure_label)
        df_defects = df[df['failureType'] != 'none'].copy()
    else:
        # Synthetic fallback: make it loud, otherwise results obtained on
        # random noise could be mistaken for real metrics.
        import warnings
        warnings.warn(
            f"Dataset file {file_path!r} not found; "
            "falling back to randomly generated synthetic wafer maps."
        )
        test_data = {'waferMap': [np.random.randint(0, 3, (26, 26)) for _ in range(1000)],
                     'failureType': [np.random.choice(['Center', 'Donut', 'Scratch']) for _ in range(1000)]}
        df_defects = pd.DataFrame(test_data)

    if df_defects.empty:
        raise ValueError(
            f"No labeled defect samples found in {file_path!r}; nothing to train on."
        )

    label_map = {val: i for i, val in enumerate(df_defects['failureType'].unique())}
    df_defects['label_id'] = df_defects['failureType'].map(label_map)

    processed_images = []
    for wafer_map in df_defects['waferMap']:
        # A 3x3 median filter removes isolated pixels before resizing.
        denoised = ndimage.median_filter(wafer_map, size=3)
        # tf.image.resize expects a channel axis, hence the added dimension.
        resized = tf.image.resize(denoised[:, :, np.newaxis], target_size).numpy()
        processed_images.append(resized)

    # Dividing by 2.0 scales the inputs to [0, 1]; this assumes wafer-map
    # values lie in {0, 1, 2} as in the synthetic fallback -- TODO confirm for
    # the real dataset.
    X = np.asarray(processed_images, dtype='float32') / 2.0
    y = tf.keras.utils.to_categorical(df_defects['label_id'], num_classes=len(label_map))
    return X, y, label_map

X, y, label_map = load_and_preprocess()
# Stratify on the class index so that every failure type keeps the same
# proportion in the train and test splits; without it, rare classes can end up
# under-represented (or missing) in the 20% test split.
# Note: stratification requires at least two samples per class.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y.argmax(axis=1)
)

# ==========================================
# 3. BOOSTED EDGE-AI ARCHITECTURE
# ==========================================

def build_boosted_model(num_classes):
    """Assemble the (uncompiled) separable-convolution classifier.

    The network takes 28x28 single-channel inputs, applies two
    SeparableConv2D + BatchNormalization stages (64 then 128 filters), pools
    globally and finishes with a small dense head.

    Args:
        num_classes: Number of units in the final softmax layer.

    Returns:
        A ``Sequential`` Keras model.
    """
    # Convolutional feature extractor: 64 -> 128 filters with one 2x2
    # down-sampling step in between, collapsed by global average pooling.
    feature_stack = [
        layers.Input(shape=(28, 28, 1)),
        layers.SeparableConv2D(64, (3, 3), activation='relu', padding='same'),
        layers.BatchNormalization(),
        layers.MaxPooling2D((2, 2)),
        layers.SeparableConv2D(128, (3, 3), activation='relu', padding='same'),
        layers.BatchNormalization(),
        layers.GlobalAveragePooling2D(),
    ]

    # Classification head; dropout of 0.3 regularises the dense layer.
    classifier_head = [
        layers.Dense(64, activation='relu'),
        layers.Dropout(0.3),
        layers.Dense(num_classes, activation='softmax'),
    ]

    return models.Sequential(feature_stack + classifier_head)

model = build_boosted_model(len(label_map))
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
              loss='categorical_crossentropy',
              metrics=['accuracy'])

# ==========================================
# 4. LEARNING RATE SCHEDULER & TRAINING
# ==========================================
# This callback halves the learning rate when the validation loss has not
# improved for 3 consecutive epochs (never going below 1e-5).
lr_reducer = callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3, min_lr=0.00001, verbose=1)

print("\nStarting Boosted Training (20 Epochs)...")
# Validation data is carved out of the training set instead of reusing the
# test split: the LR scheduler reacts to val_loss, so validating on the test
# set would leak test information into training and bias the final metrics.
# Keras takes the last 10% of the training samples for validation.
history = model.fit(X_train, y_train,
                    epochs=20,
                    batch_size=32,
                    validation_split=0.1,
                    callbacks=[lr_reducer])

# ==========================================
# 5. FINAL QUANTIZATION & EXPORT
# ==========================================
# Convert the trained Keras model to TensorFlow Lite with the default
# optimization set and write the serialized model to disk.
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_model = converter.convert()
with open('boosted_defect_model.tflite', 'wb') as f:
    f.write(tflite_model)

# ==========================================
# 6. RESULTS & VISUALIZATION
# ==========================================
def plot_boosted_results(history, model, X_test, y_test, label_map):
    """Plot training curves and report classification quality on the test data.

    Shows one figure with the accuracy and loss curves recorded in
    ``history``, a second figure with the confusion matrix of ``model`` on
    ``X_test``, and prints a per-class classification report.

    Args:
        history: Object returned by ``model.fit``; its ``history`` dict must
            contain ``accuracy``, ``val_accuracy``, ``loss`` and ``val_loss``.
        model: Trained model exposing ``predict``.
        X_test: Test inputs passed to ``model.predict``.
        y_test: One-hot encoded true labels for ``X_test``.
        label_map: Mapping from class name to integer class id.
    """
    # Order the class names by their integer id so tick labels and report rows
    # are guaranteed to line up with the matrix indices.
    ordered_classes = sorted(label_map.items(), key=lambda item: item[1])
    class_names = [name for name, _ in ordered_classes]
    class_ids = [class_id for _, class_id in ordered_classes]

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))
    ax1.plot(history.history['accuracy'], label='Train Accuracy')
    ax1.plot(history.history['val_accuracy'], label='Val Accuracy')
    ax1.set_title('Boosted Accuracy')
    ax1.legend()

    ax2.plot(history.history['loss'], label='Train Loss')
    ax2.plot(history.history['val_loss'], label='Val Loss')
    ax2.set_title('Boosted Loss')
    ax2.legend()
    plt.show()

    y_pred = np.argmax(model.predict(X_test), axis=1)
    y_true = np.argmax(y_test, axis=1)

    # Pass the full label list explicitly: otherwise a class that is absent
    # from both y_true and y_pred shrinks the matrix, misaligning the tick
    # labels and making classification_report reject target_names.
    cm = confusion_matrix(y_true, y_pred, labels=class_ids)

    plt.figure(figsize=(10, 7))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Purples',
                xticklabels=class_names, yticklabels=class_names)
    # confusion_matrix puts true classes on rows and predictions on columns.
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title('Final Confusion Matrix')
    plt.show()
    print(classification_report(y_true, y_pred, labels=class_ids, target_names=class_names))

# Show the training curves, the confusion matrix and the classification report
# for the trained model on the test split.
plot_boosted_results(history, model, X_test, y_test, label_map)