In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, random_split
from torch.utils.tensorboard import SummaryWriter
import lightning as L
import augmentation
import torchmetrics
from torchmetrics import metric
from lightning.pytorch.callbacks.early_stopping import EarlyStopping
# Timm 

import os
from torchvision.io import read_image
from torchvision.transforms import transforms
from lightning.pytorch.loggers import TensorBoardLogger

In [3]:
class pre_process_data(Dataset):
    def __init__(self, images_dir, img_transforms=None, target_transforms=None):
        self.images_dir = images_dir
        self.img_transforms = img_transforms
        self.target_transforms = target_transforms
        self.img_labels = os.listdir(self.images_dir)
        
       
    def __len__(self):
        return len(self.img_labels)
    
    def __getitem__(self, idx):
        img_path = os.path.join(self.images_dir, self.img_labels[idx])
        image = read_image(img_path)
        image = image.float()
        image = image / 255.0
        label = self.calculate_label(self.img_labels[idx])
        #print(self.img_labels[idx])
       
        if self.img_transforms:
            image = self.img_transforms(image)
        if self.target_transforms:
            label = self.target_transforms(label)
        
        return image, label
        
    def calculate_mean_std(self, dataloader):
        mean = 0.0
        std = 0.0
        total_images_count = 0

        for images, _ in dataloader:
            batch_samples = images.size(0)  # Number of images in the batch
            total_images_count += batch_samples
            
            images = images.view(batch_samples, images.size(1), -1)  # Flatten the image pixels
            mean += images.float().mean(2).sum(0)
            std += images.float().std(2).sum(0)

        mean /= total_images_count
        std /= total_images_count

        return mean.item(), std.item()

    
    def calculate_label(self, img_basename):
        tmp = img_basename[:len(img_basename)-4].split("_")
        row = tmp[2]
        rod_id = int(tmp[3])
        if row in {"A", "B", "E", "F", "I", "J"}:
            label = 4-(rod_id % 4)
        else:
            label = (rod_id % 4)+1
        return label

In [None]:
dataset_transform = transforms.Compose([
    transforms.Resize((64, 64)),
    #transforms.ToTensor(),
])

In [None]:
dataset =pre_process_data(images_dir="QR_d_best",img_transforms=dataset_transform)

In [None]:
dataloader = DataLoader(dataset, batch_size=32, num_workers=4)

In [None]:
mean, std = dataset.calculate_mean_std(dataloader)
print(mean)
print(std)
mean = round(mean, 3)
std = round(std, 3)
print(mean)
print(std)

In [4]:
train_transform = transforms.Compose([
    transforms.Resize((64, 64)),
    #transforms.ToTensor(),
    transforms.RandomHorizontalFlip(),  # Randomly flip images horizontally
    transforms.RandomRotation(15),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
    transforms.Normalize(mean=[0.5], std = [0.5])
])

test_transform = transforms.Compose([
    transforms.Resize((64, 64)),
    transforms.Normalize(mean=[0.5], std=[0.5])
])

def target_transform(label):
    one_hot = torch.zeros((4))
    one_hot[label - 1] = 1.0
    return one_hot

In [5]:
class light_pre_process_data(L.LightningDataModule):
    def __init__(self, img_dir, train_transforms=None, val_transforms=None, target_transforms=None,batch_size=32):
        super().__init__()
        self.img_dir = img_dir
        self.train_transforms= train_transforms
        self.val_transforms = val_transforms
        self.target_transforms = target_transforms
        self.batch_size = batch_size
        
    def setup(self, stage: str):
        if stage == 'fit':
            self.full_dataset = pre_process_data(self.img_dir)
            self.train_size = int( 0.9 * len(self.full_dataset))
            self.val_size = len(self.full_dataset) - self.train_size
            self.train_dataset, self.val_dataset = random_split(self.full_dataset, [self.train_size, self.val_size],torch.Generator().manual_seed(50))
            self.train_dataset.dataset.img_transforms = self.train_transforms
            self.train_dataset.dataset.target_transforms = self.target_transforms
            self.val_dataset.dataset.img_transforms = self.val_transforms
            self.val_dataset.dataset.target_transforms = self.target_transforms
            
    def train_dataloader(self):
        return DataLoader(self.train_dataset, batch_size=self.batch_size, shuffle=True, num_workers=4)
    
    def val_dataloader(self):
        return DataLoader(self.val_dataset, batch_size=self.batch_size, shuffle=False, num_workers=4)
            
        

In [6]:
dm = light_pre_process_data("QR_d_best", train_transforms=train_transform, val_transforms=test_transform,
                            target_transforms=target_transform, batch_size=32)

In [2]:
train_size, test_size, train_loader, test_loader = augmentation.dataloader_preparation()

