In [1]:
import torchvision
import torch.nn as nn
import torch
import torchvision.transforms as transforms
from PIL import Image
import matplotlib.pyplot as plt
import numpy as np

In [2]:
if torch.cuda.is_available():
    device = torch.device("cuda:0")
elif torch.backends.mps.is_available():
    device = torch.device("mps")

print(device)

cuda:0


In [3]:
print(torch.cuda.get_device_name())
print(torch.__version__)
print(torch.version.cuda)
x = torch.randn(1).cuda()
print(x)

NVIDIA GeForce GTX 960M
2.1.0+cu118
11.8
tensor([0.7440], device='cuda:0')


In [16]:
torch.manual_seed(17)

normalize = transforms.Normalize(
    mean=[0.485, 0.456, 0.406],
    std=[0.229, 0.224, 0.225]
)

train_augs = torchvision.transforms.Compose(
    [
        # torchvision.transforms.RandomResizedCrop(size=(224,224), scale=(0.9, 1.0), ratio=(0.9, 1.1)),
        torchvision.transforms.Resize(128),
        # torchvision.transforms.RandomHorizontalFlip(),
        torchvision.transforms.ToTensor(),
        # normalize,
    ]
)

val_augs = torchvision.transforms.Compose(
    [
        torchvision.transforms.Resize(128),
        # torchvision.transforms.CenterCrop(224),
        torchvision.transforms.ToTensor(),
        # normalize,
    ]
)

In [17]:
# pretrained_net = torchvision.models.resnet18(weights=torchvision.models.ResNet18_Weights.IMAGENET1K_V1)
pretrained_net = torchvision.models.regnet_x_800mf(weights="IMAGENET1K_V2")

In [18]:
pretrained_net

