In [None]:
# Mount Google Drive so the dataset folders under /content/drive are readable.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import numpy as np
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
from torch.utils.data.sampler import SubsetRandomSampler
import torchvision.transforms as transforms
import time
import os
import numpy as np

import torchvision
from torchvision import datasets, models
import matplotlib.pyplot as plt

In [None]:
import torch
import torch.nn as nn

class CustomCNN(nn.Module):
    """Three-stage convolutional feature extractor for 3-channel images.

    Each stage is a 3x3 convolution (stride 1, padding 1) followed by ReLU and
    a 2x2 max pool. The pooled feature map is flattened and passed through two
    fully connected layers, producing a 256-wide vector per sample.

    Args:
        num_classes: Accepted so the constructor matches the other models in
            this notebook; it is not used. The output width is always 256,
            which is the width ``CombinedModel`` concatenates.

    Note:
        ``fc1`` expects 256 * 8 * 8 input features, which corresponds to a
        64x64 input after three 2x2 poolings. Other spatial sizes fail at
        ``fc1``.
    """

    def __init__(self, num_classes=6):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=64, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        self.fc1 = nn.Linear(256 * 8 * 8, 512)
        self.fc2 = nn.Linear(512, 256)

    def forward(self, x):
        """Return a ``(batch, 256)`` tensor for a ``(batch, 3, 64, 64)`` input."""
        # A ReLU follows every learned layer except the last one. Without it,
        # fc1 -> fc2 collapses into a single affine map and the conv stack has
        # no nonlinearity besides max pooling.
        x = self.pool(torch.relu(self.conv1(x)))
        x = self.pool(torch.relu(self.conv2(x)))
        x = self.pool(torch.relu(self.conv3(x)))
        x = torch.flatten(x, start_dim=1)  # (batch, 256 * 8 * 8) for 64x64 inputs
        x = torch.relu(self.fc1(x))
        # The final layer stays linear so its output can be used as raw
        # features or logits by the caller.
        x = self.fc2(x)
        return x


In [None]:
from torchvision import models

class ModifiedResNet(nn.Module):
    """Pretrained torchvision ResNet-50 with its final layer replaced by a 256-wide linear head.

    Args:
        num_classes: Accepted so the constructor matches the other models in
            this notebook; it is not used. The replaced ``fc`` layer always
            has 256 outputs.
    """

    def __init__(self, num_classes=6):
        super().__init__()
        # `pretrained=True` is deprecated in newer torchvision releases in
        # favour of `weights=`; IMAGENET1K_V1 is the checkpoint the legacy flag
        # selects. Older installs without the enum keep the legacy keyword.
        weights_enum = getattr(models, "ResNet50_Weights", None)
        if weights_enum is not None:
            self.resnet = models.resnet50(weights=weights_enum.IMAGENET1K_V1)
        else:
            self.resnet = models.resnet50(pretrained=True)
        # Swap the stock classifier for a 256-output layer; this new layer
        # starts from fresh (non-pretrained) weights.
        in_features = self.resnet.fc.in_features
        self.resnet.fc = nn.Linear(in_features, 256)

    def forward(self, x):
        """Run ``x`` through the ResNet and return its 256-wide output."""
        return self.resnet(x)


In [None]:
#combining both networks
class CombinedModel(nn.Module):
    """Two-branch classifier that fuses CustomCNN and ModifiedResNet outputs.

    The same input batch is fed to both branches. Their outputs are
    concatenated along dimension 1 and mapped to ``num_classes`` scores by a
    linear head sized for 256 * 2 input features.

    Args:
        num_classes: Number of output scores; also forwarded to both branch
            constructors.
    """

    def __init__(self, num_classes=6):
        super().__init__()
        # Construction order (CNN, ResNet, head) is kept as-is so parameter
        # initialisation consumes the random stream in the same order.
        self.custom_cnn = CustomCNN(num_classes)
        self.modified_resnet = ModifiedResNet(num_classes)
        self.fc = nn.Linear(2 * 256, num_classes)

    def forward(self, x):
        """Return the head's output for the concatenated branch features."""
        cnn_features = self.custom_cnn(x)
        resnet_features = self.modified_resnet(x)
        fused = torch.cat([cnn_features, resnet_features], dim=1)
        return self.fc(fused)


