In [None]:
%matplotlib inline

In [None]:
import torch
from torch import nn
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor, Resize

In [None]:
from torch.utils.data import random_split, Subset

In [None]:
import matplotlib.pyplot as plt
import numpy as np

In [None]:
import os
import pandas as pd
from torchvision.io import read_image

class CustomImageDataset(Dataset):
    """Image dataset driven by a header-less CSV of (file name, label) rows.

    Column 0 of the annotations file holds the image file name relative to
    ``img_dir``; column 1 holds the label returned alongside the image.
    """

    def __init__(self, annotations_file, img_dir, transform=None, target_transform=None):
        """Read the annotations table and remember where the images live.

        Args:
            annotations_file: Path (or file-like object) of a CSV without a header row.
            img_dir: Directory that the file names in column 0 are relative to.
            transform: Optional callable applied to the float image tensor.
            target_transform: Optional callable applied to the label.
        """
        self.transform = transform
        self.target_transform = target_transform
        self.img_dir = img_dir
        self.img_labels = pd.read_csv(annotations_file, header=None)

    def __len__(self):
        """Return the number of annotated rows."""
        return len(self.img_labels)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return it as ``(image, label)``."""
        file_name = self.img_labels.iloc[idx, 0]
        label = self.img_labels.iloc[idx, 1]

        # Decode the file and convert to float so it can be fed to float-weight layers.
        image = read_image(os.path.join(self.img_dir, file_name)).float()

        image = self.transform(image) if self.transform else image
        label = self.target_transform(label) if self.target_transform else label
        return image, label
        

In [None]:
def train_test_split(all_data, test_portion = 0.3, seed = 0):
    """Randomly partition ``all_data`` into a training and a test subset.

    Args:
        all_data: Any map-style dataset (supports ``len()`` and integer indexing).
        test_portion: Fraction of the samples placed in the test subset; must be
            within [0, 1]. The training size is truncated towards zero and the
            test subset receives the remainder.
        seed: Seed for the generator that drives the shuffle, so the same seed
            always yields the same partition.

    Returns:
        ``(training_data, test_data)`` as two ``Subset`` views over ``all_data``.

    Raises:
        ValueError: If ``test_portion`` is not within [0, 1].
    """
    # Out-of-range portions would otherwise produce a negative/oversized length
    # pair that still sums to len(all_data) and silently yields bogus splits.
    if not 0 <= test_portion <= 1:
        raise ValueError(f"test_portion must be within [0, 1], got {test_portion!r}")

    training_size = int((1 - test_portion) * len(all_data))
    test_size = len(all_data) - training_size

    # random_split already returns Subset objects over the dataset it is given,
    # so split the dataset itself instead of wrapping a Subset of indices.
    training_data, test_data = random_split(
        all_data,
        [training_size, test_size],
        generator=torch.Generator().manual_seed(seed),
    )
    return training_data, test_data

In [None]:
# Mount Google Drive at /content/drive; the dataset paths used later in this
# notebook point below that mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Load the images listed in annotations_file.csv from the mounted Drive folder
# (nothing is downloaded here) and split them into training and test subsets.

all_data = CustomImageDataset(
    annotations_file="/content/drive/MyDrive/Ai builders/annotations_file.csv",
    img_dir="/content/drive/MyDrive/Ai builders/pig pics cleaned",
    # An int size only rescales the shorter edge and keeps the aspect ratio, so a
    # non-square image would not come out as 128x128. Passing (H, W) guarantees
    # the fixed 128x128 input that batching and the network's flatten size need.
    transform=Resize(size=(128, 128)),
    target_transform=None
)

training_data, test_data = train_test_split(all_data)

print("All data size: " + str(len(all_data)))
print("Training size: " + str(len(training_data)) + "\tTest size: " + str(len(test_data)))

All data size: 511
Training size: 357	Test size: 154


In [None]:
batch_size = 64

# Create data loaders.
# Reshuffle the training samples every epoch so SGD does not see the exact same
# mini-batches each time; the seeded generator keeps the order reproducible.
train_dataloader = DataLoader(
    training_data,
    batch_size=batch_size,
    shuffle=True,
    generator=torch.Generator().manual_seed(0),
)
# Evaluation order does not affect the metrics, so the test loader stays sequential.
test_dataloader = DataLoader(test_data, batch_size=batch_size)

# Sanity check: print the tensor shapes of the first test batch only.
for X, y in test_dataloader:
    print(f"Shape of X [N, C, H, W]: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")
    break

Shape of X [N, C, H, W]: torch.Size([64, 3, 128, 128])
Shape of y: torch.Size([64]) torch.int64


In [None]:
# Define convolutional NN model

#################################
# Input 
# 128 x 128 x 3

# A convolutional layer with 32 filters of size 5x5
# 124 x 124 x 32

# A ReLU nonlinearity

# A max pooling layer with size 3x3 (ceil_mode=False)
# 41 x 41 x 32

# A convolutional layer with 64 filters of size 5x5
# 37 x 37 x 64

# A ReLU nonlinearity

# A max pooling layer with size 3x3 (ceil_mode=False)
# 12 x 12 x 64

# A flatten layer
# 9216 x 1

# A fully connected layer with 128 neurons

# A dropout layer with drop probability 0.5

# A fully-connected layer with 2 neurons
# Prefer the GPU when CUDA is usable; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Using {device} device")


class NeuralNetwork(nn.Module):
    """Two conv/ReLU/max-pool stages followed by a small fully connected head.

    The first linear layer expects 9216 flattened features, which is what the
    two conv/pool stages produce for 3-channel 128x128 inputs (64 x 12 x 12).
    The output is a pair of raw logits per sample.
    """

    def __init__(self):
        super().__init__()
        # Registered but not called in forward(); the stack flattens on its own.
        self.flatten = nn.Flatten()

        feature_layers = [
            nn.Conv2d(in_channels=3, out_channels=32, kernel_size=(5, 5)),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(3, 3)),
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=(5, 5)),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(3, 3)),
        ]
        head_layers = [
            nn.Flatten(),
            nn.Linear(in_features=9216, out_features=128),
            nn.Dropout(p=0.5),
            nn.Linear(in_features=128, out_features=2),
        ]
        self.linear_relu_stack = nn.Sequential(*feature_layers, *head_layers)

    def forward(self, x):
        """Return the class logits for a batch of images ``x`` of shape [N, 3, 128, 128]."""
        return self.linear_relu_stack(x)

# Build the network, move its parameters to the selected device, and print the
# layer-by-layer summary.
model = NeuralNetwork().to(device)
print(model)

Using cpu device
NeuralNetwork(
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (linear_relu_stack): Sequential(
    (0): Conv2d(3, 32, kernel_size=(5, 5), stride=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=(3, 3), stride=(3, 3), padding=0, dilation=1, ceil_mode=False)
    (3): Conv2d(32, 64, kernel_size=(5, 5), stride=(1, 1))
    (4): ReLU()
    (5): MaxPool2d(kernel_size=(3, 3), stride=(3, 3), padding=0, dilation=1, ceil_mode=False)
    (6): Flatten(start_dim=1, end_dim=-1)
    (7): Linear(in_features=9216, out_features=128, bias=True)
    (8): Dropout(p=0.5, inplace=False)
    (9): Linear(in_features=128, out_features=2, bias=True)
  )
)


In [None]:
# Cross-entropy over the two output logits of the model.
loss_fn = nn.CrossEntropyLoss()
# Plain SGD over all model parameters with a fixed learning rate of 1e-3.
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

In [None]:
def train(dataloader, model, loss_fn, optimizer):
    """Run one optimisation pass over ``dataloader``.

    Puts ``model`` in training mode, then for every batch computes the loss,
    back-propagates it and takes one optimizer step. The loss of every 100th
    batch (starting with the first) is printed as a progress line.
    Relies on the module-level ``device`` for tensor placement.
    """
    size = len(dataloader.dataset)
    model.train()

    for batch, (inputs, targets) in enumerate(dataloader):
        inputs = inputs.to(device)
        targets = targets.to(device)

        # Forward pass and loss for this mini-batch.
        loss = loss_fn(model(inputs), targets)

        # Clear stale gradients, back-propagate, then update the weights.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 100 != 0:
            continue
        # Number of samples consumed before this batch, for the progress line.
        current = batch * len(inputs)
        loss = loss.item()
        print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

In [None]:
def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            # print(f"X.Shape: {X.shape}")
            pred = model(X)
            # print(f"pred: {pred}")
            # print(f"pred shape: {pred.shape}")
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

In [None]:
# Run a fixed number of passes over the training set, evaluating on the test
# split after every pass so progress can be followed epoch by epoch.
epochs = 10
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train(train_dataloader, model, loss_fn, optimizer)  # one optimisation pass
    test(test_dataloader, model, loss_fn)  # prints accuracy and average loss
print("Done!")

Epoch 1
-------------------------------
loss: 6.867768  [    0/  357]
Test Error: 
 Accuracy: 44.8%, Avg loss: 0.692855 

Epoch 2
-------------------------------
loss: 0.749365  [    0/  357]
Test Error: 
 Accuracy: 45.5%, Avg loss: 0.688125 

Epoch 3
-------------------------------
loss: 0.721584  [    0/  357]
Test Error: 
 Accuracy: 51.3%, Avg loss: 0.686310 

Epoch 4
-------------------------------
loss: 0.695873  [    0/  357]
Test Error: 
 Accuracy: 52.6%, Avg loss: 0.681224 

Epoch 5
-------------------------------
loss: 0.732331  [    0/  357]
Test Error: 
 Accuracy: 67.5%, Avg loss: 0.646953 

Epoch 6
-------------------------------
loss: 0.732001  [    0/  357]
Test Error: 
 Accuracy: 66.9%, Avg loss: 0.658671 

Epoch 7
-------------------------------
loss: 0.665917  [    0/  357]
Test Error: 
 Accuracy: 70.1%, Avg loss: 0.607232 

Epoch 8
-------------------------------
loss: 0.621725  [    0/  357]
Test Error: 
 Accuracy: 64.9%, Avg loss: 0.645143 

Epoch 9
----------------

In [None]:
# Persist only the learned parameters (the state_dict) of the trained model.
# The file name is kept as-is so existing consumers of the checkpoint still
# find it; the message now reports the path that was actually written.
MODEL_PATH = "resnet_50_b_model_128.pth"
torch.save(model.state_dict(), MODEL_PATH)
print(f"Saved PyTorch Model State to {MODEL_PATH}")

Saved PyTorch Model State to model.pth


In [None]:
from sklearn.metrics import classification_report
def get_incorrect_predictions(model, test_dataloader):
    """Predict a class for every sample served by ``test_dataloader``.

    Despite the name, predictions for all samples are returned, not only the
    misclassified ones.

    Args:
        model: Module mapping a batch ``[N, C, H, W]`` to per-class scores ``[N, K]``.
        test_dataloader: Iterable yielding ``(inputs, labels)`` batches.

    Returns:
        ``(predicted, actual)``: two equally long lists of ints, in that order,
        holding the arg-max class index and the ground-truth label per sample.
    """
    predicted_labels = []
    actual_labels = []

    # Inputs have to live on the same device as the model weights; a model
    # without parameters is run on whatever device the batches already use.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else None

    model.eval()
    with torch.no_grad():
        for inputs, labels in test_dataloader:
            if model_device is not None:
                inputs = inputs.to(model_device)
            scores = model(inputs)
            predicted_labels.extend(scores.argmax(1).tolist())
            actual_labels.extend(torch.as_tensor(labels).tolist())

    return predicted_labels, actual_labels
# get_incorrect_predictions returns (predicted, actual) in that order, while
# classification_report takes the ground-truth labels first; unpack accordingly
# so precision and recall are not computed with the two roles swapped.
y_pred, y_true = get_incorrect_predictions(model, test_dataloader)
print(classification_report(y_true, y_pred))


              precision    recall  f1-score   support

           0       0.74      0.73      0.73       107
           1       0.40      0.40      0.40        47

    accuracy                           0.63       154
   macro avg       0.57      0.57      0.57       154
weighted avg       0.63      0.63      0.63       154

