# Cat vs Dog recognition using transfer learning

In [None]:
import torch
from torch import nn, optim
from torch.nn import functional as F 
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader, TensorDataset

from torchvision.datasets import ImageFolder
from torchvision import transforms
from torchvision.models.vgg import vgg11_bn, VGG11_BN_Weights

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

In [None]:
# Preprocessing applied to every image before it is fed to the VGG network:
# resize, centre-crop to 224x224, convert to a tensor, then normalise each
# channel with the mean/std statistics below.
CHANNEL_MEAN = [0.485, 0.456, 0.406]
CHANNEL_STD = [0.229, 0.224, 0.225]

transform = transforms.Compose(
    [
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(CHANNEL_MEAN, CHANNEL_STD),
    ]
)

In [None]:
# Create the per-class subfolders (dog/ and cat/) under both test1/ and train/.
# "*>$null" sends every PowerShell output stream to null, so the cell stays
# quiet (including any error printed when a folder already exists).
!powershell "mkdir catvsdog/test1/dog *>$null"
!powershell "mkdir catvsdog/test1/cat *>$null"
!powershell "mkdir catvsdog/train/dog *>$null"
!powershell "mkdir catvsdog/train/cat *>$null"

In [None]:
# Move the training images into their class folders based on the file-name
# prefix ("cat.*" -> train/cat, "dog.*" -> train/dog). The dataset below is
# built with ImageFolder on catvsdog/train/, which reads one subfolder per class.
!powershell "mv catvsdog/train/cat.* catvsdog/train/cat"
!powershell "mv catvsdog/train/dog.* catvsdog/train/dog"

In [None]:
# Labelled image dataset: one class per subfolder of catvsdog/train/, with the
# VGG preprocessing applied to each image as it is loaded.
dataset = ImageFolder(root="catvsdog/train/", transform=transform)

In [None]:
# Hold out 20% of the images for testing.
# random_split returns lazy Subset views, so images are still decoded one at a
# time by the DataLoader. (sklearn's train_test_split would index the dataset
# item by item and keep every transformed image in memory at once.)
n_test = int(np.ceil(0.2 * len(dataset)))
n_train = len(dataset) - n_test
train_data, test_data = torch.utils.data.random_split(dataset, [n_train, n_test])

In [None]:
# Batched loaders over the raw images (32 images per batch); only the training
# loader shuffles.
train_loader = DataLoader(train_data, batch_size=32, shuffle=True)
test_loader = DataLoader(test_data, batch_size=32)

### Define the model

In [None]:
# Pretrained VGG-11 (batch-norm variant) with the IMAGENET1K_V1 weights;
# progress=False turns off the download progress bar.
vgg = vgg11_bn(
    weights=VGG11_BN_Weights.IMAGENET1K_V1,
    progress=False,
)

In [5]:
# Wrapper class to precompute the feature vectors obtained by passing the images into VGG
class VGGFeatures(nn.Module):
    """Expose the convolutional part of a VGG model as a flat feature extractor.

    Args:
        vgg_model: a VGG-style module providing ``features`` (the convolutional
            stack) and ``avgpool`` sub-modules.
    """

    def __init__(self, vgg_model):
        super().__init__()
        self.vgg = vgg_model

    def forward(self, X):
        """Return one flattened feature vector per input image.

        Args:
            X: image batch accepted by ``vgg_model.features``.

        Returns:
            Tensor of shape ``(batch, D)``: the pooled convolutional features
            with every non-batch dimension flattened.
        """
        # Only the convolutional stack is run; calling self.vgg(X) would also
        # run the classifier and hand class logits to avgpool.
        out = self.vgg.features(X)
        out = self.vgg.avgpool(out)
        return torch.flatten(out, 1)

    # Backward-compatible alias for the original, misspelled method name.
    foward = forward

In [None]:
# Wrap the pretrained network as a feature extractor and place it on the GPU
# when one is available, otherwise on the CPU.
vggf = VGGFeatures(vgg)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# The extractor is only used for inference: eval mode makes the batch-norm
# layers use their stored running statistics instead of per-batch statistics
# (and stops them from being updated while features are extracted).
vggf.to(device).eval()

In [None]:
# Compute sizes for the binary classification head
N_train = len(train_data)
N_test = len(test_data)
# Probe the extractor with one dummy 224x224 RGB image to read the feature
# dimension. The dummy input is created on the same device as the model
# (a CPU tensor fails once the model lives on CUDA), and no autograd graph is
# needed for a shape probe.
with torch.no_grad():
    D = vggf(torch.randn((1, 3, 224, 224), device=device)).shape[1]