RegNet(
  (stem): SimpleStemIN(
    (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
    (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
  )
  (trunk_output): Sequential(
    (block1): AnyStage(
      (block1-0): ResBottleneckBlock(
        (proj): Conv2dNormActivation(
          (0): Conv2d(32, 64, kernel_size=(1, 1), stride=(2, 2), bias=False)
          (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        )
        (f): BottleneckTransform(
          (a): Conv2dNormActivation(
            (0): Conv2d(32, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
            (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
            (2): ReLU(inplace=True)
          )
          (b): Conv2dNormActivation(
            (0): Conv2d(64, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), groups=4, bias=False)
            

In [19]:
class LeNet(nn.Module):
    def __init__(self):
        super().__init__()
        # your code here
        self.model = nn.Sequential(
            nn.Conv2d(3, 20, 5),
            nn.ReLU(),
            nn.AvgPool2d(2, stride=2),
            nn.Conv2d(20, 50, 5),
            nn.ReLU(),
            nn.AvgPool2d(2, stride=2),
            nn.Flatten(),
            nn.Linear(800, 300),
            nn.ReLU(),
            nn.Linear(300, 100),
            nn.ReLU(),
            nn.Linear(100, 2),
        )

    def forward(self, x):
        # your code here
        return self.model(x)

In [20]:
finetuned_net = nn.Sequential(
    pretrained_net,
    nn.ReLU(),
    nn.Linear(1000, 2),
)

In [21]:
import time
import copy

train_losses = []
val_losses = []

def train_model(
    model, dataloaders, criterion, optimizer, num_epochs=25
):
    since = time.time()
    val_acc_history = []

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(1, num_epochs + 1):
        print("Epoch {}/{}".format(epoch, num_epochs))
        print("-" * 10)

        # Each epoch has a training and validation phase
        for phase in ["train", "val"]:
            if phase == "train":
                model.train()  # Set model to training mode
            else:
                model.eval()  # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0

            # Iterate over data.
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward
                # track history if only in train
                with torch.set_grad_enabled(phase == "train"):
                    # Get model outputs and calculate loss

                    outputs = model(inputs)
                    loss = criterion(outputs, labels)
                    _, preds = torch.max(outputs, 1)

                    # backward + optimize only if in training phase
                    if phase == "train":
                        loss.backward()
                        optimizer.step()

                # statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            epoch_acc = running_corrects.float() / len(dataloaders[phase].dataset)


            print("{} Loss: {:.4f} Acc: {:.4f}".format(phase, epoch_loss, epoch_acc))

            if phase == "train":
                train_losses.append(epoch_loss)
            else:
                val_losses.append(epoch_loss)

            # deep copy the model
            if phase == "val" and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
            if phase == "val":
                val_acc_history.append(epoch_acc)


        print()

    time_elapsed = time.time() - since
    print(
        "Training complete in {:.0f}m {:.0f}s".format(
            time_elapsed // 60, time_elapsed % 60
        )
    )
    print("Best val Acc: {:4f}".format(best_acc))

    model.load_state_dict(best_model_wts)
    return model, val_acc_history

In [22]:
import os

data_dir = "images_by_side"
batch_size = 32

# model_ft = finetuned_net.to(device)
model_ft = LeNet().to(device)
train_iter = torch.utils.data.DataLoader(
    torchvision.datasets.ImageFolder(
        os.path.join(data_dir, "train"), transform=train_augs
    ),
    batch_size=batch_size,
    shuffle=True,
)
val_iter = torch.utils.data.DataLoader(
    torchvision.datasets.ImageFolder(
        os.path.join(data_dir, "val"), transform=val_augs
    ),
    shuffle=True,
    batch_size=batch_size,
)
loss = nn.CrossEntropyLoss(reduction="none")

In [23]:
def train_fine_tuning(net, learning_rate, num_epochs=30):

    # trainer = torch.optim.SGD([{"params": finetuned_net[2].parameters(), "lr": learning_rate*5}], lr=learning_rate)
    trainer = torch.optim.SGD(net.parameters(), lr=learning_rate)

    dataloaders_dict = {"train": train_iter, "val": val_iter}
    criterion = nn.CrossEntropyLoss()
    model_ft, hist = train_model(
        net, dataloaders_dict, criterion, trainer, num_epochs=num_epochs
    )
    return model_ft, hist


In [24]:
model_ft, hist = train_fine_tuning(model_ft, learning_rate=5e-4, num_epochs=25)

Epoch 1/25
----------


RuntimeError: mat1 and mat2 shapes cannot be multiplied (1x42050 and 800x300)

In [None]:
starting_epoch = 0
y = list(range(starting_epoch, len(train_losses)))
plt.figure(figsize=(10, 5))
plt.plot(y, train_losses[starting_epoch:], label='Train Loss')
plt.scatter(y, train_losses[starting_epoch:])
plt.plot(y, val_losses[starting_epoch:], label='Validation Loss')
plt.scatter(y, val_losses[starting_epoch:])
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.title('Training and Validation Loss')
plt.show()

In [71]:

def imshow(inp, title=None):
    """Display image for Tensor."""
    inp = inp.numpy().transpose((1, 2, 0))
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    inp = std * inp + mean
    inp = np.clip(inp, 0, 1)
    plt.imshow(inp)
    if title is not None:
        plt.title(title)
    plt.pause(0.001)  # pause a bit so that plots are updated

In [72]:
def predict_image(image_path, model, transform):
    image = Image.open(image_path).convert('RGB')

    image = transform(image).unsqueeze(0)

    image = image.to(device)

    model.eval()

    with torch.no_grad():
        output = model(image)

    _, predicted = torch.max(output, 1)

    return predicted.item(), image.cpu().squeeze()

def visualize_prediction(image_path, model, transform, class_names):
    predicted_class, image_tensor = predict_image(image_path, model, transform)
    # plt.figure(figsize=(6, 6))
    # imshow(image_tensor, title=f'Predicted: {class_names[predicted_class]}')
    # plt.show()
    return class_names[predicted_class]

def visualize_predictionWithImage(image_path, model, transform, class_names):
    predicted_class, image_tensor = predict_image(image_path, model, transform)
    plt.figure(figsize=(6, 6))
    imshow(image_tensor, title=f'Predicted: {class_names[predicted_class]}')
    plt.show()
    return class_names[predicted_class]

image_path = 'images_by_side/train/right/13c2ur549vohc0jat2dvu3xrxb1_27.png'

class_names = ["left", "right"]



In [73]:
import os

directory_path = 'images_by_side/test/right'

file_names = os.listdir(directory_path)

file_names = [f for f in file_names if os.path.isfile(os.path.join(directory_path, f))]

class_names = ["left", "right"]
count = 0
# print(file_names)
for file_name in file_names:
    image_path = os.path.join(directory_path, file_name)
    predicted_class = visualize_prediction(image_path, model_ft, val_augs, class_names)
    if predicted_class == "left":
        count += 1
        # print(file_name)
        # visualize_predictionWithImage(image_path, model_ft, val_augs, class_names)
print("Incorrectly predicted right images: ", count)
print(f"{count/len(file_names)*100:.2f}%")


Incorrectly predicted right images:  9
18.00%


In [74]:
import os

directory_path = 'images_by_side/test/left'

file_names = os.listdir(directory_path)

file_names = [f for f in file_names if os.path.isfile(os.path.join(directory_path, f))]

class_names = ["left", "right"]
count = 0
# print(file_names)
for file_name in file_names:
    image_path = os.path.join(directory_path, file_name)
    predicted_class = visualize_prediction(image_path, model_ft, val_augs, class_names)
    if predicted_class == "right":
        count += 1
        # print(file_name)
        # visualize_predictionWithImage(image_path, model_ft, val_augs, class_names)
print("Incorrectly predicted left images: ", count)
print(f"{count/len(file_names)*100:.2f}%")


Incorrectly predicted left images:  5
8.62%


In [24]:
print(len(file_names))

50


In [1]:
import sys
print(sys.version)

3.12.3 (tags/v3.12.3:f6650f9, Apr  9 2024, 14:05:25) [MSC v.1938 64 bit (AMD64)]
