In [1]:
import os
import glob
import random

import torch
import torchvision
import numpy as np
import cv2
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from torch import nn
from torchvision.transforms import v2
import torch.nn.functional as F

from IPython.display import Image

In [2]:
print(torch.cuda.is_available())
device = torch.device("cuda")

True


In [3]:
def encode(a):
    idx = {}
    for ai in a:
        if ai not in idx:
            idx[ai] = len(idx)
    for i in range(len(a)):
        a[i] = idx[a[i]]
    del idx
    return a

In [4]:
DATA_PATH = r'C:\Users\galah\Desktop\nn\archive\Training\Training'
DATA_PATH_VAL = r'C:\Users\galah\Desktop\nn\archive\Validation\Validation'


class SmokingDataset(Dataset):

    def __init__(self, rootdir, transform) -> None:
        super().__init__()
        self.img_names = os.listdir(rootdir)
        random.shuffle(self.img_names)
        self.transform = transform
        self.img_paths = []
        self.img_labels = []

        for img in self.img_names:
            path = os.path.join(rootdir, img)
            label = img.split('_')[0]
            self.img_paths.append(path)
            self.img_labels.append(label)

        self.img_labels = encode(self.img_labels)

    def __len__(self):
        return len(self.img_paths)

    def __getitem__(self, index) -> tuple:
        img_path = self.img_paths[index]
        image = cv2.imread(img_path)
        if self.transform:
            image = self.transform(image)

        label = self.img_labels[index]

        return (image, label)

In [5]:
transform = v2.Compose(
    [
        v2.ToImage(),
        v2.ToDtype(torch.float32, scale=True),
        v2.Lambda(lambda x: torch.flatten(x))
    ]
)

In [6]:
dataset = SmokingDataset(DATA_PATH, transform)
dataset_val = SmokingDataset(DATA_PATH_VAL, transform)
train_loader = DataLoader(dataset, batch_size=64)
val_loader = DataLoader(dataset_val, batch_size=180)

In [7]:
start_size = dataset[0][0].shape[0]

In [15]:
class SmokingClassifier(nn.Module):
    def __init__(self):
        super(SmokingClassifier,self).__init__()
        self.flatten = nn.Flatten()
        self.lin1 = nn.Linear(start_size, 400)
        self.lin2 = nn.Linear(400, 150)
        self.lin3 = nn.Linear(150, 50)
        self.lin4 = nn.Linear(50, 1)

    def forward(self,x):
        x = self.flatten(x)
        x = self.lin1(x)
        x = F.relu(x)
        x = self.lin2(x)
        x = F.relu(x)
        x = self.lin3(x)
        x = F.relu(x)
        x = self.lin4(x)
        x = F.sigmoid(x)
        return x
        

In [16]:
model = SmokingClassifier()
model.to(device)
loss_function = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001,)

In [17]:
def acc(y_pred, y_val):
    eq = torch.eq(torch.Tensor(y_pred).to(device), y_val)
    acc = eq.sum()/y_val.shape[0]
    return acc

In [18]:
def validate(dataloader_val, model):
    for step, (x_val, y_val) in enumerate(dataloader_val):
        (x_val, y_val) = (x_val.to(device), y_val.to(device))
        y_val = y_val.type(torch.float32)
        y_pred = model.forward(x_val)
        y_pred = [1 if z > 0.5 else 0 for z in y_pred]
        return y_val.to(device), y_pred

In [19]:
def train(dataloader, dataloader_val, model, loss_fn, op_fn, epoch):
    for ep in range(epoch):
        for step, (x, y) in enumerate(dataloader):
            y = y.type(torch.float32)
            (x, y) = (x.to(device), y.to(device))
            y = torch.reshape(y,(y.shape[0],1))

            y_pred = model.forward(x)
            loss = loss_fn(y_pred.to(device), y.to(device))
            op_fn.zero_grad()
            loss.backward()
            op_fn.step()
            
            if step%100 == 0:
                loss, current = loss.item(), (ep+1)*(step+1)*16
                print(f"loss = {loss}, samples = {current}")
                
        y_val, y_pred = validate(dataloader_val, model)
        print(f"Epoch: {ep}, acc: {acc(y_pred,y_val)}")

In [21]:
train(train_loader, val_loader, model, loss_function, optimizer, 100)

loss = 0.4519258737564087, samples = 16
Epoch: 0, acc: 0.6722222566604614
loss = 0.4649658203125, samples = 32
Epoch: 1, acc: 0.6722222566604614
loss = 0.6705707907676697, samples = 48
Epoch: 2, acc: 0.6055555939674377
loss = 0.46167078614234924, samples = 64
Epoch: 3, acc: 0.6222222447395325
loss = 0.6197507381439209, samples = 80
Epoch: 4, acc: 0.550000011920929
loss = 0.7810415029525757, samples = 96
Epoch: 5, acc: 0.6500000357627869
loss = 0.6202262043952942, samples = 112
Epoch: 6, acc: 0.5944444537162781
loss = 0.6337360143661499, samples = 128
Epoch: 7, acc: 0.6111111044883728
loss = 0.6799682378768921, samples = 144
Epoch: 8, acc: 0.6166666746139526
loss = 0.622219979763031, samples = 160
Epoch: 9, acc: 0.6611111164093018
loss = 0.5003831386566162, samples = 176
Epoch: 10, acc: 0.6555555462837219
loss = 0.6414332389831543, samples = 192
Epoch: 11, acc: 0.6611111164093018
loss = 0.559099018573761, samples = 208
Epoch: 12, acc: 0.6777777671813965
loss = 0.5411829948425293, sample

KeyboardInterrupt: 

In [14]:
model.forward(dataset[0][0].to(device))

IndexError: Dimension out of range (expected to be in range of [-1, 0], but got 1)