In [None]:
# Preallocate the buffers that will hold the precomputed feature vectors.
# These are NumPy arrays (not torch tensors) because the following cells write
# NumPy arrays into them, pass them through scikit-learn's StandardScaler and
# call ``.astype(...)`` on them. float32 matches the network output.
X_train = np.zeros((N_train, D), dtype=np.float32)
y_train = np.zeros((N_train, 1), dtype=np.float32)
X_test = np.zeros((N_test, D), dtype=np.float32)
y_test = np.zeros((N_test, 1), dtype=np.float32)

In [None]:
# Run every training batch through the feature extractor and store the results
# row by row; ``i`` is the index of the next free row.
i = 0
with torch.no_grad():
    for inputs, targets in train_loader:
        inputs = inputs.to(device)
        out = vggf(inputs)
        sz = len(out)
        stop = i + sz
        # Copy this batch's features and labels (as a column) into X_train, y_train
        X_train[i:stop] = out.detach().cpu().numpy()
        y_train[i:stop] = targets.view(-1, 1).numpy()
        i = stop

In [None]:
# Run every test batch through the feature extractor and store the results
# row by row; ``i`` is the index of the next free row.
i = 0
with torch.no_grad():
    for inputs, targets in test_loader:
        inputs = inputs.to(device)
        out = vggf(inputs)
        sz = len(out)
        stop = i + sz
        # Copy this batch's features and labels (as a column) into X_test, y_test
        X_test[i:stop] = out.detach().cpu().numpy()
        y_test[i:stop] = targets.view(-1, 1).numpy()
        i = stop

In [None]:
# Standardize the feature vectors: the scaler is fitted on the training
# features only, and the same fitted scaler is then applied to the test features.
scaler = StandardScaler()
scaler.fit(X_train)
X_train = scaler.transform(X_train)
X_test = scaler.transform(X_test)

## Build the linear head

In [None]:
class BinaryClassifier(nn.Module):
    """Single-hidden-layer classification head that outputs raw logits.

    Args:
        num_ins: size of each input feature vector.
        num_outs: number of output logits.
        num_hiddens: width of the hidden layer.
        drop_out: dropout probability applied to the hidden activations.
    """

    def __init__(self, num_ins, num_outs, num_hiddens, drop_out):
        super().__init__()
        self.fc1 = nn.Linear(num_ins, num_hiddens)
        # A Dropout module (rather than F.dropout) honours both the requested
        # probability and train()/eval() mode.
        self.dropout = nn.Dropout(drop_out)
        self.fc2 = nn.Linear(num_hiddens, num_outs)

    def forward(self, X):
        """Return logits of shape ``(batch, num_outs)`` for features ``X``."""
        # Each stage consumes the previous stage's output, not the raw input.
        out = F.relu(self.fc1(X))
        out = self.dropout(out)
        return self.fc2(out)

In [7]:
def configure(model, device, lr):
    """Place ``model`` on ``device`` and build its loss and optimizer.

    Args:
        model: module whose parameters will be optimized.
        device: device the model is moved to.
        lr: learning rate for the Adam optimizer.

    Returns:
        A ``(criterion, optimizer)`` pair: a ``BCEWithLogitsLoss`` instance and
        an ``Adam`` optimizer over ``model.parameters()``.
    """
    # The model is moved before the optimizer captures its parameters.
    model.to(device)
    return nn.BCEWithLogitsLoss(), optim.Adam(model.parameters(), lr=lr)

In [6]:
def batch_gd(model: nn.Module, criterion: nn.Module, optimizer: optim.Optimizer, device: torch.device,
                train_loader: DataLoader, test_loader: DataLoader, num_epochs: int, record=False):
    """Train ``model`` with mini-batch gradient descent and track the losses.

    Args:
        model: network to train; expected to already be on ``device``.
        criterion: loss called as ``criterion(outputs, targets)``.
        optimizer: optimizer over ``model``'s parameters.
        device: device each batch is moved to.
        train_loader: batches of ``(inputs, targets)`` used for training.
        test_loader: batches of ``(inputs, targets)`` used for evaluation only.
        num_epochs: number of passes over ``train_loader`` (must be >= 0).
        record: when true, print the train/test loss after every epoch.

    Returns:
        ``(loss_history, test_history)``: two NumPy arrays of length
        ``num_epochs`` holding the mean per-batch train and test loss of each
        epoch.
    """
    assert 0 <= num_epochs

    # Mean loss per epoch
    loss_history = np.zeros(num_epochs)
    test_history = np.zeros(num_epochs)

    for epoch in range(num_epochs):
        # --- Training pass ---
        model.train()
        loss_epoch = []
        for inputs, targets in train_loader:
            inputs, targets = inputs.to(device), targets.to(device)

            optimizer.zero_grad()
            outs = model(inputs)
            loss = criterion(outs, targets)
            loss.backward()
            optimizer.step()

            loss_epoch.append(loss.item())

        loss_history[epoch] = np.mean(loss_epoch)

        # --- Evaluation pass ---
        # No parameters are updated here, so gradients are neither tracked nor
        # reset.
        model.eval()
        test_epoch = []
        with torch.no_grad():
            for inputs, targets in test_loader:
                inputs, targets = inputs.to(device), targets.to(device)
                outs = model(inputs)
                test_epoch.append(criterion(outs, targets).item())

        test_history[epoch] = np.mean(test_epoch)

        # Reported inside the loop so every epoch is logged (and nothing is
        # printed when there are zero epochs).
        if record:
            print(f"Epoch {epoch + 1}/{num_epochs}: Train loss={loss_history[epoch]}; Test loss={test_history[epoch]}")

    return loss_history, test_history

