In [None]:
import torch
import torch.nn as nn
import torchvision.models as models
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset, random_split
from PIL import Image
import os

In [None]:
path_train_plume = "cleanr/train data/images/plume"
path_train_no_plume = "cleanr/train data/images/no_plume"
path_test = "cleanr/test data/images"

In [None]:
# Define a transform for normalization
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5]),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.RandomRotation(degrees=15),
])

#
# Define dataset class
class PlumeDataset(Dataset):
    def __init__(self, plume_dir, no_plume_dir, transform=None):
        self.plume_images = [
            os.path.join(plume_dir, img) for img in os.listdir(plume_dir)
        ]
        self.no_plume_images = [
            os.path.join(no_plume_dir, img) for img in os.listdir(no_plume_dir)
        ]
        self.images = self.plume_images + self.no_plume_images
        self.targets = [1] * len(self.plume_images) + [0] * len(self.no_plume_images)
        self.transform = transform

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = Image.open(self.images[idx]).convert("L")
        target = self.targets[idx]
        if self.transform:
            image = self.transform(image)
        return image, target


# Create datasets for both "plume" and "no plume" classes
dataset = PlumeDataset(path_train_plume, path_train_no_plume, transform=transform)

# Split the combined dataset into training and validation sets
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

# Create data loaders
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size)

# Print the number of samples in each class
print(f"Number of rows in dataset: {len(dataset)}")

In [None]:
vgg = models.vgg16(pretrained=True)  # You can choose different VGG variants (e.g., vgg19)
vgg.features[0] = nn.Conv2d(1, 64, kernel_size=3, padding=1)
# Modify the final fully connected layer for your task
vgg.classifier[6] = nn.Linear(4096, 2)  # Replace the last fully connected layer

In [None]:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(vgg.parameters(), lr=0.001)

# Training loop
num_epochs = 15
for epoch in range(num_epochs):
    vgg.train()
    running_loss = 0.0
    for images, labels in train_loader:
        optimizer.zero_grad()
        outputs = vgg(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    # Print the average loss for this epoch
    print(f'Epoch {epoch + 1}/{num_epochs}, Loss: {running_loss / len(train_loader)}')

# Evaluation
vgg.eval()
# Perform evaluation on a validation dataset or test dataset

In [None]:
# Assuming you have already trained the model and defined the evaluation dataset and data loader (val_loader)

vgg.eval()  # Set the model to evaluation mode
total_correct = 0
total_samples = 0
total_loss = 0.0

with torch.no_grad():
    for images, labels in val_loader:
        outputs = model(images)
        labels = labels.float().unsqueeze(1)
        loss = criterion(outputs, labels)

        # Calculate the binary classification accuracy
        predicted = (outputs > 0.5).float()  # Assuming you're using a sigmoid activation
        correct = (predicted == labels).sum().item()
        total_correct += correct
        total_samples += labels.size(0)
        total_loss += loss.item()

# Calculate accuracy and average loss
accuracy = (total_correct / total_samples) * 100.0
average_loss = total_loss / len(val_loader)

print(f"Validation Accuracy: {accuracy:.2f}%")
print(f"Average Validation Loss: {average_loss:.4f}")
