# Good & Bad Eggs

Ce notebook regroupe tout : prétraitement → features classiques → embeddings profonds → hybrides → réduction de dimension → classifieurs → évaluation.


In [None]:

# Installer les paquets nécessaires (décommente si nécessaire)
# !pip install timm xgboost scikit-image PyWavelets umap-learn joblib nbformat tensorflow

import os, sys, time, json, math, random
from pathlib import Path
import sys
sys.path.insert(0, "numpy_for_cv2")

import numpy as np

import cv2
import pandas as pd

import cv2
import matplotlib.pyplot as plt
from tqdm.auto import tqdm

# Machine learning / DL
import torch
from torchvision import transforms
import timm

from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA, TruncatedSVD
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import classification_report, confusion_matrix, f1_score, accuracy_score, recall_score
from sklearn.pipeline import Pipeline
import joblib

# xgboost is an optional dependency: remember whether the import worked so
# later cells can check XGBOOST_AVAILABLE before using `xgb`.
try:
    import xgboost as xgb
except Exception:
    XGBOOST_AVAILABLE = False
else:
    XGBOOST_AVAILABLE = True

print('torch:', torch.__version__, 'timm available:', 'timm' in sys.modules)
# Use the GPU when torch reports CUDA as available, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print('Device:', device)


In [None]:

# Dataset location. Edit DATA_ROOT so it points at the dataset root on your
# machine; the two image folders below are resolved relative to it.
DATA_ROOT = Path(r'C:\Users\cheic\Documents\M2 IA MathInfo\DeepL\PROJET FINAL DPL')
original_dir = DATA_ROOT.joinpath('Original Images(Eggs)')
aug_dir = DATA_ROOT.joinpath('Augmented_Images(Eggs)')


def gather_paths(base_dir):
    """Collect image files from the 'Good Eggs' and 'Bad Eggs' folders of base_dir.

    Args:
        base_dir: Directory (``pathlib.Path`` or path string) expected to
            contain a 'Good Eggs' and/or a 'Bad Eggs' sub-folder.

    Returns:
        A list of dicts with the keys 'path' (str), 'label' ('good' or 'bad')
        and 'source' (the name of base_dir). The list is empty when base_dir
        does not exist. Within each label folder, entries are sorted by path
        so the result does not depend on filesystem listing order.
    """
    base_dir = Path(base_dir)
    if not base_dir.exists():
        return []

    allowed_suffixes = {'.jpg', '.jpeg', '.png', '.bmp'}
    rows = []
    for label_dir, label in (('Good Eggs', 'good'), ('Bad Eggs', 'bad')):
        folder = base_dir / label_dir
        if not folder.is_dir():
            continue
        # Sorting keeps downstream seeded sampling reproducible across machines.
        for img_path in sorted(folder.glob('*')):
            if img_path.suffix.lower() not in allowed_suffixes:
                continue
            # Skip directories that merely carry an image-like extension.
            if not img_path.is_file():
                continue
            rows.append({'path': str(img_path), 'label': label, 'source': base_dir.name})
    return rows

# Index both image folders into a single DataFrame, one row per image file.
rows = [*gather_paths(original_dir), *gather_paths(aug_dir)]

df = pd.DataFrame(rows)
print('Total images found:', len(df))
if df.empty:
    print('No images found. Check DATA_ROOT path.')
else:
    # Integer target used by the classifiers: good -> 0, bad -> 1.
    df['label_enc'] = df['label'].map({'good': 0, 'bad': 1})
    display(df.head())


In [None]:

def preprocess_cv2(path_or_img, size=(224,224), apply_clahe=True):
    """Load (or accept) an image and return a contrast-normalised RGB float image.

    Pipeline: resize -> optional CLAHE on the L channel of LAB -> median blur
    with kernel size 3 -> conversion to RGB float32 scaled by 1/255.

    Args:
        path_or_img: Either a file path (``str`` or ``os.PathLike``) read with
            ``cv2.imread``, or an image array. Arrays with 3 channels are
            interpreted as RGB (never BGR); 2-D arrays are treated as
            grayscale. Floating-point arrays are assumed to be scaled to
            0..1 and are clipped to that range before conversion to uint8.
        size: Target size passed to ``cv2.resize``.
        apply_clahe: When True, equalise the L channel with CLAHE
            (clipLimit=2.0, 8x8 tiles).

    Returns:
        RGB ``float32`` array with values in 0..1.

    Raises:
        FileNotFoundError: If a path is given and OpenCV cannot read it.
    """
    if isinstance(path_or_img, (str, os.PathLike)):
        path = os.fspath(path_or_img)
        img = cv2.imread(path, cv2.IMREAD_COLOR)  # BGR uint8
        if img is None:
            raise FileNotFoundError(path)
    else:
        img = np.asarray(path_or_img)
        if np.issubdtype(img.dtype, np.floating):
            # Clip first: values outside 0..1 would wrap around in the uint8 cast.
            img = (np.clip(img, 0.0, 1.0) * 255).astype('uint8')
        if img.ndim == 2:
            # Grayscale input: replicate into three channels for the LAB step.
            img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
        elif img.shape[-1] == 3:
            img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
    img = cv2.resize(img, size, interpolation=cv2.INTER_AREA)

    # Equalise luminance only, so the colour channels (a, b) stay untouched.
    lab = cv2.cvtColor(img, cv2.COLOR_BGR2LAB)
    l, a, b = cv2.split(lab)
    if apply_clahe:
        clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8,8))
        l = clahe.apply(l)
    lab = cv2.merge((l,a,b))
    img = cv2.cvtColor(lab, cv2.COLOR_LAB2BGR)

    img = cv2.medianBlur(img, 3)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB).astype('float32') / 255.0
    return img

# Visual sanity check: six random images on the top row, and the output of
# preprocess_cv2 for the same images on the bottom row.
if 'df' not in globals() or len(df) < 6:
    print('Not enough images to preview or df missing.')
else:
    sample = df.sample(6, random_state=42).reset_index(drop=True)
    plt.figure(figsize=(12, 6))
    for i, row in sample.iterrows():
        try:
            raw = cv2.imread(row['path'])
            proc = preprocess_cv2(row['path'])

            # Top row: original image (converted from OpenCV's BGR to RGB).
            plt.subplot(2, 6, i + 1)
            plt.imshow(cv2.cvtColor(raw, cv2.COLOR_BGR2RGB))
            plt.title(row['label'])
            plt.axis('off')

            # Bottom row: preprocessed counterpart, directly below.
            plt.subplot(2, 6, i + 7)
            plt.imshow(proc)
            plt.title('preprocessed')
            plt.axis('off')
        except Exception as e:
            print('Preview error:', e)
    plt.tight_layout()


In [None]:
import numpy as np
import cv2

# -------------------
# HOG
# -------------------
def feat_hog_rgb(img_rgb):
    """Return the HOG descriptor of an RGB image as a flat feature vector.

    The image is converted to grayscale, rescaled by 255 and cast to uint8
    before the descriptor is computed with 9 orientations, 16x16-pixel cells
    and 2x2-cell blocks.

    NOTE(review): `rgb2gray` and `hog` are not imported anywhere in the visible
    notebook; presumably they are skimage.color.rgb2gray and
    skimage.feature.hog -- confirm and import them before running.
    """
    gray_u8 = (rgb2gray(img_rgb) * 255).astype('uint8')
    hog_options = dict(
        orientations=9,
        pixels_per_cell=(16, 16),
        cells_per_block=(2, 2),
        visualize=False,
        feature_vector=True,
    )
    return hog(gray_u8, **hog_options)

