In [1]:
import torch
from torch import nn

torch.manual_seed(42)

device = "cuda" if torch.cuda.is_available() else "cpu"
print(device)

cuda


## Step 1 : Restructuring OS

In [2]:
import os

def restructure(dir, num_classes):
    for i in range(1, num_classes+1):
        dir_to_create = f".\\{dir}\\flower{str(i)}"
        
        if not os.path.isdir(dir_to_create):
            os.mkdir(dir_to_create)

    cwd = os.path.abspath(dir)

    for file in os.listdir(dir):
        if file.endswith(".jpg"):
            os.replace(f"{cwd}\\{file}", f"{cwd}\\flower{file.split("_")[0]}\\{file}")

restructure("train_data", 60)
restructure("val_data", 60)

## Step 2 : Load Dataset into torch Loader

In [3]:
import torch.utils.data
from torchvision import datasets, transforms
from sklearn.decomposition import PCA

transform = transforms.Compose([
    transforms.Resize(64),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    transforms.Lambda(lambda x: torch.flatten(x))
])

train_fds = datasets.ImageFolder("train_data", transform=transform)
val_fds = datasets.ImageFolder("val_data", transform=transform)

train_fdl = torch.utils.data.DataLoader(train_fds, batch_size=len(train_fds))
val_fdl = torch.utils.data.DataLoader(val_fds, batch_size=len(val_fds))

temp_train = next(iter(train_fdl))
X_train, y_train = temp_train[0].numpy(), temp_train[1]
temp_val = next(iter(val_fdl))
X_val, y_val = temp_val[0].numpy(), temp_val[1]

n_components = 50
pca = PCA(n_components=n_components)
X_train = torch.tensor(pca.fit_transform(X_train))
X_val = torch.tensor(pca.transform(X_val))

batch_size = 256
train_dataset = torch.utils.data.TensorDataset(X_train, y_train)
train_dataloader = torch.utils.data.DataLoader(train_dataset, shuffle=True, batch_size=batch_size)

val_dataset = torch.utils.data.TensorDataset(X_val, y_val)
val_dataloader = torch.utils.data.DataLoader(val_dataset, shuffle=True, batch_size=batch_size)

## Step 3 : Define Model

In [4]:
class Model(nn.Module):
    def __init__(self):
        super().__init__()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(n_components, 100),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(100, 100),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(100, 60),
        )

    def forward(self, X):
        logits = self.linear_relu_stack(X)
        return logits
    
model = Model().to(device)
print(model)

Model(
  (linear_relu_stack): Sequential(
    (0): Linear(in_features=50, out_features=100, bias=True)
    (1): ReLU()
    (2): Dropout(p=0.2, inplace=False)
    (3): Linear(in_features=100, out_features=100, bias=True)
    (4): ReLU()
    (5): Dropout(p=0.2, inplace=False)
    (6): Linear(in_features=100, out_features=60, bias=True)
  )
)


In [5]:
def train_loop(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        X = X.to(device)
        one_hot_y = torch.eye(60)[y].to(device)
        y = y.to(device)

        pred = model(X)
        loss = loss_fn(pred, one_hot_y)

        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        loss, current = loss.item(), batch * batch_size + len(X)
        # print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")


def test_loop(dataloader, model, loss_fn):
    model.eval()
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, correct = 0, 0

    with torch.no_grad():
        for X, y in dataloader:
            X = X.to(device)
            one_hot_y = torch.eye(60)[y].to(device)
            y = y.to(device)
            
            pred = model(X)
            test_loss += loss_fn(pred, one_hot_y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()

    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

## Step 4 : Execution

In [6]:
import torch.optim

learning_rate = 1e-3
epochs = 50

loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train_loop(train_dataloader, model, loss_fn, optimizer)
    test_loop(val_dataloader, model, loss_fn)
print("Done!")

Epoch 1
-------------------------------
Test Error: 
 Accuracy: 6.7%, Avg loss: 3.991464 

Epoch 2
-------------------------------
Test Error: 
 Accuracy: 12.2%, Avg loss: 3.731024 

Epoch 3
-------------------------------
Test Error: 
 Accuracy: 14.2%, Avg loss: 3.551214 

Epoch 4
-------------------------------
Test Error: 
 Accuracy: 16.7%, Avg loss: 3.326039 

Epoch 5
-------------------------------
Test Error: 
 Accuracy: 19.5%, Avg loss: 3.143927 

Epoch 6
-------------------------------
Test Error: 
 Accuracy: 23.3%, Avg loss: 3.005151 

Epoch 7
-------------------------------
Test Error: 
 Accuracy: 25.2%, Avg loss: 2.935950 

Epoch 8
-------------------------------
Test Error: 
 Accuracy: 26.5%, Avg loss: 2.846713 

Epoch 9
-------------------------------
Test Error: 
 Accuracy: 27.8%, Avg loss: 2.767873 

Epoch 10
-------------------------------
Test Error: 
 Accuracy: 29.3%, Avg loss: 2.708366 

Epoch 11
-------------------------------
Test Error: 
 Accuracy: 29.2%, Avg loss