In [None]:
import time
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

# Use the GPU when CUDA is available; otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# The functions below are the ones used in lab 2 / lab 3 of APS360.
def evaluate(net, loader, criterion):
    """Compute accuracy, error rate and mean per-batch loss of ``net`` over ``loader``.

    The network is switched to eval mode (and left in it on return) and the
    whole pass runs with gradients disabled.

    Args:
        net: Model called as ``net(inputs)``. The predicted class is the index
            of the maximum along dimension 1 of its output.
        loader: Iterable yielding ``(inputs, labels)`` batches.
        criterion: Loss called as ``criterion(outputs, labels)``; its result
            must support ``.item()``.

    Returns:
        Tuple ``(accuracy, err, loss)``: fraction of correct predictions,
        fraction of incorrect predictions, and the loss averaged over batches
        (each batch weighted equally, regardless of its size).

    Raises:
        ValueError: If ``loader`` yields no samples.
    """
    net.eval()

    # Send batches to wherever the model's weights live. Models without
    # parameters fall back to the notebook-wide `device`.
    first_param = next(net.parameters(), None)
    target_device = first_param.device if first_param is not None else device

    total_loss = 0.0
    total_err = 0.0
    total_correct = 0
    total_samples = 0
    num_batches = 0
    with torch.no_grad():
        for inputs, labels in loader:
            inputs = inputs.to(target_device)
            # Same cast the training loop applies, so both code paths accept
            # the same label dtypes.
            labels = labels.to(target_device).long()
            outputs = net(inputs)
            loss = criterion(outputs, labels)
            _, predicted = torch.max(outputs, 1)
            total_correct += (predicted == labels).sum().item()
            total_err += (predicted != labels).sum().item()
            total_samples += labels.size(0)
            total_loss += loss.item()
            num_batches += 1

    if total_samples == 0:
        raise ValueError("evaluate() received a loader that yielded no samples")

    accuracy = total_correct / total_samples
    err = float(total_err) / total_samples
    loss = float(total_loss) / num_batches
    return accuracy, err, loss


def loader(train_images, val_images, test_images, batch_size=64, num_workers=2):
    """Wrap the three dataset splits in ``DataLoader`` objects.

    Only the training split is shuffled. Validation and test batches are
    served in dataset order so repeated evaluations see identical batches.

    Args:
        train_images: Dataset for the training split.
        val_images: Dataset for the validation split.
        test_images: Dataset for the test split.
        batch_size: Number of samples per batch for all three loaders.
        num_workers: Worker processes per loader (0 loads in the main process).

    Returns:
        Tuple ``(train_loader, val_loader, test_loader)``.
    """
    train_loader = torch.utils.data.DataLoader(
        train_images, batch_size=batch_size, num_workers=num_workers, shuffle=True
    )
    val_loader = torch.utils.data.DataLoader(
        val_images, batch_size=batch_size, num_workers=num_workers, shuffle=False
    )
    test_loader = torch.utils.data.DataLoader(
        test_images, batch_size=batch_size, num_workers=num_workers, shuffle=False
    )
    return train_loader, val_loader, test_loader