# -------------------
# LBP
# -------------------
def feat_lbp_rgb(img_rgb, P=8, R=1):
    """Return a normalised histogram of 'uniform' local binary pattern codes.

    Args:
        img_rgb: RGB image; converted to grayscale, scaled by 255 and cast to
            uint8 before the LBP codes are computed.
        P: Passed to `local_binary_pattern` as its second argument.
        R: Passed to `local_binary_pattern` as its third argument.

    Returns:
        Float histogram with P + 2 bins whose entries sum to ~1.

    NOTE(review): `rgb2gray` and `local_binary_pattern` are not imported in the
    visible notebook; presumably they come from skimage -- confirm and import.
    """
    gray_u8 = (rgb2gray(img_rgb) * 255).astype('uint8')
    codes = local_binary_pattern(gray_u8, P, R, method='uniform')
    counts, _edges = np.histogram(codes.ravel(), bins=np.arange(0, P + 3), range=(0, P + 2))
    counts = counts.astype("float")
    # The small epsilon guards against division by zero on an empty histogram.
    return counts / (counts.sum() + 1e-6)

# -------------------
# GLCM simplifié avec NumPy (sans skimage)
# -------------------
def feat_glcm_rgb(img_rgb, distances=(1,), angles=(0,)):
    """Return contrast, energy and homogeneity of a simplified GLCM.

    The grayscale image is quantised to 8 levels and a single co-occurrence
    matrix is built for horizontally adjacent pixels (each pixel paired with
    its right-hand neighbour).

    Args:
        img_rgb: RGB image; converted to grayscale, scaled by 255 and cast to
            uint8 before quantisation.
        distances: Accepted for signature compatibility but currently
            ignored; the offset is always one pixel.
        angles: Accepted for signature compatibility but currently ignored;
            the direction is always horizontal.

    Returns:
        ``np.ndarray`` of shape (3,): [contrast, energy, homogeneity].

    NOTE(review): `rgb2gray` is not imported in the visible notebook;
    presumably skimage.color.rgb2gray -- confirm and import.
    """
    gray = (rgb2gray(img_rgb) * 255).astype('uint8')
    levels = 8
    gray_q = (gray // (256 // levels)).astype('uint8')

    # Count (left, right) level pairs in one vectorised pass: each pair is
    # encoded as the flat index left * levels + right, then tallied.
    left = gray_q[:, :-1].ravel().astype(np.intp)
    right = gray_q[:, 1:].ravel().astype(np.intp)
    pair_counts = np.bincount(left * levels + right, minlength=levels * levels)
    glcm = pair_counts.reshape(levels, levels).astype(float)

    glcm /= glcm.sum() + 1e-6  # normalise to (approximately) a probability table

    # Signed level difference between the row level and the column level.
    idx = np.arange(levels)
    diff = idx[:, None] - idx[None, :]

    contrast = np.sum(diff ** 2 * glcm)
    energy = np.sum(glcm ** 2)
    homogeneity = np.sum(glcm / (1. + np.abs(diff)))

    return np.array([contrast, energy, homogeneity])

# -------------------
# Moments de Hu
# -------------------
def feat_hu_rgb(img_rgb):
    """Return the Hu moments of the grayscale image on a signed log10 scale.

    Each moment h is mapped to -sign(h) * log10(|h| + 1e-6).

    NOTE(review): `rgb2gray` is not imported in the visible notebook;
    presumably skimage.color.rgb2gray -- confirm and import.
    """
    gray_u8 = (rgb2gray(img_rgb) * 255).astype('uint8')
    hu = cv2.HuMoments(cv2.moments(gray_u8)).ravel()
    log_magnitude = np.log10(np.abs(hu) + 1e-6)
    return -np.sign(hu) * log_magnitude

# -------------------
# Histogramme HSV
# -------------------
def feat_hsv_hist(img_rgb, bins=(16,16,8)):
    """Return a flattened, normalised 3-D HSV colour histogram.

    Args:
        img_rgb: RGB image; multiplied by 255 and cast to uint8 before the
            conversion to HSV.
        bins: Number of histogram bins for the H, S and V channels.

    Returns:
        1-D array with bins[0] * bins[1] * bins[2] entries, normalised with
        ``cv2.normalize`` using its default settings.
    """
    rgb_u8 = (img_rgb * 255).astype('uint8')
    hsv = cv2.cvtColor(rgb_u8, cv2.COLOR_RGB2HSV)
    channels = [0, 1, 2]
    # Per-channel value ranges, given as H, S, V in that order.
    value_ranges = [0, 180, 0, 256, 0, 256]
    hist = cv2.calcHist([hsv], channels, None, bins, value_ranges)
    normalised = cv2.normalize(hist, hist)
    return normalised.flatten()

# -------------------
# Extraction complète
# -------------------
def extract_classical(img_rgb):
    """Concatenate all hand-crafted descriptors of img_rgb into one 1-D vector.

    The order is fixed: HOG, LBP histogram, GLCM statistics, Hu moments and
    HSV histogram.
    """
    extractors = (
        feat_hog_rgb,
        feat_lbp_rgb,
        feat_glcm_rgb,
        feat_hu_rgb,
        feat_hsv_hist,
    )
    return np.concatenate([extract(img_rgb).ravel() for extract in extractors])


In [None]:

FEATURES_CACHE = Path("C:/Users/cheic/Documents/features/egg_classical_features.npz")

# Create the cache folder up front so saving never fails on a missing directory.
FEATURES_CACHE.parent.mkdir(parents=True, exist_ok=True)

def build_classical_features(df, limit=None, force=False):
    """Compute (or load from cache) the classical feature matrix.

    Args:
        df: DataFrame with 'path' and 'label_enc' columns. Not read at all
            when the cache is used.
        limit: If given, only a random sample of this many rows
            (random_state=42) is processed.
        force: Recompute and overwrite the cache even if it exists.

    Returns:
        Tuple ``(X, y, paths)`` of NumPy arrays: the feature matrix, the
        encoded labels and the image paths, row-aligned. Images that fail to
        load or process are reported and skipped.

    Raises:
        ValueError: If no image could be processed.

    Note:
        The cache is keyed on the file name only: when it exists and `force`
        is False it is returned as-is, whatever `df` and `limit` are.
    """
    if FEATURES_CACHE.exists() and not force:
        print('Loading cached classical features...')
        # NOTE(security): allow_pickle=True can execute code embedded in the
        # file; only load caches written by this notebook.
        # The context manager closes the archive so it can be overwritten
        # later (an open handle blocks that on Windows).
        with np.load(str(FEATURES_CACHE), allow_pickle=True) as data:
            return data['X'], data['y'], data['paths']

    rows = df if limit is None else df.sample(limit, random_state=42)
    X, y, paths = [], [], []
    for _, r in tqdm(rows.iterrows(), total=len(rows)):
        try:
            img = preprocess_cv2(r['path'])
            feats = extract_classical(img)
            X.append(feats)
            y.append(r['label_enc'])
            paths.append(r['path'])
        except Exception as e:
            # Best effort: report the failing image and keep going.
            print('Error on', r['path'], e)

    if not X:
        raise ValueError('No classical features could be extracted (empty input or every image failed).')

    X = np.vstack(X)
    y = np.array(y)
    # Return an array, as the cached branch does, so callers always get the same type.
    paths = np.array(paths)
    np.savez_compressed(str(FEATURES_CACHE), X=X, y=y, paths=paths)
    return X, y, paths

print('Call build_classical_features(df) to compute classical features (cached).')


In [None]:

def get_backbone(name='efficientnet_b0', pretrained=True):
    """Create a timm model used as a feature extractor.

    The model is built with ``num_classes=0`` and ``global_pool='avg'``, moved
    to the notebook-level `device` and switched to eval mode.

    Args:
        name: timm model name.
        pretrained: Forwarded to ``timm.create_model``.

    Returns:
        The model, in eval mode, on `device`.
    """
    backbone = timm.create_model(
        name,
        pretrained=pretrained,
        num_classes=0,
        global_pool='avg',
    )
    backbone.eval()
    backbone.to(device)
    return backbone

# Input pipeline for the backbone: PIL image -> tensor -> 224x224 -> per-channel
# normalisation. The mean/std values match the commonly used ImageNet statistics.
_NORM_MEAN = [0.485,0.456,0.406]
_NORM_STD = [0.229,0.224,0.225]
transform = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Resize((224,224)),
        transforms.Normalize(mean=_NORM_MEAN, std=_NORM_STD),
    ]
)

