In [10]:
import os, json, random
from pathlib import Path
from collections import Counter, defaultdict

import numpy as np
import pandas as pd

# Images & features
import cv2
from skimage.feature import hog

# Plotting
import matplotlib.pyplot as plt

# ML
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.model_selection import train_test_split, GridSearchCV, StratifiedKFold, cross_val_score
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (confusion_matrix, classification_report, roc_auc_score, 
                             precision_recall_fscore_support, roc_curve, auc)

# Optional DL
try:
    import torch
    from torch.utils.data import Dataset, DataLoader
    from torchvision import transforms, models
    import torch.nn as nn
    import torch.optim as optim
    TORCH_OK = True
except Exception as e:
    TORCH_OK = False
    print("PyTorch not available; DL section will be skipped. Reason:", e)

In [18]:
# ---- Project Paths (edit these) ----
ROOT = os.path.join('..', 'dataset', 'malaria')
TRAIN_JSON = os.path.join(ROOT, 'training.json')
TEST_JSON = os.path.join(ROOT, 'test.json')
IMAGES_DIR = os.path.join(ROOT, 'images')

# ---- Random Seeds ----
SEED = 42
random.seed(SEED); np.random.seed(SEED)

# ---- Feature Params ----
HOG_PARAMS = dict(
    pixels_per_cell=(8,8),
    cells_per_block=(2,2),
    orientations=9,
    block_norm='L2-Hys'
)
HSV_BINS = (8, 8, 8)  # H, S, V

print("Config loaded.")
print("ROOT:", ROOT)

Config loaded.
ROOT: ..\dataset\malaria


In [19]:
print(train_items[0])

