<a href="https://colab.research.google.com/github/cyrkaade/numpy-100/blob/master/binaryclass2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader, random_split
import os
import time
import copy

# ==========================================
# 1. Configuration & Hyperparameters
# ==========================================

# Root folder of the image data; it must contain the 'real' and 'fraud'
# class sub-folders.
DATA_DIR = 'dataset'

# Side length (pixels) every image is resized to. 224 is the standard ResNet
# input size; a higher resolution (e.g. 448) might capture screen pixel grids
# better if 224 proves insufficient.
INPUT_SIZE = 224

# Optimisation settings.
LEARNING_RATE = 0.001
NUM_EPOCHS = 10
BATCH_SIZE = 32

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

# ==========================================
# 2. Data Preparation
# ==========================================

# Per-split preprocessing pipelines.
# Both splits finish with ImageNet mean/std normalisation because the model is
# pre-trained on ImageNet; only the training split adds random augmentation.
data_transforms = dict(
    train=transforms.Compose([
        transforms.Resize(size=(INPUT_SIZE, INPUT_SIZE)),  # uniform input size
        transforms.RandomHorizontalFlip(),                 # augmentation
        transforms.RandomRotation(degrees=10),             # slight rotation
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
    val=transforms.Compose([
        transforms.Resize(size=(INPUT_SIZE, INPUT_SIZE)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
)

def get_data_loaders(data_dir, train_fraction=0.8, seed=None):
    """Build shuffled train/validation loaders from an ImageFolder directory.

    The images under ``data_dir`` are randomly partitioned. The training part
    is read through ``data_transforms['train']`` and the remainder through
    ``data_transforms['val']``.

    Args:
        data_dir: Root directory laid out as one sub-folder per class.
        train_fraction: Share of the images assigned to training. The default
            of 0.8 yields an 80/20 train/validation split.
        seed: Optional integer seed that makes the split reproducible. With
            ``None`` the permutation is drawn from torch's global RNG.

    Returns:
        A ``(dataloaders, dataset_sizes, class_names)`` tuple. The first two
        are dicts keyed by ``'train'`` and ``'val'``; the last is the class
        list reported by ``ImageFolder``.

    Raises:
        ValueError: If either side of the split would contain no images.
    """
    # A transform is attached to the dataset object rather than to a Subset,
    # so the same files are opened twice, once per preprocessing pipeline.
    train_set = datasets.ImageFolder(data_dir, transform=data_transforms['train'])
    val_set = datasets.ImageFolder(data_dir, transform=data_transforms['val'])

    total_size = len(train_set)
    train_size = int(train_fraction * total_size)
    val_size = total_size - train_size
    if train_size <= 0 or val_size <= 0:
        raise ValueError(
            f"Cannot split {total_size} image(s) from '{data_dir}' into "
            f"non-empty train/val sets with train_fraction={train_fraction}."
        )

    # One shared permutation guarantees the two subsets never overlap.
    if seed is None:
        permutation = torch.randperm(total_size)
    else:
        generator = torch.Generator().manual_seed(seed)
        permutation = torch.randperm(total_size, generator=generator)
    indices = permutation.tolist()

    train_subset = torch.utils.data.Subset(train_set, indices[:train_size])
    val_subset = torch.utils.data.Subset(val_set, indices[train_size:])

    dataloaders = {
        'train': DataLoader(train_subset, batch_size=BATCH_SIZE, shuffle=True, num_workers=4),
        'val': DataLoader(val_subset, batch_size=BATCH_SIZE, shuffle=False, num_workers=4)
    }

    dataset_sizes = {'train': len(train_subset), 'val': len(val_subset)}
    class_names = train_set.classes  # Expect ['fraud', 'real'] or similar

    return dataloaders, dataset_sizes, class_names

# ==========================================
# 3. Model Setup
# ==========================================

def initialize_model(num_classes, freeze_backbone=False):
    """Create an ImageNet-pre-trained ResNet18 with a new classification head.

    Args:
        num_classes: Number of outputs of the replacement final layer.
        freeze_backbone: When True, every pre-trained parameter has
            ``requires_grad`` switched off so that only the new head receives
            gradient updates (can help on small datasets). Defaults to False,
            i.e. the whole network is fine-tuned.

    Returns:
        The model, moved to the module-level ``device``.
    """
    model = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)

    if freeze_backbone:
        for param in model.parameters():
            param.requires_grad = False

    # The replacement layer is created after the optional freeze, so its
    # parameters are always trainable.
    num_ftrs = model.fc.in_features
    model.fc = nn.Linear(num_ftrs, num_classes)

    return model.to(device)

# ==========================================
# 4. Training Loop
# ==========================================

def train_model(model, criterion, optimizer, dataloaders, sizes, num_epochs=25):
    """Train ``model`` and return it loaded with its best validation weights.

    Every epoch runs a training pass followed by a validation pass. The
    weights of the epoch with the highest validation accuracy are kept and
    restored into ``model`` before it is returned.

    Args:
        model: Network to optimise; it must own at least one parameter.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepping the parameters of ``model``.
        dataloaders: Dict with ``'train'`` and ``'val'`` iterables yielding
            ``(inputs, labels)`` batches.
        sizes: Dict with the number of samples behind each loader; used to
            average the loss and accuracy per epoch.
        num_epochs: Number of epochs to run.

    Returns:
        The same ``model`` object, carrying the best validation weights.

    Raises:
        ValueError: If ``sizes['train']`` or ``sizes['val']`` is not positive.
    """
    phases = ('train', 'val')
    for phase in phases:
        if sizes[phase] <= 0:
            raise ValueError(f"sizes['{phase}'] must be positive, got {sizes[phase]}")

    # Send batches to wherever the model actually lives instead of relying on
    # a module-level device variable being in sync with it.
    model_device = next(model.parameters()).device

    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in phases:
            is_training = phase == 'train'
            if is_training:
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0  # plain int, so an empty loader cannot break the maths

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(model_device)
                labels = labels.to(model_device)

                # Gradients are only tracked while training.
                with torch.set_grad_enabled(is_training):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    if is_training:
                        optimizer.zero_grad()
                        loss.backward()
                        optimizer.step()

                # The criterion returns a batch mean; weight it by the batch
                # size so the epoch figure is a per-sample average.
                running_loss += loss.item() * inputs.size(0)
                running_corrects += (preds == labels).sum().item()

            epoch_loss = running_loss / sizes[phase]
            epoch_acc = running_corrects / sizes[phase]

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            # Snapshot the weights whenever validation accuracy improves.
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

        print()

    time_elapsed = time.time() - since
    print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    # '.4f' (four decimals); the previous '4f' spec was a width, not a precision.
    print(f'Best val Acc: {best_acc:.4f}')

    # Load best model weights
    model.load_state_dict(best_model_wts)
    return model

# ==========================================
# 5. Execution
# ==========================================

if __name__ == '__main__':
    if os.path.exists(DATA_DIR):
        # Build the loaders and report which class folders were discovered.
        dataloaders, dataset_sizes, class_names = get_data_loaders(DATA_DIR)
        print(f"Classes found: {class_names}")

        # One output unit per discovered class, trained with cross-entropy.
        model_ft = initialize_model(num_classes=len(class_names))
        criterion = nn.CrossEntropyLoss()

        # Plain SGD with momentum over all model parameters.
        optimizer_ft = optim.SGD(
            model_ft.parameters(),
            lr=LEARNING_RATE,
            momentum=0.9,
        )

        # Fine-tune, then persist the returned weights to disk.
        model_ft = train_model(
            model_ft,
            criterion,
            optimizer_ft,
            dataloaders,
            dataset_sizes,
            num_epochs=NUM_EPOCHS,
        )
        torch.save(model_ft.state_dict(), 'barcode_fraud_detector.pth')
        print("Model saved to barcode_fraud_detector.pth")

        # Optional sanity check: classify the first validation batch and
        # print up to five true/predicted label pairs.
        print("\n--- Running Inference on a few validation images ---")
        model_ft.eval()
        with torch.no_grad():
            inputs, labels = next(iter(dataloaders['val']))
            inputs = inputs.to(device)
            outputs = model_ft(inputs)
            _, preds = torch.max(outputs, 1)

            shown = min(5, len(labels))
            for i in range(shown):
                print(f"True: {class_names[labels[i]]} | Predicted: {class_names[preds[i]]}")
    else:
        # Nothing can be done without the data folder; tell the user how to fix it.
        print(f"Error: Directory '{DATA_DIR}' not found. Please create it and add 'real' and 'fraud' subfolders.")

In [None]:
# VERSION 2


import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader, Subset
from sklearn.metrics import confusion_matrix, classification_report
import numpy as np
import time
import copy
import os

# ==========================================
# 1. Configuration
# ==========================================

# Image folder root and the square side length fed to the network.
DATA_DIR = 'dataset'
INPUT_SIZE = 224

# Training schedule.
NUM_EPOCHS = 15
BATCH_SIZE = 32
LEARNING_RATE = 0.001

# Prefer CUDA when it is available; otherwise everything runs on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# ==========================================
# 2. Advanced Data Preparation (80/10/10)
# ==========================================
# Preprocessing pipelines: 'train' applies random augmentation, 'val_test' is
# deterministic. Both end with ImageNet mean/std normalisation.
data_transforms = dict(
    train=transforms.Compose([
        # RandomResizedCrop helps focus on texture (moiré / pixel patterns)
        transforms.RandomResizedCrop(size=INPUT_SIZE, scale=(0.7, 1.0)),
        transforms.RandomHorizontalFlip(),
        transforms.ColorJitter(brightness=0.1, contrast=0.1),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
    val_test=transforms.Compose([
        transforms.Resize(size=(INPUT_SIZE, INPUT_SIZE)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
)

def get_dataloaders(data_dir, seed=None):
    """Build train/val/test loaders (80/10/10) from an ImageFolder directory.

    Args:
        data_dir: Root directory laid out as one sub-folder per class.
        seed: Optional integer seed for the shuffle, so that the held-out test
            split is the same on every run. With ``None`` the indices are
            shuffled with NumPy's global RNG.

    Returns:
        A ``(loaders, class_names)`` tuple. ``loaders`` is a dict keyed by
        ``'train'``, ``'val'`` and ``'test'``; only the training loader
        reshuffles its samples.

    Raises:
        ValueError: If the dataset is too small for every split to receive at
            least one image.
    """
    # Validation and test use the same deterministic transform, so they can
    # share one dataset object; training needs its own for augmentation.
    train_folder = datasets.ImageFolder(data_dir, transform=data_transforms['train'])
    eval_folder = datasets.ImageFolder(data_dir, transform=data_transforms['val_test'])
    n = len(train_folder)

    # Shuffle once, then carve the index list into 80% / 10% / 10%.
    indices = list(range(n))
    if seed is None:
        np.random.shuffle(indices)
    else:
        np.random.default_rng(seed).shuffle(indices)
    train_split = int(0.8 * n)
    val_split = int(0.9 * n)

    splits = {
        'train': Subset(train_folder, indices[:train_split]),
        'val': Subset(eval_folder, indices[train_split:val_split]),
        'test': Subset(eval_folder, indices[val_split:]),
    }
    empty = [name for name, subset in splits.items() if len(subset) == 0]
    if empty:
        raise ValueError(
            f"Dataset at '{data_dir}' has only {n} image(s); "
            f"the following splits would be empty: {empty}"
        )

    loaders = {
        name: DataLoader(subset, batch_size=BATCH_SIZE, shuffle=(name == 'train'), num_workers=2)
        for name, subset in splits.items()
    }
    return loaders, train_folder.classes

# ==========================================
# 3. Model: EfficientNet-B0 (Better for Texture)
# ==========================================
def get_model(num_classes):
    """Return a pre-trained EfficientNet-B0 whose head outputs ``num_classes``.

    The network is loaded with the IMAGENET1K_V1 weights, the layer stored at
    ``classifier[1]`` is replaced by a fresh linear layer of matching input
    width, and the result is moved to the module-level ``device``.
    """
    pretrained_weights = models.EfficientNet_B0_Weights.IMAGENET1K_V1
    net = models.efficientnet_b0(weights=pretrained_weights)

    # Swap the existing head for one sized to this task.
    old_head = net.classifier[1]
    net.classifier[1] = nn.Linear(old_head.in_features, num_classes)

    net = net.to(device)
    return net

# ==========================================
# 4. Training Engine
# ==========================================
def train_and_evaluate():
    """Train on ``DATA_DIR``, keep the best validation weights, test and save.

    Steps:
        1. Build the loaders and the model, then train for ``NUM_EPOCHS``
           epochs with Adam and a StepLR schedule (learning rate x0.1 every
           5 epochs).
        2. Track the weights of the epoch with the highest validation
           accuracy.
        3. Restore those weights, print a confusion matrix and classification
           report for the test split, and write the weights to
           ``best_fraud_model.pth``.
    """
    loaders, class_names = get_dataloaders(DATA_DIR)
    model = get_model(len(class_names))

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

    # Improvement: Scheduler helps the model settle into the best weights
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.1)

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(NUM_EPOCHS):
        print(f'Epoch {epoch+1}/{NUM_EPOCHS}')

        for phase in ['train', 'val']:
            is_training = phase == 'train'
            if is_training:
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0  # plain int; no tensor-only methods needed later

            for inputs, labels in loaders[phase]:
                inputs, labels = inputs.to(device), labels.to(device)

                # Gradients are only tracked while training.
                with torch.set_grad_enabled(is_training):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    if is_training:
                        optimizer.zero_grad()
                        loss.backward()
                        optimizer.step()

                # The criterion returns a batch mean; weight by batch size so
                # the epoch value is a per-sample average.
                running_loss += loss.item() * inputs.size(0)
                running_corrects += (preds == labels).sum().item()

            # Advance the learning-rate schedule once per epoch, after training.
            if is_training:
                scheduler.step()

            num_samples = len(loaders[phase].dataset)
            epoch_loss = running_loss / num_samples
            epoch_acc = running_corrects / num_samples
            print(f'{phase.capitalize()} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            # Snapshot the weights whenever validation accuracy improves.
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
        print("-" * 15)

    # ==========================================
    # 5. Final Testing (The Unseen Data)
    # ==========================================
    print("\nTraining Complete. Evaluating on Unseen TEST SET...")
    model.load_state_dict(best_model_wts)
    model.eval()

    all_preds = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in loaders['test']:
            outputs = model(inputs.to(device))
            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().tolist())
            # Labels are only compared on the host, so they never need to
            # leave the CPU.
            all_labels.extend(labels.tolist())

    # Pass the full label set explicitly: a small test split may lack one of
    # the classes, and classification_report raises when the labels it infers
    # do not match the length of target_names.
    label_ids = list(range(len(class_names)))

    # Confusion Matrix Analysis
    print("\nConfusion Matrix:")
    print(confusion_matrix(all_labels, all_preds, labels=label_ids))
    print("\nDetailed Report:")
    print(classification_report(all_labels, all_preds, labels=label_ids, target_names=class_names))

    torch.save(model.state_dict(), 'best_fraud_model.pth')
    print("Model saved as best_fraud_model.pth")

# Entry point: run the full train / validate / test pipeline when this cell or
# script is executed directly (not when it is imported as a module).
if __name__ == '__main__':
    train_and_evaluate()