In [38]:
import json
import os

import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from PIL import Image
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from torchvision import transforms
from tqdm import tqdm

# Run on the GPU when CUDA is available; otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [39]:
class CustomModel(nn.Module):
    """Small CNN classifier: three conv + ReLU + max-pool stages and two linear layers.

    Args:
        num_classes: Number of output logits produced by the final layer.
            Defaults to 4.
        input_size: ``(height, width)`` of the images fed to ``forward``.
            Defaults to ``(206, 192)``, which yields the original
            ``64 * 25 * 24`` flattened feature size.

    Raises:
        ValueError: If ``input_size`` is too small to survive the three
            2x2 poolings (either side shorter than 8 pixels).

    With the default arguments the layer names and shapes are identical to the
    original hard-coded network, so previously saved state dicts still load.
    """

    def __init__(self, num_classes=4, input_size=(206, 192)):
        super().__init__()
        height, width = input_size
        # The 3x3 convolutions use padding=1 and keep the spatial size, so only
        # the three stride-2 poolings shrink it. Each one floors the size by
        # half, which is equivalent to a single floor division by 8.
        pooled_h, pooled_w = height // 8, width // 8
        if pooled_h < 1 or pooled_w < 1:
            raise ValueError(
                f"input_size {input_size!r} is too small: each side must be at least 8 pixels"
            )

        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(32, 32, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.fc1 = nn.Linear(64 * pooled_h * pooled_w, 64)
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(64, num_classes)

    def forward(self, x):
        """Return raw class logits of shape ``(batch, num_classes)`` for a 3-channel image batch."""
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))
        x = torch.flatten(x, 1)  # Flatten all dimensions except the batch dimension
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        return self.fc2(x)