{'image': {'checksum': '676bb8e86fc2dbf05dd97d51a64ac0af', 'pathname': '/images/8d02117d-6c71-4e47-b50a-6cc8d5eb1d55.png', 'shape': {'r': 1200, 'c': 1600, 'channels': 3}}, 'objects': [{'bounding_box': {'minimum': {'r': 1057, 'c': 1440}, 'maximum': {'r': 1158, 'c': 1540}}, 'category': 'red blood cell'}, {'bounding_box': {'minimum': {'r': 868, 'c': 1303}, 'maximum': {'r': 971, 'c': 1403}}, 'category': 'red blood cell'}, {'bounding_box': {'minimum': {'r': 578, 'c': 900}, 'maximum': {'r': 689, 'c': 1008}}, 'category': 'red blood cell'}, {'bounding_box': {'minimum': {'r': 304, 'c': 611}, 'maximum': {'r': 408, 'c': 713}}, 'category': 'red blood cell'}, {'bounding_box': {'minimum': {'r': 198, 'c': 881}, 'maximum': {'r': 312, 'c': 1003}}, 'category': 'red blood cell'}, {'bounding_box': {'minimum': {'r': 193, 'c': 1480}, 'maximum': {'r': 293, 'c': 1574}}, 'category': 'red blood cell'}, {'bounding_box': {'minimum': {'r': 257, 'c': 1384}, 'maximum': {'r': 364, 'c': 1502}}, 'category': 'red blood 

# 3. Utility functions: JSON, validation, previews

In [14]:

def load_json_safe(path):
    # handle both string and Path
    if isinstance(path, str):
        if not os.path.exists(path):
            print(f"[WARN] JSON not found: {path}")
            return None
        with open(path, "r") as f:
            return json.load(f)
    else:  # assume Path
        if not path.exists():
            print(f"[WARN] JSON not found: {path}")
            return None
        with open(path, "r") as f:
            return json.load(f)


def check_images_exist(items, images_dir: Path, img_key='image'):
    missing = []
    for it in items:
        img_name = it.get(img_key)
        if img_name is None:
            missing.append(None)  # Or some other placeholder
            continue

        # Check if img_name is a string; if not, skip or handle the error
        if not isinstance(img_name, str):
            print(f"[WARN] Unexpected image name type: {type(img_name)}, skipping")
            missing.append(None)  # Or some other placeholder
            continue

        if not (images_dir / img_name).exists():
            missing.append(img_name)
    return missing

def summarize_labels(items, label_key='label'):
    cnt = Counter([it.get(label_key, 'UNK') for it in items])
    return pd.DataFrame({'label': list(cnt.keys()), 'count': list(cnt.values())}).sort_values('count', ascending=False)

def show_class_distribution(df_counts):
    if df_counts is None or len(df_counts)==0:
        print("[INFO] No counts to display.")
        return
    plt.figure()
    plt.bar(df_counts['label'], df_counts['count'])
    plt.title("Class Distribution")
    plt.xlabel("Label"); plt.ylabel("Count")
    plt.xticks(rotation=30, ha='right')
    plt.show()

# 4. Load data & validate

In [15]:
train_items = load_json_safe(TRAIN_JSON) or []
test_items  = load_json_safe(TEST_JSON)  or []

print(f"Train items: {len(train_items)}  |  Test items: {len(test_items)}")

# Basic sanity checks
miss_train = check_images_exist(train_items, IMAGES_DIR, img_key='image')
miss_test  = check_images_exist(test_items, IMAGES_DIR, img_key='image')

if miss_train:
    print(f"[WARN] Missing {len(miss_train)} training images (first 5):", miss_train[:5])
if miss_test:
    print(f"[WARN] Missing {len(miss_test)} test images (first 5):", miss_test[:5])

# Optional EDA
train_counts = summarize_labels(train_items, 'label') if train_items else None
test_counts  = summarize_labels(test_items, 'label') if test_items else None

display(train_counts.head() if train_counts is not None else "No train data")
display(test_counts.head() if test_counts is not None else "No test data")

# Uncomment to visualize
# show_class_distribution(train_counts)
# show_class_distribution(test_counts)

Train items: 1208  |  Test items: 120
[WARN] Unexpected image name type: <class 'dict'>, skipping
[WARN] Unexpected image name type: <class 'dict'>, skipping
[WARN] Unexpected image name type: <class 'dict'>, skipping
[WARN] Unexpected image name type: <class 'dict'>, skipping
[WARN] Unexpected image name type: <class 'dict'>, skipping
[WARN] Unexpected image name type: <class 'dict'>, skipping
[WARN] Unexpected image name type: <class 'dict'>, skipping
[WARN] Unexpected image name type: <class 'dict'>, skipping
[WARN] Unexpected image name type: <class 'dict'>, skipping
[WARN] Unexpected image name type: <class 'dict'>, skipping
[WARN] Unexpected image name type: <class 'dict'>, skipping
[WARN] Unexpected image name type: <class 'dict'>, skipping
[WARN] Unexpected image name type: <class 'dict'>, skipping
[WARN] Unexpected image name type: <class 'dict'>, skipping
[WARN] Unexpected image name type: <class 'dict'>, skipping
[WARN] Unexpected image name type: <class 'dict'>, skipping
[W

Unnamed: 0,label,count
0,UNK,1208


Unnamed: 0,label,count
0,UNK,120


# 5. Feature extractors

In [17]:
def safe_imread(path: Path):
    img = cv2.imread(str(path))
    if img is None:
        raise FileNotFoundError(f"Failed to read image: {path}")
    return img


def hsv_hist(img_bgr, bins=HSV_BINS):
    hsv = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2HSV)
    hist = cv2.calcHist([hsv], [0,1,2], None, bins, [0,180, 0,256, 0,256])
    hist = cv2.normalize(hist, hist).flatten()
    return hist

def hog_features(img_bgr, hog_params=HOG_PARAMS):
    gray = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)
    feats = hog(gray, **hog_params)
    return feats

def extract_features_for_item(item, images_dir: Path):
    img_name = item.get('image')
    label = item.get('label')
    path = images_dir / img_name
    img = safe_imread(path)
    return {
        'image': img_name,
        'label': label,
        'hsv_hist': hsv_hist(img),
        'hog': hog_features(img),
    }

def build_feature_table(items, images_dir: Path):
    rows = []
    failed = 0
    for it in items:
        try:
            feats = extract_features_for_item(it, images_dir)
            rows.append(feats)
        except Exception as e:
            failed += 1
            if failed < 5:
                print("[WARN] Skipping item due to error:", e)
    if failed:
        print(f"[INFO] Skipped {failed} items due to errors.")
    return rows

if train_items:
    train_rows = build_feature_table(train_items, IMAGES_DIR)
    print(f"Extracted features for {len(train_rows)} train images.")
else:
    train_rows = []
if test_items:
    test_rows  = build_feature_table(test_items, IMAGES_DIR)
    print(f"Extracted features for {len(test_rows)} test images.")
else:
    test_rows = []

# Convert to flat X, y with concatenated features (HSV+HOG)
def rows_to_Xy(rows):
    if not rows:
        return np.zeros((0, 1)), np.array([])
    X = []
    y = []
    for r in rows:
        feat = np.concatenate([r['hsv_hist'], r['hog']])
        X.append(feat)
        y.append(r['label'])
    return np.array(X), np.array(y)

X_train_raw, y_train_raw = rows_to_Xy(train_rows)
X_test_raw,  y_test_raw  = rows_to_Xy(test_rows)

print("X_train_raw shape:", X_train_raw.shape, "| y_train_raw:", y_train_raw.shape)
print("X_test_raw  shape:", X_test_raw.shape,  "| y_test_raw:",  y_test_raw.shape)

[WARN] Skipping item due to error: unsupported operand type(s) for /: 'str' and 'dict'
[WARN] Skipping item due to error: unsupported operand type(s) for /: 'str' and 'dict'
[WARN] Skipping item due to error: unsupported operand type(s) for /: 'str' and 'dict'
[WARN] Skipping item due to error: unsupported operand type(s) for /: 'str' and 'dict'
[INFO] Skipped 1208 items due to errors.
Extracted features for 0 train images.
[WARN] Skipping item due to error: unsupported operand type(s) for /: 'str' and 'dict'
[WARN] Skipping item due to error: unsupported operand type(s) for /: 'str' and 'dict'
[WARN] Skipping item due to error: unsupported operand type(s) for /: 'str' and 'dict'
[WARN] Skipping item due to error: unsupported operand type(s) for /: 'str' and 'dict'
[INFO] Skipped 120 items due to errors.
Extracted features for 0 test images.
X_train_raw shape: (0, 1) | y_train_raw: (0,)
X_test_raw  shape: (0, 1) | y_test_raw: (0,)


# 6. Traditional ML Pipelines

In [None]:

# Encode labels
lbl_enc = LabelEncoder()
if len(y_train_raw) > 0:
    y_train = lbl_enc.fit_transform(y_train_raw)
else:
    y_train = np.array([])
if len(y_test_raw) > 0:
    y_test  = lbl_enc.transform(y_test_raw) if set(y_test_raw).issubset(set(lbl_enc.classes_)) else np.array([])
else:
    y_test = np.array([])

def train_svm(X, y):
    pipe = Pipeline([('scaler', StandardScaler()), ('clf', SVC(probability=True))])
    param_grid = {
        'clf__C': [0.1, 1, 10],
        'clf__kernel': ['rbf', 'linear'],
        'clf__gamma': ['scale', 'auto']
    }
    cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=SEED) if len(np.unique(y))>1 else 3
    grid = GridSearchCV(pipe, param_grid, cv=cv, scoring='f1_macro', n_jobs=-1)
    grid.fit(X, y)
    return grid

def train_rf(X, y):
    pipe = Pipeline([('clf', RandomForestClassifier(random_state=SEED))])
    param_grid = {
        'clf__n_estimators': [100, 300],
        'clf__max_depth': [None, 10, 20],
        'clf__min_samples_split': [2, 5]
    }
    cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=SEED) if len(np.unique(y))>1 else 3
    grid = GridSearchCV(pipe, param_grid, cv=cv, scoring='f1_macro', n_jobs=-1)
    grid.fit(X, y)
    return grid

