In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from pathlib import Path
from PIL import Image
from sklearn.model_selection import train_test_split
import torch.nn.functional as F
import random
from sklearn.metrics import f1_score
import numpy as np
from torch.nn.utils import clip_grad_norm_

In [None]:
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

In [None]:
class CustomDataset(Dataset):
    def __init__(self, data_paths, transform=None):
        self.data_paths = data_paths
        self.transform = transform

    def __len__(self):
        return len(self.data_paths)

    def __getitem__(self, idx):
        img_path = self.data_paths[idx]
        image = Image.open(img_path).convert("RGB")

        if self.transform:
            image = self.transform(image)

        labels = [int(label) for label in img_path.stem.split('_')[-3:]]

        return image, torch.tensor(labels)

In [None]:
class NeuralNetwork(nn.Module):
    def __init__(self, input_channels, input_size, hidden_size, num_classes):
        super(NeuralNetwork, self).__init__()

        self.conv1 = nn.Conv2d(input_channels, 32, kernel_size=3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(32)

        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.upsample = nn.Upsample(scale_factor=2, mode='nearest')

        self.conv3 = nn.Conv2d(96, 128, kernel_size=3, stride=1, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        #
        # self.conv4 = nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1)
        # self.bn4 = nn.BatchNorm2d(256)

        # self.conv5 = nn.Conv2d(256, 512, kernel_size=3, stride=1, padding=1)
        # self.bn5 = nn.BatchNorm2d(512)

        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        self.flatten = nn.Flatten()

        # Adjust the input size for the fully connected layer
        self.fc1 = nn.Linear(64 * (input_size // 4) * (input_size // 4), hidden_size)
        # self.fc1 = nn.Linear(98304, hidden_size)
        self.bn_fc1 = nn.BatchNorm1d(hidden_size)

        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, hidden_size)
        self.fc4 = nn.Linear(hidden_size, hidden_size)

        self.dropout = nn.Dropout(0.5)
        self.fc5 = nn.Linear(hidden_size, num_classes)

        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = self.bn1(x)
        x = self.pool(x)

        x = F.relu(self.conv2(x))
        x = self.bn2(x)
        x = self.pool(x)

        # x = F.relu(self.conv3(x))
        # x = self.bn3(x)
        # x = self.pool(x)

        # x = F.relu(self.conv4(x))
        # x = self.bn4(x)
        # x = self.pool(x)

        # x = F.relu(self.conv5(x))
        # x = self.bn5(x)
        # x = self.pool(x)

        x = self.flatten(x)

        x = F.relu(self.fc1(x))
        x = self.bn_fc1(x)
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        # x = F.relu(self.fc4(x))

        x = self.dropout(x)
        x = self.fc5(x)

        x = self.softmax(x)

        return x

In [None]:
transform = transforms.Compose([
    transforms.Resize((64, 64)),
    transforms.ToTensor(),
])

# dataset_path = "/content/drive/Othercomputers/My Laptop/objective_2_Crop_classification"\
dataset_path = "../input/objective_2_crop_classification"
data_paths = list(Path(dataset_path).rglob("*.tif"))

# Split the dataset into train and test sets
train_paths, test_paths = train_test_split(data_paths, test_size=0.2, random_state=42)
train_paths, val_paths = train_test_split(train_paths, test_size=0.2, random_state=42)


In [None]:
train_dataset = CustomDataset(train_paths, transform=transform)
val_dataset = CustomDataset(val_paths, transform=transform)
test_dataset = CustomDataset(test_paths, transform=transform)

batch_size = 64
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
input_size = 64
hidden_size = 128
num_classes = 15
input_channels = 3
model = NeuralNetwork(input_channels, input_size, hidden_size, num_classes)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# optimizer = torch.optim.RMSprop(model.parameters(), lr=0.001, alpha=0.9)


In [None]:
print("Start training")

num_epochs = 50
test_every_epochs = 1

has_checkpoint = False
clip_value = 0.01

if has_checkpoint:
    checkpoint = torch.load('saved_model.pth')
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    starting_epoch = checkpoint['epoch']
    loss = checkpoint['loss']
else:
    starting_epoch = 0

for epoch in range(starting_epoch, starting_epoch + num_epochs):
    model.train()
    # training_percentage = random.uniform(0.6, 0.8)
    # sampled_indices = random.sample(range(len(train_dataset)), int(training_percentage * len(train_dataset)))
    # sampled_train_loader = torch.utils.data.DataLoader(torch.utils.data.Subset(train_dataset, sampled_indices), batch_size=batch_size, shuffle=True)

    # train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

    for inputs, labels in train_loader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels[:, -1])
        loss.backward()
        clip_grad_norm_(model.parameters(), clip_value)
        optimizer.step()

    # Perform validation every epoch
    model.eval()
    with torch.no_grad():
        correct_top1 = 0
        correct_top3 = 0
        total = 0
        val_loss = 0
        y_true_val = []
        y_score_val = []

        for inputs, labels in val_loader:
            outputs = model(inputs)
            _, predicted = torch.topk(outputs, k=3, dim=1)

            total += labels.size(0)

            for i in range(labels.size(0)):
                if labels[i, -1] in predicted[i]:
                    correct_top3 += 1
                    if labels[i, -1] == predicted[i, 0]:
                        correct_top1 += 1

            y_true_val.extend(labels[:, -1].cpu().numpy())
            y_score_val.extend(outputs.cpu().numpy())
            val_loss += criterion(outputs, labels[:, -1]).item()

        accuracy_top1 = correct_top1 / total
        accuracy_top3 = correct_top3 / total
        avg_val_loss = val_loss / len(val_loader)
        f1_val = f1_score(y_true_val, np.argmax(y_score_val, axis=1), average='weighted')

        print(f"Epoch {epoch + 1}/{starting_epoch + num_epochs}, "
              f"Validation Top-1 Accuracy: {accuracy_top1 * 100:.2f}%, "
              f"Top-3 Accuracy: {accuracy_top3 * 100:.2f}%, "
              f"Validation F1 Score: {f1_val:.4f}, "
              f"Validation Loss: {avg_val_loss:.4f}")

    if (epoch + 1) % test_every_epochs == 0 or epoch == num_epochs - 1:
        model.eval()
        with torch.no_grad():
            correct_top1 = 0
            correct_top3 = 0
            total = 0
            test_loss = 0

            y_true_test = []
            y_score_test = []
            for inputs, labels in test_loader:
                outputs = model(inputs)
                _, predicted = torch.topk(outputs, k=3, dim=1)

                total += labels.size(0)

                for i in range(labels.size(0)):
                    if labels[i, -1] in predicted[i]:
                        correct_top3 += 1
                        if labels[i, -1] == predicted[i, 0]:
                            correct_top1 += 1

                y_true_test.extend(labels[:, -1].cpu().numpy())
                y_score_test.extend(outputs.cpu().numpy())

                test_loss += criterion(outputs, labels[:, -1]).item()

            accuracy_top1 = correct_top1 / total
            accuracy_top3 = correct_top3 / total
            avg_test_loss = test_loss / len(test_loader)
            f1_val = f1_score(y_true_test, np.argmax(y_score_test, axis=1), average='weighted')

            print(f"Epoch {epoch + 1}/{starting_epoch + num_epochs}, "
                  f"Test Top-1 Accuracy: {accuracy_top1 * 100:.2f}%, "
                  f"Top-3 Accuracy: {accuracy_top3 * 100:.2f}%, "
                  f"Test F1 Score: {f1_val:.4f}, "
                  f"Test Loss: {avg_test_loss:.4f}"
                  )

In [None]:
checkpoint = {
    'epoch': epoch + 1,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'loss': loss.item(),
}

torch.save(checkpoint, 'saved_model.pth')