## Train the model

In [None]:
# Hyperparameters
# Learning rate handed to configure(), which passes it to the Adam optimizer.
lr = 0.01
# Number of training epochs handed to batch_gd().
num_epochs = 20
# Width of the hidden layer of BinaryClassifier.
# NOTE(review): a hidden layer this wide on top of the VGG features is very
# large in memory - confirm this value is intended.
num_hiddens = 32768
# Passed to BinaryClassifier as its drop_out argument.
drop_out = 0.5

In [None]:
# Build the classification head (D input features -> 1 output logit), then
# move it to the device and create its loss and optimizer.
model = BinaryClassifier(
    num_ins=D,
    num_outs=1,
    num_hiddens=num_hiddens,
    drop_out=drop_out,
)
criterion, optimizer = configure(model, device, lr)

In [None]:
# Data loaders for the precomputed feature datasets.
# torch.as_tensor accepts NumPy arrays as well as tensors and converts to
# float32, which is what the linear layers and BCEWithLogitsLoss expect.
# Mini-batches of 32 are used (DataLoader's default batch size is 1), and only
# the training loader is shuffled.
train_feat_loader = DataLoader(
    TensorDataset(
        torch.as_tensor(X_train, dtype=torch.float32),
        torch.as_tensor(y_train, dtype=torch.float32),
    ),
    batch_size=32,
    shuffle=True,
)
test_feat_loader = DataLoader(
    TensorDataset(
        torch.as_tensor(X_test, dtype=torch.float32),
        torch.as_tensor(y_test, dtype=torch.float32),
    ),
    batch_size=32,
)

In [None]:
# Train the head on the precomputed features; keeps the per-epoch mean train
# and test losses for plotting.
train_his, test_his = batch_gd(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    device=device,
    train_loader=train_feat_loader,
    test_loader=test_feat_loader,
    num_epochs=num_epochs,
)

## Evaluate

In [None]:
# Plot the per-epoch train and test loss curves
plt.title("Loss over time")  # was misspelled as plt.tilte, which raises AttributeError
plt.xlabel("epoch")
plt.ylabel("loss")
plt.plot(train_his, label="train loss")
plt.plot(test_his, label="test loss")
plt.legend()
plt.show()

In [None]:
# Compute the accuracy
def compute_acc(model, device, data_loader):
    """Return the fraction of samples that ``model`` classifies correctly.

    A sample is predicted positive when its output logit is greater than 0.

    Args:
        model: network producing one logit per sample.
        device: device each batch is moved to before the forward pass.
        data_loader: iterable of ``(inputs, targets)`` batches with 0/1 targets.

    Returns:
        Accuracy in ``[0, 1]``; ``0.0`` when ``data_loader`` yields no samples.
    """
    n_correct, n_total = 0, 0
    model.eval()
    with torch.no_grad():
        for inputs, targets in data_loader:
            # Targets must live on the same device as the predictions.
            inputs, targets = inputs.to(device), targets.to(device)
            outs = model(inputs)
            preds = outs > 0
            # Align shapes so e.g. (N, 1) predictions are not broadcast
            # against (N,) targets into an (N, N) comparison.
            n_correct += (preds == targets.reshape(outs.shape)).sum().item()
            n_total += targets.size(0)

    return n_correct / n_total if n_total else 0.0

In [None]:
# Accuracy of the trained head on the training and test feature sets; the
# formatted string is the cell's displayed value.
train_acc = compute_acc(model, device, train_feat_loader)
test_acc = compute_acc(model, device, test_feat_loader)
f"Train Acc: {train_acc}, Test Acc: {test_acc}"