svm_model = None
rf_model = None
if X_train_raw.shape[0] > 10 and len(np.unique(y_train)) > 1:
    svm_model = train_svm(X_train_raw, y_train)
    rf_model  = train_rf(X_train_raw, y_train)
    print("SVM best params:", svm_model.best_params_)
    print("RF  best params:", rf_model.best_params_)
else:
    print("[INFO] Not enough data to run model training (need >10 samples and >1 class).")

# 7. Evaluation

In [None]:

def evaluate_model(name, model, X, y, label_encoder):
    if model is None or len(y)==0 or X.shape[0]==0:
        print(f"[SKIP] {name}: no data/model.")
        return None
    y_pred = model.predict(X)
    y_proba = None
    try:
        y_proba = model.predict_proba(X)
    except Exception:
        pass
    
    print(f"\\n==== {name} ====")
    print(classification_report(y, y_pred, target_names=label_encoder.classes_))
    cm = confusion_matrix(y, y_pred)
    print("Confusion Matrix:\\n", cm)
    
    # ROC-AUC (macro) if probabilities available
    if y_proba is not None and y_proba.ndim==2 and y_proba.shape[1]>1:
        try:
            auc_macro = roc_auc_score(y, y_proba, multi_class='ovo', average='macro')
            print("ROC-AUC (macro, OVO):", auc_macro)
        except Exception as e:
            print("[INFO] ROC-AUC skipped:", e)