def extract_deep_embedding(model, img_rgb):
    """Run one RGB image through `model` and return its embedding as a 1-D array.

    Args:
        model: Callable network, expected to already be on `device`.
        img_rgb: RGB image; multiplied by 255 and cast to uint8 before being
            wrapped in a PIL image (assumes values in 0..1 -- TODO confirm
            against callers).

    Returns:
        Flattened NumPy array holding the model output for this image.
    """
    from PIL import Image

    pil_img = Image.fromarray((img_rgb * 255).astype('uint8'))
    # Add the batch dimension the model expects, then move to the target device.
    batch = transform(pil_img).unsqueeze(0).to(device)
    with torch.no_grad():
        embedding = model(batch).cpu().numpy()
    return embedding.ravel()

DEEP_CACHE = Path("C:/Users/cheic/Documents/features/egg_deep_features.npz")

def build_deep_features(df, model_name='efficientnet_b0', limit=None, force=False):
    """Compute (or load from cache) deep embeddings for every image in df.

    Args:
        df: DataFrame with 'path' and 'label_enc' columns. Not read at all
            when the cache is used.
        model_name: Backbone name forwarded to ``get_backbone``.
        limit: If given, only a random sample of this many rows
            (random_state=42) is processed.
        force: Recompute and overwrite the cache even if it exists.

    Returns:
        Tuple ``(X, y, paths)`` of NumPy arrays: the embedding matrix, the
        encoded labels and the image paths, row-aligned. Images that fail to
        load or process are reported and skipped.

    Raises:
        ValueError: If no image could be processed.

    Note:
        The cache does not record `model_name`, `df` or `limit`; pass
        ``force=True`` after changing any of them, otherwise stale embeddings
        are returned.
    """
    if DEEP_CACHE.exists() and not force:
        print('Loading cached deep features...')
        # NOTE(security): allow_pickle=True can execute code embedded in the
        # file; only load caches written by this notebook.
        # The context manager closes the archive so it can be overwritten
        # later (an open handle blocks that on Windows).
        with np.load(str(DEEP_CACHE), allow_pickle=True) as data:
            return data['X'], data['y'], data['paths']

    model = get_backbone(model_name)
    rows = df if limit is None else df.sample(limit, random_state=42)
    X, y, paths = [], [], []
    for _, r in tqdm(rows.iterrows(), total=len(rows)):
        try:
            img = preprocess_cv2(r['path'])
            emb = extract_deep_embedding(model, img)
            X.append(emb)
            y.append(r['label_enc'])
            paths.append(r['path'])
        except Exception as e:
            # Best effort: report the failing image and keep going.
            print('Error on', r['path'], e)

    if not X:
        raise ValueError('No deep features could be extracted (empty input or every image failed).')

    X = np.vstack(X)
    y = np.array(y)
    # Return an array, as the cached branch does, so callers always get the same type.
    paths = np.array(paths)
    # Do not rely on another cell having created the cache folder.
    DEEP_CACHE.parent.mkdir(parents=True, exist_ok=True)
    np.savez_compressed(str(DEEP_CACHE), X=X, y=y, paths=paths)
    return X, y, paths

