In [12]:
# If not installed, you can install randaugment via pip
!pip install randaugment



In [1]:
import os
import random
from PIL import Image
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision.models import alexnet
from randaugment import RandAugment

# Custom dataset class
class CustomDataset(Dataset):
    def __init__(self, data, transform=None):
        self.data = data
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_path, label = self.data[idx]
        image = Image.open(img_path).convert('RGB')
        if self.transform:
            image = self.transform(image)
        return image, torch.tensor(label, dtype=torch.long)

# Prepare and load datasets
def prepare_datasets(paths, labels, sample_size=30000, train_size=10000):
    train_data = []
    test_data = []
    for path, label in zip(paths, labels):
        images = [(os.path.join(path, img), label) for img in os.listdir(path) if img.endswith(('.png', '.jpg', '.jpeg'))]
        if len(images) > sample_size:
            images = random.sample(images, sample_size)
        train_data.extend(images[:train_size])
        test_data.extend(images[train_size:])
    return train_data, test_data

# Paths and labels for datasets
dataset_paths = ["C:\\Users\\ysang\\VOC2012\\JPEGImages", "C:\\Users\\ysang\\flickr30k_images", "C:\\Users\\ysang\\imagenet\\imagenet"]
dataset_labels = [0, 1, 2]

# Prepare data
train_data, test_data = prepare_datasets(dataset_paths, dataset_labels)

# Consistent resizing for both train and test data
transform = transforms.Compose([
    transforms.Resize(80),  # Ensure consistent minimum size
    RandAugment(),  # Apply RandAugment for data augmentation
    transforms.ToTensor(),
])

# Apply RandomCrop to train and uniform resize to test
transform_train = transforms.Compose([
    transforms.ToTensor(),
    transforms.RandomCrop(64),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])

transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.CenterCrop(64),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])

# Create datasets
train_dataset = CustomDataset(train_data, transform=transform_train)
test_dataset = CustomDataset(test_data, transform=transform_test)

# Data loaders
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# Modified AlexNet for 64x64 input images
class AlexNetCustom(nn.Module):
    def __init__(self, num_classes=3):
        super(AlexNetCustom, self).__init__()
        self.features = nn.Sequential(
            # Adjusted architecture for smaller images
            nn.Conv2d(3, 64, kernel_size=11, stride=4, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(64, 192, kernel_size=5, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(192, 384, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(384, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
        )
        self.avgpool = nn.AdaptiveAvgPool2d((6, 6))
        self.classifier = nn.Sequential(
            nn.Dropout(),
            nn.Linear(256 * 6 * 6, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(),
            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.Linear(4096, num_classes),
        )

    def forward(self, x):
        x = self.features(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x

# Initialize the model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = AlexNetCustom(num_classes=3).to(device)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [2]:
import torch
from torch.optim import lr_scheduler

# Placeholder for the training loop
def train_model(model, train_loader, criterion, optimizer, num_epochs=10):
    model.train()  # Set the model to training mode

    for epoch in range(num_epochs):
        running_loss = 0.0
        running_corrects = 0

        for inputs, labels in train_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            # Zero the parameter gradients
            optimizer.zero_grad()

            # Forward pass
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            loss = criterion(outputs, labels)

            # Backward pass and optimize
            loss.backward()
            optimizer.step()

            # Statistics
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)

        epoch_loss = running_loss / len(train_loader.dataset)
        epoch_acc = running_corrects.double() / len(train_loader.dataset)

        print(f'Epoch {epoch+1}/{num_epochs} - Loss: {epoch_loss:.4f}, Acc: {epoch_acc:.4f}')

    print('Training complete')

# Placeholder for the testing/evaluation loop
def evaluate_model(model, test_loader, criterion):
    model.eval()  # Set the model to evaluation mode
    running_loss = 0.0
    running_corrects = 0

    # Disabling gradient calculation is useful for inference
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            loss = criterion(outputs, labels)

            # Statistics
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)

    total_loss = running_loss / len(test_loader.dataset)
    total_acc = running_corrects.double() / len(test_loader.dataset)

    print(f'Test Loss: {total_loss:.4f}, Test Acc: {total_acc:.4f}')

# Train the model
train_model(model, train_loader, criterion, optimizer, num_epochs=10)

# Evaluate the model
evaluate_model(model, test_loader, criterion)


Epoch 1/10 - Loss: 0.9791, Acc: 0.4877
Epoch 2/10 - Loss: 0.9171, Acc: 0.5283
Epoch 3/10 - Loss: 0.9016, Acc: 0.5318
Epoch 4/10 - Loss: 0.8943, Acc: 0.5406
Epoch 5/10 - Loss: 0.8852, Acc: 0.5429
Epoch 6/10 - Loss: 0.8736, Acc: 0.5505
Epoch 7/10 - Loss: 0.8700, Acc: 0.5531
Epoch 8/10 - Loss: 0.8649, Acc: 0.5512
Epoch 9/10 - Loss: 0.8573, Acc: 0.5546
Epoch 10/10 - Loss: 0.8596, Acc: 0.5577
Training complete
Test Loss: 0.8927, Test Acc: 0.5337