_ = evaluate_model("SVM (val on train set shown for demo)", svm_model, X_train_raw, y_train, lbl_enc)
_ = evaluate_model("RF  (val on train set shown for demo)",  rf_model,  X_train_raw, y_train, lbl_enc)

# If test split exists
_ = evaluate_model("SVM (TEST)", svm_model, X_test_raw, y_test, lbl_enc) if len(y_test)>0 else None
_ = evaluate_model("RF  (TEST)", rf_model,  X_test_raw, y_test, lbl_enc) if len(y_test)>0 else None

# 8. Cross-Validation quick check

In [None]:

def cv_score(model, X, y, cv_splits=5):
    if X.shape[0] == 0 or len(np.unique(y)) < 2:
        print("[SKIP] Not enough data/classes for CV.")
        return None
    cv = StratifiedKFold(n_splits=cv_splits, shuffle=True, random_state=SEED)
    scores = cross_val_score(model, X, y, cv=cv, scoring='f1_macro', n_jobs=-1)
    print(f"CV f1_macro: mean={scores.mean():.4f} ± {scores.std():.4f}")
    return scores

# Example (use base configs to avoid nesting GridSearch inside CV for demo)
svm_base = Pipeline([('scaler', StandardScaler()), ('clf', SVC(kernel='rbf', C=1.0, gamma='scale', probability=True))])
rf_base  = Pipeline([('clf', RandomForestClassifier(n_estimators=200, random_state=SEED))])

_ = cv_score(svm_base, X_train_raw, y_train, cv_splits=5)
_ = cv_score(rf_base,  X_train_raw, y_train, cv_splits=5)

# 9. Optional DL section

In [None]:


if TORCH_OK and len(train_items) > 0:
    class JsonImageDataset(Dataset):
        def __init__(self, items, images_dir, transform=None):
            self.items = items
            self.images_dir = Path(images_dir)
            self.transform = transform
        def __len__(self):
            return len(self.items)
        def __getitem__(self, idx):
            it = self.items[idx]
            img_path = self.images_dir / it['image']
            img = cv2.imread(str(img_path))
            if img is None:
                raise FileNotFoundError(str(img_path))
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            if self.transform:
                img = self.transform(image=img) if callable(self.transform) and 'image' in self.transform.__code__.co_varnames else self.transform(img)
            else:
                # default to tensor conversion [0,1]
                img = torch.tensor(img, dtype=torch.float32).permute(2,0,1)/255.0
            label = it['label']
            return img, label

    # Label map
    classes = sorted({it['label'] for it in train_items})
    class_to_idx = {c:i for i,c in enumerate(classes)}
    def encode_labels_inplace(items):
        for it in items:
            it['label'] = class_to_idx[it['label']]
    encode_labels_inplace(train_items)
    encode_labels_inplace(test_items)

    # Simple transforms
    def to_tensor(img):
        return torch.tensor(img, dtype=torch.float32).permute(2,0,1)/255.0

    train_ds = JsonImageDataset(train_items, IMAGES_DIR, transform=to_tensor)
    test_ds  = JsonImageDataset(test_items,  IMAGES_DIR, transform=to_tensor) if len(test_items)>0 else None

    train_loader = DataLoader(train_ds, batch_size=16, shuffle=True)
    test_loader  = DataLoader(test_ds, batch_size=16, shuffle=False) if test_ds else None

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Tiny CNN baseline
    class TinyCNN(nn.Module):
        def __init__(self, num_classes):
            super().__init__()
            self.net = nn.Sequential(
                nn.Conv2d(3, 16, 3, padding=1), nn.ReLU(), nn.MaxPool2d(2),
                nn.Conv2d(16, 32, 3, padding=1), nn.ReLU(), nn.MaxPool2d(2),
                nn.AdaptiveAvgPool2d((1,1))
            )
            self.fc = nn.Linear(32, num_classes)
        def forward(self, x):
            x = self.net(x)
            x = x.view(x.size(0), -1)
            return self.fc(x)

    model = TinyCNN(num_classes=len(classes)).to(device)
    crit = nn.CrossEntropyLoss()
    opt  = optim.Adam(model.parameters(), lr=1e-3)

    def train_epoch(loader):
        model.train()
        total=0; correct=0; loss_sum=0.0
        for x,y in loader:
            x = x.to(device); y = torch.tensor(y, dtype=torch.long, device=device)
            opt.zero_grad()
            out = model(x)
            loss = crit(out, y)
            loss.backward(); opt.step()
            loss_sum += loss.item()*x.size(0)
            pred = out.argmax(1)
            correct += (pred==y).sum().item()
            total += x.size(0)
        return loss_sum/total, correct/total

    def eval_epoch(loader):
        model.eval()
        total=0; correct=0; loss_sum=0.0
        with torch.no_grad():
            for x,y in loader:
                x = x.to(device); y = torch.tensor(y, dtype=torch.long, device=device)
                out = model(x)
                loss = crit(out, y)
                loss_sum += loss.item()*x.size(0)
                pred = out.argmax(1)
                correct += (pred==y).sum().item()
                total += x.size(0)
        return loss_sum/total, correct/total

    EPOCHS = 3
    for ep in range(1, EPOCHS+1):
        tr_loss, tr_acc = train_epoch(train_loader)
        print(f"[DL] Epoch {ep}: loss={tr_loss:.4f} acc={tr_acc:.4f}")
        if test_loader:
            te_loss, te_acc = eval_epoch(test_loader)
            print(f"      Test: loss={te_loss:.4f} acc={te_acc:.4f}")
else:
    print("[INFO] Skipping DL section (PyTorch not available or no data).")

# 10. Results summary

In [None]:

def summarize_models(models, X_train, y_train, label_encoder):
    rows = []
    for name, mdl in models.items():
        if mdl is None or X_train.shape[0]==0:
            continue
        y_pred = mdl.predict(X_train)
        p, r, f1, _ = precision_recall_fscore_support(y_train, y_pred, average='macro', zero_division=0)
        rows.append({'model': name, 'precision_macro': p, 'recall_macro': r, 'f1_macro': f1})
    if rows:
        df = pd.DataFrame(rows).sort_values('f1_macro', ascending=False)
        display(df)
    else:
        print("[INFO] No models to summarize.")
        
summarize_models({
    'SVM (best)': svm_model,
    'RF  (best)': rf_model
}, X_train_raw, y_train, lbl_enc)