In [8]:
class Net(L.LightningModule):
    def __init__(self):
        super().__init__()
        self.save_hyperparameters()
        self.conv1 = nn.Conv2d(1, 32, 7)
        self.conv2 = nn.Conv2d(32, 64, 5)
        self.conv3 = nn.Conv2d(64, 128, 5)
        self.conv4 = nn.Conv2d(128, 256, 3)
        
        x = torch.randn((1, 64, 64))
        self._to_linear = None
        
        self.convs(x)
        
        self.fc1 = nn.Linear(self._to_linear, 256)
        self.fc2 = nn.Linear(256, 4)
        
        self.loss_function = nn.CrossEntropyLoss()
        self.accuracy = torchmetrics.Accuracy('multiclass', num_classes=4)
        self.f1_score = torchmetrics.F1Score('multiclass', num_classes=4)
        
    def convs(self, x):
        x = F.max_pool2d(F.relu(self.conv1(x)), (2, 2))
        x = F.max_pool2d(F.relu(self.conv2(x)), (2, 2))
        x = F.max_pool2d(F.relu(self.conv3(x)), (2, 2))
        x = F.max_pool2d(F.relu(self.conv4(x)), (2, 2))
        if self._to_linear is None:
            self._to_linear = x.shape[0] * x.shape[1] * x.shape[2]
            
        return x
    
    def forward(self, x):
        x = self.convs(x)
        x = x.view(-1, self._to_linear)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        
        return x
    
    def _common_step(self, batch, batch_idx):
        x, y = batch
        x = x.view(x.size(0), 1, 64, 64)
        output = self.forward(x)
        loss = self.loss_function(output, y)
        preds = torch.argmax(output, dim=1)
        return loss, preds, y
    
    def training_step(self, batch, batch_idx):
        loss, preds, y = self._common_step(batch, batch_idx)
        y = torch.argmax(y, dim=1)
        train_accuracy = self.accuracy(preds, y)
        train_f1_score = self.f1_score(preds, y)
        self.log_dict({'train_loss': loss, 'train_accuracy': train_accuracy, 'train_f1_score': train_f1_score},
                      prog_bar=True, on_step=False, on_epoch=True)
        return loss
    
    def validation_step(self, batch, batch_idx):
        loss, preds, y = self._common_step(batch, batch_idx)
        y = torch.argmax(y, dim=1)
        val_accuracy = self.accuracy(preds, y)
        val_f1_score = self.f1_score(preds, y)
        self.log_dict({"val_loss": loss, "val_accuracy": val_accuracy, "val_f1_score": val_f1_score},
                      prog_bar=True, on_epoch=True, on_step=False)
        return loss
    
    def test_step(self, batch, batch_idx):
        loss, scores, y = self._common_step(batch, batch_idx)
        self.log('test_loss', loss)
        return loss
    
    def predict_step(self, batch, batch_idx):
        x, y = batch
        x = x.view(x.size(0), 1, 64, 64)
        scores = self.forward(x)
        preds = torch.argmax(scores, dim=1)
        return preds
    
    def configure_optimizers(self):
        return optim.Adam(self.parameters(), lr=0.001)
        

In [9]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [10]:
num_recognition = Net().to(device)

In [12]:
L.seed_everything(50)

Seed set to 50


50

In [13]:
early_stop_callback = EarlyStopping(monitor='val_loss', min_delta=0.008, mode="min", patience=9, verbose=True)

In [14]:
trainer = L.Trainer(accelerator="gpu", devices=[0], 
                    precision=16, limit_predict_batches=32, 
                    min_epochs=1, max_epochs=30, deterministic=True, callbacks=[early_stop_callback], default_root_dir=".")
# trainer.fit(model = num_recognition, train_dataloaders=train_loader, val_dataloaders=test_loader)
trainer.fit(model = num_recognition, datamodule=dm)
# trainer.test(model = num_recognition, dataloaders=train_loader)
trainer.validate(model = num_recognition, datamodule=dm)

/home/parsa/.local/lib/python3.10/site-packages/lightning/fabric/connector.py:571: `precision=16` is supported for historical reasons but its usage is discouraged. Please set your precision to 16-mixed instead!
Using 16bit Automatic Mixed Precision (AMP)
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
You are using a CUDA device ('NVIDIA GeForce RTX 3060 Laptop GPU') that has Tensor Cores. To properly utilize them, you should set `torch.set_float32_matmul_precision('medium' | 'high')` which will trade-off precision for performance. For more details, read https://pytorch.org/docs/stable/generated/torch.set_float32_matmul_precision.html#torch.set_float32_matmul_precision


FileNotFoundError: [Errno 2] No such file or directory: 'QR_d_best'

In [11]:
model = Net().load_from_checkpoint(".")

TypeError: The classmethod `Net.load_from_checkpoint` cannot be called on an instance. Please call it on the class type and make sure the return value is used.