def train(model, train_images, val_images, test_images, batch_size=64, lr=0.001, num_epochs=30, weight_decay=1e-4):
    """Train ``model`` with Adam and cross-entropy, validating after every epoch.

    Side effects: seeds torch with 10, moves ``model`` to the notebook-wide
    ``device``, prints one summary line per epoch, saves a ``state_dict``
    checkpoint every 5 epochs and after the final epoch, and writes six
    per-epoch metric CSV files named after the model class. All files are
    written to paths relative to the current working directory.

    Args:
        model: Network to optimise in place.
        train_images: Dataset used for the gradient updates.
        val_images: Dataset scored with ``evaluate`` after each epoch.
        test_images: Passed to ``loader`` only; no test pass is run here.
        batch_size: Batch size for all loaders.
        lr: Adam learning rate. Also embedded in the checkpoint file name.
        num_epochs: Number of passes over the training set.
        weight_decay: Adam weight decay (L2 penalty coefficient).

    Raises:
        ValueError: If the training loader yields no samples.
    """
    torch.manual_seed(10)

    # Move the model to the selected device (GPU when available).
    model.to(device)

    train_loader, val_loader, _ = loader(train_images, val_images, test_images, batch_size)
    criterion = nn.CrossEntropyLoss()
    # Honour the `lr` argument: the optimizer used to hard-code 0.001 while the
    # checkpoint file names advertised whatever `lr` the caller passed.
    optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)

    train_err = np.zeros(num_epochs)
    train_loss = np.zeros(num_epochs)
    train_acc = np.zeros(num_epochs)
    val_err = np.zeros(num_epochs)
    val_loss = np.zeros(num_epochs)
    val_acc = np.zeros(num_epochs)

    model_name = model.__class__.__name__

    start_time = time.time()
    for epoch in range(num_epochs):
        # evaluate() leaves the model in eval mode, so re-enable training mode
        # at the top of every epoch.
        model.train()
        total_train_loss = 0.0
        total_train_err = 0.0
        total_train_correct = 0
        total_train_samples = 0
        num_batches = 0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            labels = labels.long()

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            _, predicted = torch.max(outputs, 1)
            total_train_correct += (predicted == labels).sum().item()
            total_train_samples += labels.size(0)
            total_train_loss += loss.item()
            total_train_err += (predicted != labels).sum().item()
            num_batches += 1

        if total_train_samples == 0:
            raise ValueError("train() received a training set that yielded no samples")

        train_acc[epoch] = total_train_correct / total_train_samples
        train_err[epoch] = float(total_train_err) / total_train_samples
        # Mean of per-batch losses (each batch weighted equally).
        train_loss[epoch] = float(total_train_loss) / num_batches
        val_acc[epoch], val_err[epoch], val_loss[epoch] = evaluate(model, val_loader, criterion)

        print(f"Epoch {epoch + 1}: Train Acc: {train_acc[epoch]:.4f}, Train err: {train_err[epoch]:.4f}, Train loss: {train_loss[epoch]:.4f} | Validation Acc: {val_acc[epoch]:.4f}, Validation err: {val_err[epoch]:.4f}, Validation loss: {val_loss[epoch]:.4f}")

        # Save the model every few epochs instead of every epoch
        if (epoch + 1) % 5 == 0 or (epoch + 1) == num_epochs:
            model_path = f"model_{model_name}_bs{batch_size}_lr{lr}_epoch{epoch + 1}.pth"
            torch.save(model.state_dict(), model_path)

    print('Finished Training')
    elapsed_time = time.time() - start_time
    print(f"Total time elapsed: {elapsed_time:.2f} seconds")

    # Persist the learning curves; plot_training_curve() reads the *_err and
    # *_loss files back using the model class name as the path prefix.
    history = {
        "train_acc": train_acc,
        "train_err": train_err,
        "train_loss": train_loss,
        "val_acc": val_acc,
        "val_err": val_err,
        "val_loss": val_loss,
    }
    for metric, values in history.items():
        np.savetxt(f"{model_name}_{metric}.csv", values)


In [None]:
###########      Plotting data from lab2        #############