In [40]:
class CustomDataset(Dataset):
    """Image classification dataset described by a CSV file.

    The CSV must contain a class-label column named ``'alteration'`` and hold
    the image file path in the column at positional index 6. Labels are
    integer-encoded once at construction time with a ``LabelEncoder``, which
    stays available as ``self.label_encoder`` for decoding predictions.

    Args:
        csv_file: Path to the CSV file listing the images and their labels.
        transform: Optional callable applied to each loaded RGB PIL image.
    """

    # Name of the column holding the class label (encoded in __init__).
    LABEL_COLUMN = 'alteration'
    # Positional index of the column holding the image file path.
    IMAGE_PATH_COLUMN = 6

    def __init__(self, csv_file, transform=None):
        self.image_labels = pd.read_csv(csv_file)
        self.transform = transform
        self.label_encoder = LabelEncoder()
        self.image_labels[self.LABEL_COLUMN] = self.label_encoder.fit_transform(
            self.image_labels[self.LABEL_COLUMN]
        )

    def __len__(self):
        """Return the number of rows (samples) in the CSV."""
        return len(self.image_labels)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for row ``idx``; ``label`` is a 0-dim ``torch.long`` tensor."""
        img_name = self.image_labels.iloc[idx, self.IMAGE_PATH_COLUMN]
        # The context manager closes the underlying file handle as soon as the
        # pixel data has been converted; convert() returns an independent image.
        with Image.open(img_name) as raw_image:
            image = raw_image.convert('RGB')

        # Read the label by column name so it is always the column that was
        # encoded in __init__, regardless of where it sits in the CSV.
        label = self.image_labels[self.LABEL_COLUMN].iloc[idx]

        if self.transform:
            image = self.transform(image)

        return image, torch.tensor(label, dtype=torch.long)

# Preprocessing: resize every image to 206x192, convert it to a tensor and
# normalize each channel with the mean/std values below.
transform = transforms.Compose([
    transforms.Resize((206, 192)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

dataset = CustomDataset(csv_file='data/labels.csv', transform=transform)

# 80/20 train/validation split.
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size

# Seed the split so the same images land in the validation set on every run.
# With an unseeded split, a checkpoint reloaded after a kernel restart could be
# evaluated on images it was trained on.
SPLIT_SEED = 42
split_generator = torch.Generator().manual_seed(SPLIT_SEED)
train_dataset, val_dataset = torch.utils.data.random_split(
    dataset, [train_size, val_size], generator=split_generator
)

train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=128, shuffle=False)

In [41]:
# Training configuration
MAX_EPOCHS = 10
CHECKPOINT_PATH = 'models/best_model_pytorch.pth'

# Initialize the model, criterion, and optimizer
model = CustomModel()
model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Stop once the validation loss has failed to improve this many epochs in a row.
early_stopping_patience = 2
early_stopping_counter = 0

# torch.save does not create missing parent directories; create the folder up
# front so the first checkpoint cannot fail after a full epoch of training.
os.makedirs(os.path.dirname(CHECKPOINT_PATH), exist_ok=True)

best_val_loss = float('inf')
for epoch in range(MAX_EPOCHS):
    # Training phase
    model.train()
    for images, labels in tqdm(train_loader):
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

    # Validation phase
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for images, labels in tqdm(val_loader):
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            loss = criterion(outputs, labels)
            # The criterion returns the batch mean; weight it by the batch size
            # so a smaller final batch does not skew the epoch average.
            val_loss += loss.item() * labels.size(0)

    val_loss /= len(val_loader.dataset)
    print(f'Epoch {epoch+1}, Validation Loss: {val_loss}')

    # Early stopping and checkpointing logic
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), CHECKPOINT_PATH)
        print("Model improved and saved!")
        early_stopping_counter = 0  # reset counter if performance improved
    else:
        early_stopping_counter += 1
        print(f"No improvement in validation loss for {early_stopping_counter} epochs.")
        if early_stopping_counter >= early_stopping_patience:
            print("Early stopping triggered.")
            break


  7%|▋         | 24/346 [00:38<08:30,  1.58s/it]


KeyboardInterrupt: 

In [None]:
# Reload the best checkpoint written by the training loop. The path must match
# the one passed to torch.save there ('models/best_model_pytorch.pth').
model = CustomModel()  # Fresh instance with the same architecture
# map_location lets a checkpoint saved on the GPU load on a CPU-only machine too.
model.load_state_dict(torch.load('models/best_model_pytorch.pth', map_location=device))
model.to(device)  # Move model to the appropriate device after loading
model.eval()  # Set the model to evaluation mode (disables dropout)

In [None]:
# Load the weights stored in 'models/cnn_pytorch.pth' into a fresh model.
model = CustomModel()
# The model stays on the CPU in this cell, so map the checkpoint tensors to the
# CPU as well; without map_location a checkpoint saved from CUDA tensors cannot
# be loaded on a machine without a GPU.
model.load_state_dict(torch.load('models/cnn_pytorch.pth', map_location='cpu'))
model.eval()  # Set the model to evaluation mode (disables dropout)

In [None]:
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, f1_score

def get_all_preds(model, loader):
    """Run ``model`` over every batch of ``loader`` and collect the results.

    Args:
        model: The network (or any callable) mapping an image batch to
            per-class outputs.
        loader: Iterable yielding ``(images, labels)`` tensor pairs.

    Returns:
        A tuple ``(all_preds, all_labels)`` of CPU tensors: the model outputs
        and the labels of all batches concatenated along dimension 0. Labels
        keep their original dtype. Both tensors are empty when ``loader``
        yields no batches.
    """
    # Feed each batch on the device the model's parameters live on; objects
    # without parameters (e.g. a plain function) receive the batch unchanged.
    try:
        target_device = next(model.parameters()).device
    except (AttributeError, StopIteration):
        target_device = None

    pred_chunks = []
    label_chunks = []
    # Inference only: never build an autograd graph, whatever the caller does.
    with torch.no_grad():
        for images, labels in loader:
            if target_device is not None:
                images = images.to(target_device)
            pred_chunks.append(model(images).cpu())
            label_chunks.append(labels.cpu())

    if not pred_chunks:
        return torch.tensor([]), torch.tensor([], dtype=torch.long)

    # Concatenate once at the end instead of re-copying the growing result on
    # every iteration.
    return torch.cat(pred_chunks, dim=0), torch.cat(label_chunks, dim=0)

# Collect the validation outputs without tracking gradients and take the
# highest-scoring class of each sample as its prediction.
with torch.no_grad():
    preds, labels = get_all_preds(model, val_loader)
    preds_classes = preds.argmax(dim=1)

# scikit-learn works on NumPy arrays: convert each tensor once.
y_true = labels.numpy()
y_pred = preds_classes.numpy()

cm = confusion_matrix(y_true, y_pred)
accuracy = accuracy_score(y_true, y_pred)
# average=None returns one score per class instead of a single aggregate.
precision, recall, f1 = (
    scorer(y_true, y_pred, average=None)
    for scorer in (precision_score, recall_score, f1_score)
)

# Per-class arrays become plain lists so the report is JSON-serializable.
metrics = dict(
    accuracy=accuracy,
    precision=precision.tolist(),
    recall=recall.tolist(),
    f1=f1.tolist(),
)

with open('metrics.json', 'w') as f:
    json.dump(metrics, f)

print(metrics)


In [31]:
import torch
import time

def gpu_speed_test(device, size=1024, num_tests=50, warmup=10):
    """Time square matrix multiplications on ``device`` and print the average.

    Args:
        device: ``torch.device`` (or device string) to benchmark. CUDA devices
            are synchronized after every multiplication; on any other device
            the synchronization is skipped, so the function also runs on the
            CPU.
        size: Side length of the two random square matrices.
        num_tests: Number of timed multiplications (must be at least 1).
        warmup: Number of untimed multiplications executed first.

    Raises:
        ValueError: If ``num_tests`` is smaller than 1.
    """
    if num_tests < 1:
        raise ValueError("num_tests must be at least 1")

    device = torch.device(device)
    is_cuda = device.type == 'cuda'

    def _wait_for_device():
        # CUDA kernels are launched asynchronously: wait for completion so the
        # timer measures the work itself. torch.cuda.synchronize raises for
        # non-CUDA devices, hence the guard.
        if is_cuda:
            torch.cuda.synchronize(device)

    # Create random matrices
    a = torch.randn(size, size, device=device)
    b = torch.randn(size, size, device=device)

    # Warm up the device so one-time initialization is not timed
    for _ in range(warmup):
        torch.matmul(a, b)
        _wait_for_device()

    # perf_counter is the high-resolution clock intended for measuring intervals
    start_time = time.perf_counter()
    for _ in range(num_tests):
        torch.matmul(a, b)
        _wait_for_device()
    total_time = time.perf_counter() - start_time

    print(f"Average time per matrix multiplication: {total_time / num_tests * 1000:.2f} ms")

# Benchmark on the GPU when CUDA is present, otherwise on the CPU.
cuda_available = torch.cuda.is_available()
device = torch.device("cuda" if cuda_available else "cpu")
print(f"Using device: {device}")

# Only a CUDA device has a hardware name to report.
if cuda_available:
    print(f"Device name: {torch.cuda.get_device_name(0)}")

gpu_speed_test(device)

Using device: cuda
Device name: NVIDIA GeForce RTX 3060 Laptop GPU
Average time per matrix multiplication: 0.60 ms


In [34]:
import torch
import time

def cpu_speed_test(size=1024, num_tests=50, warmup=10):
    """Time square matrix multiplications on the CPU and print the average.

    Args:
        size: Side length of the two random square matrices.
        num_tests: Number of timed multiplications (must be at least 1).
        warmup: Number of untimed multiplications executed first.

    Raises:
        ValueError: If ``num_tests`` is smaller than 1.
    """
    if num_tests < 1:
        raise ValueError("num_tests must be at least 1")

    # Create random matrices on CPU
    a = torch.randn(size, size)
    b = torch.randn(size, size)

    # Warm up the CPU so one-time initialization is not timed
    for _ in range(warmup):
        torch.matmul(a, b)

    # perf_counter is the high-resolution clock intended for measuring intervals
    start_time = time.perf_counter()
    for _ in range(num_tests):
        torch.matmul(a, b)
    total_time = time.perf_counter() - start_time

    print(f"Average time per matrix multiplication: {total_time / num_tests * 1000:.2f} ms")

# Run the CPU matmul benchmark with its default 1024x1024 matrices.
print("Testing on CPU...")
cpu_speed_test()

Testing on CPU...
Average time per matrix multiplication: 12.38 ms
