In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from PIL import Image
import pandas as pd
import os
from sklearn.model_selection import train_test_split
import numpy as np

In [2]:
class CustomDataset(Dataset):
    """Image dataset backed by a directory and an optional labels CSV.

    With ``labels_file`` the dataset walks the CSV rows: column 0 is the file
    name and column 1 the label (the CSV is read without a header row).
    Without it, the dataset walks the entries of ``data_dir`` and every label
    is ``None``.

    Args:
        data_dir: Directory that contains the image files.
        labels_file: Optional path to a two-column, header-less CSV.
        transform: Optional callable applied to each loaded RGB image.

    Note:
        A missing file is reported on stdout and yields ``(None, None)``.
        Unlabelled samples carry ``label=None``. Callers that batch such
        samples need a collate function that can handle ``None``.
    """

    def __init__(self, data_dir, labels_file=None, transform=None):
        self.data_dir = data_dir
        self.transform = transform

        if labels_file:
            # Labelled mode: one (filename, label) row per sample.
            self.labels_df = pd.read_csv(labels_file, names=['filename', 'label'])
        else:
            # Unlabelled mode: os.listdir() returns entries in arbitrary
            # order, so sort them to keep index -> file mapping reproducible.
            self.filenames = sorted(os.listdir(data_dir))

    def _is_labelled(self):
        """Return True when the dataset was built from a labels CSV."""
        return hasattr(self, 'labels_df')

    def _resolve_path(self, img_name):
        """Map a CSV/listing entry to a file path inside ``data_dir``.

        Names without a ``.jpg`` suffix get one appended, unless only the
        un-suffixed file exists on disk (e.g. ``photo.png``). When neither
        candidate exists the ``.jpg`` path is returned so the caller reports
        the same path as before.
        """
        img_name = str(img_name)  # tolerate numeric ids parsed from the CSV
        img_path = os.path.join(self.data_dir, img_name)
        if img_name.endswith('.jpg'):
            return img_path
        jpg_path = img_path + '.jpg'
        if os.path.isfile(jpg_path) or not os.path.isfile(img_path):
            return jpg_path
        return img_path

    def __len__(self):
        if self._is_labelled():
            return len(self.labels_df)
        return len(self.filenames)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``.

        Returns ``(None, None)`` when the image file does not exist.
        """
        if self._is_labelled():
            img_name = self.labels_df.iloc[idx, 0]  # filename column
            label = self.labels_df.iloc[idx, 1]  # label column
        else:
            img_name = self.filenames[idx]
            label = None
        img_path = self._resolve_path(img_name)

        try:
            # The context manager closes the file handle promptly; convert()
            # returns a new image, which is what we keep.
            with Image.open(img_path) as img:
                image = img.convert("RGB")
        except FileNotFoundError:
            print(f"Warning: File not found: {img_path}")
            return None, None

        if self.transform:
            image = self.transform(image)

        # Kept for compatibility: a transform that returns None is treated
        # the same way as a failed load.
        if image is None:
            return None, None

        return image, label

In [3]:
# Define data transformations
def _build_eval_transform():
    """Build the deterministic resize -> centre-crop -> tensor -> normalise pipeline."""
    steps = [
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        # Per-channel mean / std used to standardise the RGB tensor.
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]
    return transforms.Compose(steps)


# Both splits currently share the same preprocessing; each key still gets its
# own Compose instance.
data_transforms = {split: _build_eval_transform() for split in ('train', 'test')}

In [4]:
# Create datasets
TRAIN_DIR = '/home/ryan_benvenuti_uri_edu/ondemand/data/sys/dashboard/batch_connect/sys/bc_jupyter/output/561/train/train'
TRAIN_LABELS_FILE = '/home/ryan_benvenuti_uri_edu/ondemand/data/sys/dashboard/batch_connect/sys/bc_jupyter/output/561/train_labels.csv'
dataset = CustomDataset(data_dir=TRAIN_DIR, labels_file=TRAIN_LABELS_FILE, transform=data_transforms['train'])

# Split sample *indices* rather than the dataset object. Passing the dataset
# itself to train_test_split makes scikit-learn index every sample, which
# loads and keeps every transformed image tensor in memory at once.
# Subset keeps loading lazy; the 80/20 ratio and seed are unchanged.
train_indices, valid_indices = train_test_split(list(range(len(dataset))), test_size=0.2, random_state=42)
train_dataset = torch.utils.data.Subset(dataset, train_indices)
valid_dataset = torch.utils.data.Subset(dataset, valid_indices)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=64, shuffle=False)

# Load pre-trained ResNet50 and replace the classifier head with a 2-way layer.
# NOTE(review): newer torchvision releases deprecate `pretrained=True` in
# favour of the `weights=` argument; kept as-is because the installed
# torchvision version is not visible from this notebook.
model = models.resnet50(pretrained=True)
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 2)
# "cpu" is the valid fallback device type; the previous "gpu" string is not
# accepted by torch.device and crashed on machines without CUDA.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)



In [5]:
# Training loop
def train(model, train_loader, criterion, optimizer, device):
    """Run one optimisation pass over ``train_loader``.

    Args:
        model: Module to optimise; switched to training mode.
        train_loader: Iterable yielding ``(inputs, labels)`` batches.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: Optimiser stepping the model's parameters.
        device: Device the batches are moved to before the forward pass.

    Returns:
        The sample-weighted mean loss over the pass, or ``nan`` when the
        loader yields no samples.
    """
    model.train()
    running_loss = 0.0
    total_samples = 0
    for inputs, labels in train_loader:
        inputs = inputs.to(device)
        # as_tensor() reuses an existing tensor (no copy, no "copy construct"
        # UserWarning) and still converts lists / arrays of labels.
        labels = torch.as_tensor(labels).to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        batch_size = inputs.size(0)
        # Weight by batch size so a smaller final batch does not skew the mean.
        running_loss += loss.item() * batch_size
        total_samples += batch_size
    if total_samples == 0:
        # Nothing was seen: the mean is undefined, so avoid dividing by zero.
        return float("nan")
    return running_loss / total_samples

# Validation loop
def validate(model, valid_loader, device):
    """Compute classification accuracy of ``model`` over ``valid_loader``.

    Args:
        model: Module to evaluate; switched to evaluation mode.
        valid_loader: Iterable yielding ``(inputs, labels)`` batches.
        device: Device the batches are moved to before the forward pass.

    Returns:
        Fraction of samples whose arg-max prediction equals the label, or
        ``nan`` when the loader yields no samples.
    """
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in valid_loader:
            inputs = inputs.to(device)
            # Accept tensors as well as lists / arrays of labels, matching
            # what the training function accepts.
            labels = torch.as_tensor(labels).to(device)
            predicted = model(inputs).argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    if total == 0:
        # Accuracy over zero samples is undefined; avoid dividing by zero.
        return float("nan")
    return correct / total

In [6]:
# Training and validation
num_epochs = 10
for epoch in range(num_epochs):
    # One optimisation pass over the training split ...
    train_loss = train(
        model=model,
        train_loader=train_loader,
        criterion=criterion,
        optimizer=optimizer,
        device=device,
    )
    # ... followed by an accuracy check on the held-out split.
    valid_accuracy = validate(model=model, valid_loader=valid_loader, device=device)
    print(f"Epoch {epoch+1}/{num_epochs}, Training Loss: {train_loss:.4f}, Validation Accuracy: {valid_accuracy:.4f}")

  labels = torch.tensor(labels).to(device)  # Convert labels to tensor and move to device


Epoch 1/10, Training Loss: 0.0990, Validation Accuracy: 0.9973
Epoch 2/10, Training Loss: 0.0110, Validation Accuracy: 0.9987
Epoch 3/10, Training Loss: 0.0049, Validation Accuracy: 0.9982
Epoch 4/10, Training Loss: 0.0042, Validation Accuracy: 0.9987
Epoch 5/10, Training Loss: 0.0025, Validation Accuracy: 0.9982
Epoch 6/10, Training Loss: 0.0023, Validation Accuracy: 0.9987
Epoch 7/10, Training Loss: 0.0017, Validation Accuracy: 0.9982
Epoch 8/10, Training Loss: 0.0013, Validation Accuracy: 0.9987
Epoch 9/10, Training Loss: 0.0013, Validation Accuracy: 0.9987
Epoch 10/10, Training Loss: 0.0006, Validation Accuracy: 0.9987


In [None]:
# Define custom dataset for testing without labels
class TestDataset(Dataset):
    """Unlabelled image dataset over every entry of a directory.

    The directory listing is taken once at construction time and sorted, so
    index ``i`` always refers to ``self.filenames[i]``. That makes it possible
    to map a row of predictions back to its file.

    Args:
        data_dir: Directory that contains the image files.
        transform: Optional callable applied to each loaded RGB image.
    """

    def __init__(self, data_dir, transform=None):
        self.data_dir = data_dir
        self.transform = transform
        # Listing the directory on every access costs O(n) per sample and
        # os.listdir() order is arbitrary; snapshot and sort it once instead.
        self.filenames = sorted(os.listdir(data_dir))

    def __len__(self):
        return len(self.filenames)

    def __getitem__(self, idx):
        """Return the (optionally transformed) RGB image at position ``idx``."""
        img_path = os.path.join(self.data_dir, self.filenames[idx])
        # The context manager closes the file handle promptly; convert()
        # returns a new image, which is what we keep.
        with Image.open(img_path) as img:
            image = img.convert("RGB")
        if self.transform:
            image = self.transform(image)
        return image

# Test function
def test(model, test_data, device):
    model.eval()
    test_predictions = []
    with torch.no_grad():
        for inputs in test_data:
            inputs = inputs.to(device)
            outputs = model(inputs)
            test_predictions.append(outputs.cpu())
    return torch.cat(test_predictions, dim=0)

In [None]:
# Create test dataset and data loader
test_dataset = TestDataset(
    data_dir='test_data',
    transform=data_transforms['test'],
)
test_loader = DataLoader(dataset=test_dataset, batch_size=1, shuffle=False)

# Testing: collect the raw model outputs for every test image
test_predictions = test(model=model, test_data=test_loader, device=device)

# Save test predictions to a .pt file
torch.save(test_predictions, 'prediction.pt')