In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import pandas as pd
from pathlib import Path
from sklearn.model_selection import train_test_split
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')

# Reproducibility
SEED = 42
torch.manual_seed(SEED)
np.random.seed(SEED)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

Using device: cuda


In [5]:
# Config & Paths

IMG_DIR = Path("C:\Recovered\BOAZ\study\projects\samsung\samsung_data")
JSON_PATH = Path(r"C:\Recovered\BOAZ\study\projects\samsung\samsung_data\annotations.json")

IMAGE_SIZE = 50                                 # Good balance for simple CNN + speed
BATCH_SIZE = 256 # chat says 512-1024 might be possible
NUM_EPOCHS = 20
LEARNING_RATE = 0.001
NUM_WORKERS = 4 if torch.cuda.is_available() else 0

# Make sure paths exist
assert IMG_DIR.exists(), f"Image folder not found: {IMG_DIR}"
assert JSON_PATH.exists(), f"JSON not found: {JSON_PATH}"

In [None]:
# 1. Load the JSON (adding lines=True just in case it's a JSONL file)

try:

    df = pd.read_json(JSON_PATH)

except ValueError:

    # If the above fails, it's likely a 'JSON Lines' format

    df = pd.read_json(JSON_PATH, lines=True)



# 2. Set your column name

# Based on your prompt, we are using 'artifact'

label_column = 'artifact'



# 3. Validation Check: Make sure the column actually exists

if label_column not in df.columns:

    print(f"❌ Error: '{label_column}' not found. Available columns: {list(df.columns)}")

    # Stop here if the column is missing

else:

    print("✅ Original columns:", list(df.columns))

    print(f"✅ Using label column: '{label_column}'")



    # 4. Create clean integer labels

    # .unique() gives us the distinct artifact names

    labels = sorted(df[label_column].unique())

    label2idx = {label: i for i, label in enumerate(labels)}

    idx2label = {i: label for label, i in label2idx.items()}



    # Map the text labels (e.g., 'blur') to integers (e.g., 0)

    df["label"] = df[label_column].map(label2idx).astype(int)



    print("\nClasses found:", labels)

    print("Number of classes:", len(labels))



    # 5. Stratified train/val split

    # 'stratify' ensures both sets have the same % of each artifact type

    train_df, val_df = train_test_split(

        df,

        test_size=0.2,

        random_state=42, # Replacing SEED with a literal if not defined

        stratify=df["label"]

    )



    print(f"\nTrain: {len(train_df)} images")

    print(f"Val:   {len(val_df)} images")

In [None]:
class DistortionDataset(Dataset):
    def __init__(self, df, img_dir, transform=None):
        self.df = df.reset_index(drop=True)
        self.img_dir = Path(img_dir)
        self.transform = transform

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        img_path = self.img_dir / row["filename"]

        image = Image.open(img_path).convert("RGB")

        label = int(row["label"])

        if self.transform:
            image = self.transform(image)

        return image, label

In [None]:
# Training transforms (with augmentation)
train_transform = transforms.Compose([
    transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(15),
    transforms.ToTensor()
    # ,transforms.Normalize(mean=[0.485, 0.456, 0.406],
     #                    std=[0.229, 0.224, 0.225])
])

# Validation transforms (no augmentation)
val_transform = transforms.Compose([
    transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
    transforms.ToTensor()
    # ,transforms.Normalize(mean=[0.485, 0.456, 0.406],
     #                    std=[0.229, 0.224, 0.225])
])

# Datasets
train_dataset = DistortionDataset(train_df, IMG_DIR, transform=train_transform)
val_dataset   = DistortionDataset(val_df,   IMG_DIR, transform=val_transform)

# DataLoaders
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True,
                         num_workers=NUM_WORKERS, pin_memory=True)
val_loader   = DataLoader(val_dataset,   batch_size=BATCH_SIZE, shuffle=False,
                         num_workers=NUM_WORKERS, pin_memory=True)