print('Deep feature builder ready. Use build_deep_features(df) to compute embeddings (cached).')


In [None]:

def build_hybrid(X_deep, X_classic):
    """Join deep and classical features column-wise (deep columns first).

    Both inputs must be 2-D with the same number of rows; row i of each is
    assumed to describe the same image.
    """
    feature_blocks = (X_deep, X_classic)
    return np.concatenate(feature_blocks, axis=1)

def reduce_pca(X, n_components=128):
    """Project X onto its leading principal components.

    Args:
        X: 2-D feature matrix (n_samples, n_features).
        n_components: Passed to ``sklearn.decomposition.PCA``. An integer
            larger than ``min(n_samples, n_features)`` is capped at that
            value instead of letting PCA raise; other value kinds (float,
            'mle', None) are passed through unchanged.

    Returns:
        Tuple ``(Xr, pca)``: the transformed data and the fitted PCA object.
    """
    is_integer = isinstance(n_components, (int, np.integer)) and not isinstance(n_components, bool)
    if is_integer:
        max_components = min(np.shape(X)[:2])
        if n_components > max_components:
            print('reduce_pca: n_components capped from', n_components, 'to', max_components)
            n_components = max_components
    pca = PCA(n_components=n_components, random_state=42)
    Xr = pca.fit_transform(X)
    return Xr, pca