def plot_training_curve(path):
    """Plot train vs. validation error and loss curves from saved CSV files.

    Args:
        path: File-name prefix. The function reads ``{path}_train_err.csv``,
            ``{path}_val_err.csv``, ``{path}_train_loss.csv`` and
            ``{path}_val_loss.csv``, each holding one value per epoch.

    Two figures are shown in sequence: error first, then loss.
    """
    import matplotlib.pyplot as plt

    def _load(metric):
        # ndmin=1 keeps a single-epoch file as a length-1 array; without it
        # np.loadtxt returns a 0-d array and len() below raises TypeError.
        return np.loadtxt("{}_{}.csv".format(path, metric), ndmin=1)

    # Read everything up front so a missing file fails before anything is drawn.
    train_err = _load("train_err")
    val_err = _load("val_err")
    train_loss = _load("train_loss")
    val_loss = _load("val_loss")

    n = len(train_err)  # number of epochs
    epochs = range(1, n + 1)

    panels = (
        ("Train vs Validation Error", "Error", train_err, val_err),
        ("Train vs Validation Loss", "Loss", train_loss, val_loss),
    )
    for title, ylabel, train_values, val_values in panels:
        plt.title(title)
        plt.plot(epochs, train_values, label="Train")
        plt.plot(epochs, val_values, label="Validation")
        plt.xlabel("Epoch")
        plt.ylabel(ylabel)
        plt.legend(loc='best')
        plt.show()


In [None]:
from torchvision import transforms, datasets

# Preprocessing shared by all three splits: resize to 64x64, convert to a
# tensor, then normalise every channel with mean 0.5 and std 0.5.
transform = transforms.Compose(
    [
        transforms.Resize(size=(64, 64)),
        transforms.ToTensor(),
        transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
    ]
)

# Each split is a folder on the mounted Drive; ImageFolder derives the class
# labels from the sub-directory names.
train_images = datasets.ImageFolder(
    root='/content/drive/MyDrive/aps360/Processed Data/training_database/corn',
    transform=transform,
)
val_images = datasets.ImageFolder(
    root='/content/drive/MyDrive/aps360/Processed Data/validation_database/corn',
    transform=transform,
)
test_images = datasets.ImageFolder(
    root='/content/drive/MyDrive/aps360/Processed Data/testing_database/corn',
    transform=transform,
)


In [None]:

# Train the standalone CustomCNN on the corn splits for 20 epochs.
# NOTE(review): CustomCNN does not use num_classes; its last layer has 256
# outputs, so the cross-entropy loss in train() treats them as 256 logits.
# Confirm this is intended for a 6-class problem.
model = CustomCNN(num_classes=6)
train(
    model,
    train_images,
    val_images,
    test_images,
    batch_size=64,
    lr=0.001,
    num_epochs=20,
)

Epoch 1: Train Acc: 0.5641, Train err: 0.4359, Train loss: 1.5897 | Validation Acc: 0.7262, Validation err: 0.2738, Validation loss: 0.8364
Epoch 2: Train Acc: 0.7374, Train err: 0.2626, Train loss: 0.7855 | Validation Acc: 0.7119, Validation err: 0.2881, Validation loss: 0.6627
Epoch 3: Train Acc: 0.7599, Train err: 0.2401, Train loss: 0.6380 | Validation Acc: 0.8190, Validation err: 0.1810, Validation loss: 0.4723
Epoch 4: Train Acc: 0.8349, Train err: 0.1651, Train loss: 0.4641 | Validation Acc: 0.8071, Validation err: 0.1929, Validation loss: 0.4840
Epoch 5: Train Acc: 0.8615, Train err: 0.1385, Train loss: 0.3893 | Validation Acc: 0.8405, Validation err: 0.1595, Validation loss: 0.4401
Epoch 6: Train Acc: 0.8718, Train err: 0.1282, Train loss: 0.3574 | Validation Acc: 0.8190, Validation err: 0.1810, Validation loss: 0.4675
Epoch 7: Train Acc: 0.8813, Train err: 0.1187, Train loss: 0.3053 | Validation Acc: 0.8143, Validation err: 0.1857, Validation loss: 0.4866
Epoch 8: Train Acc: 