print("DataLoaders ready!")

In [None]:
# Model

class SimpleCNN(nn.Module):
    def __init__(self, num_classes):
        super().__init__()

        self.features = nn.Sequential(
            # Block 1: Input 3x50x50 -> Output 32x25x25
            nn.Conv2d(3, 32, kernel_size=3, padding=1), # in_channels, out_channels, stride, padding, dilation, group, bias
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2), # kernel_size=2, stride = kernel_size = 2, 128 → 64 → 32 → 16 → 8

            # Block 2
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2)
        )

        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(64 * 12 * 12, 512),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes)
        )

    def forward(self, x):
        x = self.features(x)
        x = self.classifier(x)
        return x


# Instantiate model
model = SimpleCNN(num_classes=len(labels)).to(device)
print(model)
print(f"Total parameters: {sum(p.numel() for p in model.parameters()):,}")

Input image: 3 × 128 × 128

After 4 blocks of conv + maxpool(2): spatial size is divided by 2 four times
128 ÷ 2 ÷ 2 ÷ 2 ÷ 2 = 8

Last conv layer had 256 filters → feature map size = 256 × 8 × 8

We need to turn this 3D tensor into 1D vector before Linear layer
→ nn.Flatten() does that

256 channels × 8 height × 8 width = 16384 numbers

First linear layer: 16384 → 512

Then dropout (randomly zero 50% of values during training → prevents overfitting)

Final linear: 512 → number of your classes

Classic VGG-style head.

In [None]:
# Loss, Optimizer & Scheduler

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE, weight_decay=1e-4)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=3, verbose=True)

In [None]:
# Training Loop

best_acc = 0.0
history = {"train_loss": [], "val_loss": [], "val_acc": []}

for epoch in range(NUM_EPOCHS):
    # === Training ===
    model.train()
    train_loss = 0.0

    for images, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{NUM_EPOCHS} [Train]"):
        images, labels = images.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

    train_loss /= len(train_loader)

    # === Validation ===
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for images, labels in tqdm(val_loader, desc=f"Epoch {epoch+1}/{NUM_EPOCHS} [Val]"):
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)

            val_loss += loss.item()

            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()

    val_loss /= len(val_loader)
    val_acc = 100. * correct / total

    # Scheduler
    scheduler.step(val_acc)

    history["train_loss"].append(train_loss)
    history["val_loss"].append(val_loss)
    history["val_acc"].append(val_acc)

    print(f"\nEpoch {epoch+1:2d} | "
          f"Train Loss: {train_loss:.4f} | "
          f"Val Loss: {val_loss:.4f} | "
          f"Val Acc: {val_acc:.2f}%")

    # Save best model
    if val_acc > best_acc:
        best_acc = val_acc
        torch.save(model.state_dict(), "best_distortion_cnn.pth")
        print(f"   → New best model saved! ({best_acc:.2f}%)")

In [1]:
# Plot Training History

plt.figure(figsize=(12, 4))

plt.subplot(1, 2, 1)
plt.plot(history["train_loss"], label="Train Loss")
plt.plot(history["val_loss"], label="Val Loss")
plt.title("Loss")
plt.xlabel("Epoch")
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(history["val_acc"], label="Val Accuracy")
plt.title("Validation Accuracy")
plt.xlabel("Epoch")
plt.legend()

plt.tight_layout()
plt.show()

NameError: name 'plt' is not defined

In [2]:
# Evaluation & Confusion Matrix

from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns

model.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for images, labels in val_loader:
        images = images.to(device)
        outputs = model(images)
        _, preds = outputs.max(1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.numpy())

print(classification_report(all_labels, all_preds, target_names=labels))

# Confusion matrix
cm = confusion_matrix(all_labels, all_preds)
plt.figure(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
            xticklabels=labels, yticklabels=labels)
plt.xlabel("Predicted")
plt.ylabel("True")
plt.title("Confusion Matrix")
plt.show()

KeyboardInterrupt: 