def build_autoencoder_encoder(input_dim, bottleneck=128):
    """Build a dense autoencoder and the encoder sub-model sharing its layers.

    Layout: input -> 1024 -> 512 -> bottleneck -> 512 -> 1024 -> input_dim.
    Hidden layers use ReLU and the output layer is linear. The autoencoder is
    compiled with Adam and an MSE loss.

    Args:
        input_dim: Number of input (and reconstructed) features.
        bottleneck: Width of the layer named 'bottleneck'.

    Returns:
        Tuple ``(autoencoder, encoder)``, or ``(None, None)`` when
        TensorFlow/Keras cannot be imported.
    """
    try:
        from tensorflow import keras
        from tensorflow.keras import layers
    except Exception as e:
        print('TensorFlow/Keras not available:', e)
        return None, None

    inp = keras.Input(shape=(input_dim,))

    # Encoder half.
    hidden = inp
    for width in (1024, 512):
        hidden = layers.Dense(width, activation='relu')(hidden)
    bott = layers.Dense(bottleneck, activation='relu', name='bottleneck')(hidden)

    # Decoder half mirrors the encoder.
    hidden = bott
    for width in (512, 1024):
        hidden = layers.Dense(width, activation='relu')(hidden)
    out = layers.Dense(input_dim, activation='linear')(hidden)

    ae = keras.Model(inp, out)
    encoder = keras.Model(inp, bott)
    ae.compile(optimizer='adam', loss='mse')
    return ae, encoder

print('Hybrid + reduction helpers ready.')


In [None]:

def _make_classifier(model_name, random_state):
    """Return the unfitted classifier selected by model_name, seeded with random_state."""
    if model_name == 'svc_rbf':
        return SVC(kernel='rbf', probability=True, class_weight='balanced', random_state=random_state)
    if model_name == 'rf':
        return RandomForestClassifier(n_estimators=200, n_jobs=-1, random_state=random_state)
    if model_name == 'xgb':
        if not XGBOOST_AVAILABLE:
            raise RuntimeError('XGBoost not available in this environment')
        return xgb.XGBClassifier(n_estimators=200, use_label_encoder=False, eval_metric='logloss',
                                 random_state=random_state)
    if model_name == 'mlp':
        from sklearn.neural_network import MLPClassifier
        return MLPClassifier(hidden_layer_sizes=(512,128), max_iter=200, random_state=random_state)
    # Any other name falls back to logistic regression.
    return LogisticRegression(max_iter=200, class_weight='balanced', random_state=random_state)


def train_and_eval(X, y, model_name='svc_rbf', test_size=0.2, random_state=42):
    """Split, standardise, train one classifier and evaluate it on the held-out part.

    Args:
        X: Feature matrix.
        y: Binary labels encoded as 0 (good) / 1 (bad).
        model_name: 'svc_rbf', 'rf', 'xgb' or 'mlp'; any other value selects
            logistic regression.
        test_size: Fraction of samples held out for testing (stratified).
        random_state: Seed used for the split and for every classifier, so
            repeated calls give the same result.

    Returns:
        Dict with keys 'model', 'scaler', 'report', 'cm', 'metrics',
        'X_test' (unscaled), 'y_test' and 'y_pred'. The model was trained on
        standardised features, so apply ``scaler.transform`` before calling
        it on new data.

    Raises:
        RuntimeError: If model_name is 'xgb' and xgboost is not installed.
    """
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, stratify=y, random_state=random_state)

    # Fit the scaler on the training part only to avoid test-set leakage.
    scaler = StandardScaler()
    X_train_s = scaler.fit_transform(X_train)
    X_test_s = scaler.transform(X_test)

    model = _make_classifier(model_name, random_state)
    model.fit(X_train_s, y_train)
    y_pred = model.predict(X_test_s)

    # Explicit labels keep the report and the matrix two-class even when a
    # class is missing from the test split or from the predictions.
    class_ids = [0, 1]
    report = classification_report(y_test, y_pred, labels=class_ids, output_dict=True,
                                   target_names=['good','bad'])
    cm = confusion_matrix(y_test, y_pred, labels=class_ids)
    metrics = {'accuracy': accuracy_score(y_test,y_pred), 'f1_macro': f1_score(y_test,y_pred, average='macro'),
               'recall_good': recall_score(y_test,y_pred, pos_label=0), 'recall_bad': recall_score(y_test,y_pred, pos_label=1)}
    return {'model': model, 'scaler': scaler, 'report': report, 'cm': cm, 'metrics': metrics, 'X_test': X_test, 'y_test': y_test, 'y_pred': y_pred}

def show_confusion(cm, labels=('good','bad')):
    """Plot a confusion matrix as an annotated heat map.

    Args:
        cm: Square array of integer counts, true labels on rows and
            predicted labels on columns. Cells are formatted with 'd', so
            the values must be integers.
        labels: Tick labels, in the same order as the rows/columns of cm.
    """
    plt.figure(figsize=(4,4))
    plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
    plt.title('Confusion matrix')
    plt.colorbar()
    tick_marks = np.arange(len(labels))
    plt.xticks(tick_marks, labels, rotation=45)
    plt.yticks(tick_marks, labels)
    # Use white text on cells above half the maximum count, black on the rest.
    thresh = cm.max() / 2.
    for i, j in np.ndindex(cm.shape):
        plt.text(j, i, format(cm[i, j], 'd'), horizontalalignment="center", color="white" if cm[i, j] > thresh else "black")
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.show()

print('Training & evaluation helpers ready.')


In [None]:

# Recommended end-to-end run: classical + deep features -> hybrid -> PCA -> SVC.
X_classic, y_classic, paths_classic = build_classical_features(df, limit=None, force=False)
X_deep, y_deep, paths = build_deep_features(df, model_name='efficientnet_b0', limit=None, force=False)

# Both builders skip images that fail, and both may return older caches, so
# check that row i describes the same image in each matrix before joining.
if list(paths_classic) != list(paths) or not np.array_equal(y_classic, y_deep):
    raise ValueError('Classical and deep features are not aligned; rebuild both with force=True.')

X_hybrid = build_hybrid(X_deep, X_classic)
# NOTE(review): PCA is fitted on all samples before the train/test split, so
# the reported scores are optimistic. Augmented copies of one original image
# may also land on both sides of the split -- confirm against the dataset.
Xr, pca = reduce_pca(X_hybrid, n_components=128)
res = train_and_eval(Xr, y_classic, model_name='svc_rbf')
print(res['metrics'])
show_confusion(res['cm'])
print('Cells ready - follow instructions in this notebook to run experiments.')


In [None]:
# train_and_eval keeps its split internal, so rebuild it here with the same
# defaults (test_size=0.2, random_state=42, stratified) used for `res`.
model, scaler = res['model'], res['scaler']
X_train, X_test, y_train, y_test = train_test_split(
    Xr, y_classic, test_size=0.2, stratify=y_classic, random_state=42)
# The model was trained on standardised features, so scale before scoring.
X_train, X_test = scaler.transform(X_train), scaler.transform(X_test)
print("Train score:", model.score(X_train, y_train))
print("Test score:", model.score(X_test, y_test))


In [None]:

def save_pipeline(model, scaler, out_path='/mnt/data/experiment_pipeline'):
    """Persist the model and its scaler together as 'pipeline.pkl' in out_path.

    Args:
        model: Trained estimator to store under the key 'model'.
        scaler: Fitted scaler to store under the key 'scaler'.
        out_path: Destination directory; created if it does not exist.
    """
    os.makedirs(out_path, exist_ok=True)
    target_file = os.path.join(out_path, 'pipeline.pkl')
    bundle = dict(model=model, scaler=scaler)
    joblib.dump(bundle, target_file)
    print('Saved pipeline to', out_path)
