In [None]:
import os
import wandb
import kagglehub
import numpy as np
from PIL import Image
from tqdm import tqdm
from typing import Any
from matplotlib import pyplot as plt
 
import torch
import torch.nn as nn
import torch.optim as optim
import torch.functional as fun

import torchvision
import torchvision.transforms as transforms

In [None]:
# Download (or reuse the cached copy of) the dataset and point `path` at the
# folder that holds the Train/ and Test/ splits.
path = kagglehub.dataset_download("kapillondhe/american-sign-language")
# os.path.join picks the separator of the current OS instead of hard-coding '\\'.
path = os.path.join(path, 'ASL_Dataset')
print("Path to dataset files:", path)

# EDA

In [None]:
def get_paths(root: str, n: int) -> list[str]:
    """Collect image file paths from a `root/<label>/<image>` directory tree.

    Labels and the files inside each label folder are visited in sorted order,
    so the result is reproducible between runs. Because labels are walked one
    after another, a small `n` only covers the first label folder(s).

    Args:
        root: Directory whose sub-directories are the class labels.
        n: Maximum number of paths to return. Pass -1 to return every path.

    Returns:
        A list with at most `n` paths (all paths when `n == -1`). Any other
        non-positive `n` yields an empty list.
    """
    unlimited = (n == -1)
    paths: list[str] = []
    if not unlimited and n <= 0:
        return paths

    for label in sorted(os.listdir(root)):
        label_dir = os.path.join(root, label)
        # Stray files next to the label folders are not classes.
        if not os.path.isdir(label_dir):
            continue

        for imgname in sorted(os.listdir(label_dir)):
            paths.append(os.path.join(label_dir, imgname))
            # Stop as soon as exactly n paths were gathered (the previous
            # countdown let one extra path through).
            if not unlimited and len(paths) >= n:
                return paths

    return paths

### Data distribution

In [None]:
def data_balance(root: str) -> dict[str, int]:
    """Count how many entries each class folder under `root` contains.

    Args:
        root: Directory whose sub-directories are the class labels.

    Returns:
        A dict mapping label name to the number of entries in its folder.
        Keys are inserted in sorted label order so that plots built from the
        dict look the same on every run.
    """
    count_of_labels: dict[str, int] = {}

    for label in sorted(os.listdir(root)):
        label_dir = os.path.join(root, label)
        # Only directories are classes; skip stray files in the root.
        if os.path.isdir(label_dir):
            count_of_labels[label] = len(os.listdir(label_dir))

    return count_of_labels

In [None]:
# Bar chart of the number of training images per class.
count = data_balance(os.path.join(path, 'Train'))
# Fixed (sorted) bar order so the chart is identical on every run.
class_names = sorted(count)

fig = plt.figure(figsize=(15, 5))
ax = fig.add_subplot(1, 1, 1)
ax.set_title("Class distribution")
ax.set_xlabel("Class")
ax.set_ylabel("Number of images")
ax.grid(True, 'major', 'y')
ax.bar(class_names, [count[name] for name in class_names], align='center', width=0.5)
# plt.show() works with the notebook's inline backend; fig.show() only warns
# there because that backend has no GUI.
plt.show()

### Image resolutions

In [None]:
def resolutions(root: str) -> set[tuple[int, int]]:
    """Return the set of distinct image sizes found under `root`.

    Every path returned by `get_paths(root, -1)` is opened; only the header
    is needed to read the size, so pixel data is not decoded here.

    Args:
        root: Directory whose sub-directories are the class labels.

    Returns:
        A set of `Image.size` tuples, one per distinct size encountered.
    """
    sizes = set()
    for imgpath in get_paths(root, -1):
        # The context manager closes the file handle right away; this loop
        # walks the whole split, so handles must not pile up.
        with Image.open(imgpath) as img:
            sizes.add(img.size)

    return sizes

In [None]:
# Compute first, then format: nesting the same quote type (and a backslash)
# inside an f-string expression is a SyntaxError before Python 3.12.
train_resolutions = resolutions(os.path.join(path, 'Train'))
test_resolutions = resolutions(os.path.join(path, 'Test'))

print(f'Resolutions in training set: {train_resolutions}')
print(f'Resolutions in evaluation set: {test_resolutions}')

### Corrupted images

In [None]:
def corrupted(root: str) -> list[str]:
    """List the images under `root` that fail Pillow's integrity check.

    Only the paths returned by `get_paths(root, 2000)` are inspected.

    Args:
        root: Directory whose sub-directories are the class labels.

    Returns:
        The paths for which opening or `Image.verify()` raised an exception.
    """
    broken = list()

    for p in get_paths(root, 2000):
        try:
            # `with` releases the file handle whether or not verify() raises.
            with Image.open(p) as img:
                img.verify()
        except Exception:
            # `Exception` rather than a bare except, so that Ctrl+C
            # (KeyboardInterrupt) still stops the scan.
            broken.append(p)

    return broken

In [None]:
# Integrity check on both splits; os.path.join keeps the paths portable.
train_corrupted = corrupted(os.path.join(path, 'Train'))
test_corrupted = corrupted(os.path.join(path, 'Test'))

print(f'Count of corrupted images in training set: {len(train_corrupted)} ({train_corrupted})')
print(f'Count of corrupted images in evaluating set: {len(test_corrupted)} ({test_corrupted})')

### Color channels
 

In [None]:
def get_channels(root: str) -> set[tuple[str, ...]]:
    """Return the distinct band layouts of the images under `root`.

    Only the paths returned by `get_paths(root, 2000)` are inspected.

    Args:
        root: Directory whose sub-directories are the class labels.

    Returns:
        A set of `Image.getbands()` results, i.e. tuples of band names such
        as `('R', 'G', 'B')`.
    """
    channels = set()

    for p in get_paths(root, 2000):
        # Band names come from the header; close the file as soon as it is read.
        with Image.open(p) as img:
            channels.add(img.getbands())

    return channels

In [None]:
# Band layouts found in both splits; os.path.join keeps the paths portable.
train_channels = get_channels(os.path.join(path, 'Train'))
test_channels = get_channels(os.path.join(path, 'Test'))

print(f'Count of color schemas in training set: {len(train_channels)} ({train_channels})')
print(f'Count of color schemas in evaluating set: {len(test_channels)} ({test_channels})')

### Pixel intensity

In [None]:
def get_average_intensities(root: str) -> list[float]:
    """Compute the mean grayscale level of a sample of images under `root`.

    The sample is whatever `get_paths(root, 1000)` returns. Each image is
    converted to mode 'L' (8-bit grayscale) before its pixels are averaged.

    Args:
        root: Directory whose sub-directories are the class labels.

    Returns:
        One mean intensity per inspected image, in the order of the paths.
    """
    def mean_gray_level(image_path: str) -> float:
        # Average over all pixels of the grayscale version of one image.
        grayscale = Image.open(image_path).convert('L')
        return float(np.mean(grayscale))

    return [mean_gray_level(image_path) for image_path in get_paths(root, 1000)]

In [None]:
# Average pixel intensity of both splits: summary statistics, then a
# histogram and a box plot side by side.
train_intensities = get_average_intensities(os.path.join(path, 'Train'))
test_intensities = get_average_intensities(os.path.join(path, 'Test'))

intensity_sets = (
    ('Training set', train_intensities),
    ('Evaluating set', test_intensities),
)
for position, (set_name, values) in enumerate(intensity_sets):
    if position:
        print()  # blank line between the two summaries
    print(set_name)
    print(f'Mean: {float(round(np.mean(values), 3))}')
    print(f'Std: {float(round(np.std(values), 3))}')
    print(f'Range (min - max): {round(min(values), 3)} - {round(max(values), 3)}')

fig = plt.figure(figsize=(15, 5))
ax1 = fig.add_subplot(1, 2, 1)
ax1.set_title("Average pixel intensities")
ax1.grid(True, 'major', 'y')
ax1.hist([train_intensities, test_intensities], bins=30, label=['Training set', 'Evaluating set'])
ax1.legend()

ax2 = fig.add_subplot(1, 2, 2)
ax2.set_title("Average pixel intensities")
ax2.grid(True, 'both', 'x')
ax2.boxplot([train_intensities, test_intensities], orientation='horizontal', tick_labels=['Training set', 'Evaluating set'])

fig.subplots_adjust(wspace=0.25)
# plt.show() works with the notebook's inline backend; fig.show() only warns there.
plt.show()

### Contrast

In [None]:
def get_contrasts(root: str) -> list[float]:
    """Compute a per-image contrast value for a sample of images under `root`.

    The sample is whatever `get_paths(root, 2000)` returns. Each image is
    converted to grayscale and its contrast is
    `(max - min) / (max + min)` over the pixel values; an all-black image
    (max + min == 0) gets contrast 0.0.

    Args:
        root: Directory whose sub-directories are the class labels.

    Returns:
        One contrast value per inspected image, as plain Python floats.
    """
    contrasts: list[float] = []
    for p in get_paths(root, 2000):
        # Decode inside the `with` so the file is closed before moving on.
        with Image.open(p) as img:
            arr = np.array(img.convert('L'), dtype=np.float32)

        # Cast to Python float so every entry has the same type as the 0.0
        # fallback (previously the list mixed np.float32 and float).
        min_l = float(arr.min())
        max_l = float(arr.max())
        total = max_l + min_l

        contrasts.append(0.0 if total == 0 else (max_l - min_l) / total)

    return contrasts

In [None]:
# Contrast of both splits: summary statistics, then a histogram and a box
# plot side by side.
train_contrasts = get_contrasts(os.path.join(path, 'Train'))
test_contrasts = get_contrasts(os.path.join(path, 'Test'))

contrast_sets = (
    ('Training set', train_contrasts),
    ('Evaluating set', test_contrasts),
)
for position, (set_name, values) in enumerate(contrast_sets):
    if position:
        print()  # blank line between the two summaries
    print(set_name)
    print(f'Mean: {float(round(np.mean(values), 3))}')
    print(f'Std: {float(round(np.std(values), 3))}')
    print(f'Range (min - max): {round(min(values), 3)} - {round(max(values), 3)}')

fig = plt.figure(figsize=(15, 5))
ax1 = fig.add_subplot(1, 2, 1)
ax1.set_title("Contrasts")
ax1.grid(True, 'major', 'y')
ax1.hist([train_contrasts, test_contrasts], bins=30, label=['Training set', 'Evaluating set'])
ax1.legend()

ax2 = fig.add_subplot(1, 2, 2)
ax2.set_title("Contrasts")
ax2.grid(True, 'both', 'x')
ax2.boxplot([train_contrasts, test_contrasts], orientation='horizontal', tick_labels=['Training set', 'Evaluating set'])

fig.subplots_adjust(wspace=0.25)
# plt.show() works with the notebook's inline backend; fig.show() only warns there.
plt.show()

# Data preparation

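The data exploration above showed that:
- The classes are well balanced
- All images have the same resolution
- One split is, on average, brighter than the other (the original note read "test data are brighter than the evaluating data", but both names refer to the same split in this notebook; TODO: confirm the direction from the intensity plots above)
- All images are in RGB mode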

In [None]:
# Training pipeline: resize to the 224x224 network input, apply geometric and
# photometric augmentation, then convert to a tensor.
train_steps = [
    transforms.Resize(size=(224, 224)),
    transforms.RandomRotation(degrees=20),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3, hue=0.05),
    transforms.GaussianBlur(kernel_size=3),
    transforms.ToTensor(),
]
train_transform = transforms.Compose(train_steps)

# Evaluation pipeline: no augmentation, only resize and tensor conversion.
test_steps = [
    transforms.Resize(size=(224, 224)),
    transforms.ToTensor(),
]
test_transform = transforms.Compose(test_steps)

In [None]:
# One sub-folder per class; os.path.join keeps the paths portable across OSes.
train_dataset = torchvision.datasets.ImageFolder(root=os.path.join(path, 'Train'), transform=train_transform)
test_dataset = torchvision.datasets.ImageFolder(root=os.path.join(path, 'Test'), transform=test_transform)

In [None]:
# Preview of the first samples of the training set after augmentation,
# drawn on a 5x5 grid with the axes hidden.
aug_imgs = list()
fig, ax = plt.subplots(5, 5, figsize=(10, 10))

for cell in ax.flat:
    cell.get_xaxis().set_visible(False)
    cell.get_yaxis().set_visible(False)

# Never index past the end of a small dataset.
preview_count = min(25, len(train_dataset))
for i in range(preview_count):
    img, _label = train_dataset[i]
    # ToTensor yields channels-first data; imshow wants channels last.
    img2 = img.permute(1, 2, 0).numpy()
    aug_imgs.append(img2)
    ax[i // 5][i % 5].imshow(img2)

fig.tight_layout()
# plt.show() works with the notebook's inline backend; fig.show() only warns there.
plt.show()

In [None]:
def get_labels(root: str) -> list[str]:
    """Return the class labels, i.e. the names of the sub-directories of `root`.

    Only directories are counted and the result is sorted, which mirrors how
    torchvision's `ImageFolder` discovers and orders classes. A stray file in
    `root` therefore no longer inflates the number of classes.

    Args:
        root: Directory whose sub-directories are the class labels.

    Returns:
        Sorted list of label names.
    """
    return sorted(
        name for name in os.listdir(root)
        if os.path.isdir(os.path.join(root, name))
    )

# Model

### Metrics
- Accuracy
- Precision
- Recall
- Loss (train/val)
- Confusion matrix

### Model architecture

In [None]:
class Model(nn.Module):
    """Configurable CNN classifier for 3-channel 224x224 images.

    The network is a stack of `count_of_conv_layers` blocks
    (Conv2d -> LeakyReLU -> MaxPool2d -> Dropout) whose filter count doubles
    with every block, followed by Flatten, one hidden Linear layer and the
    output Linear layer.

    The forward pass returns raw, unnormalised class scores (logits). No
    Sigmoid/Softmax is applied at the end because `nn.CrossEntropyLoss`, which
    the training loop uses, expects logits and normalises them itself.

    Config keys read: `count_of_conv_layers`, `conv_kernel_size`, `padding`,
    `l0_filters`, `max_pool_kernel_size`, `dropout`, `n_hidden`, `n_out`.
    """

    def __init__(self, config: dict[str, Any]):
        super().__init__()
        self.main = nn.Sequential()

        kernel = config['conv_kernel_size']
        padding = config['padding']
        pool = config['max_pool_kernel_size']

        # Shape of the feature map as it flows through the conv stack. Starting
        # from the input shape also covers the case of zero conv layers.
        channels = 3
        output_h = 224
        output_w = 224

        for l in range(config['count_of_conv_layers']):
            out_channels = config['l0_filters'] * (2 ** l)

            self.main.add_module(f'Conv_{l}',
                nn.Conv2d(
                    in_channels=channels,
                    out_channels=out_channels,
                    kernel_size=kernel,
                    padding=padding
                )
            )
            self.main.add_module(f'ConvLeakyReLU_{l}', nn.LeakyReLU())
            self.main.add_module(f'MaxPooling_{l}', nn.MaxPool2d(kernel_size=pool))
            self.main.add_module(f'DropoutConv_{l}', nn.Dropout(config['dropout']))

            # Stride-1 convolution: size + 2*padding - (kernel - 1);
            # the pooling layer then floor-divides by its kernel size.
            output_h = (output_h + 2 * padding - (kernel - 1)) // pool
            output_w = (output_w + 2 * padding - (kernel - 1)) // pool
            channels = out_channels

        self.main.add_module('Flatten', nn.Flatten())

        self.main.add_module('Linear_Hidden',
            nn.Linear(output_w * output_h * channels, config['n_hidden'])
        )
        self.main.add_module('LeakyReLU_Hidden', nn.LeakyReLU())
        # Uses nn.Dropout's default probability, not config['dropout'].
        self.main.add_module('Dropout_Hidden', nn.Dropout())

        # Final layer emits logits; see the class docstring.
        self.main.add_module('Linear_Output', nn.Linear(config['n_hidden'], config['n_out']))

    def forward(self, x):
        """Return the logits of shape (batch, n_out) for a batch of images."""
        return self.main(x)

In [None]:
class Agent:
    """Trains and evaluates a `Model` and logs the metrics to Weights & Biases.

    The data loaders are built from the notebook-level `train_dataset` and
    `test_dataset` objects, so those cells must have been executed first.

    Config keys read here: `batch_size`, `learning_rate`, `weight_decay`,
    `epochs`, `n_out` (plus everything `Model` reads).
    """
    device: str
    config: dict[str, Any]
    model: "Model"
    train_dataloader: torch.utils.data.DataLoader
    test_dataloader: torch.utils.data.DataLoader

    def __init__(self, config: dict[str, Any]):
        """Build the model on the available device and create both data loaders."""
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.config = config
        self.model = Model(config).to(self.device)
        self.train_dataloader = torch.utils.data.DataLoader(
            dataset=train_dataset,
            batch_size=self.config['batch_size'],
            shuffle=True,
            num_workers=2
        )
        # Evaluation does not depend on sample order, so no shuffling is needed.
        self.test_dataloader = torch.utils.data.DataLoader(
            dataset=test_dataset,
            batch_size=self.config['batch_size'],
            shuffle=False,
            num_workers=2
        )

    def fit(self):
        """Run the full training schedule and log one metrics row per epoch."""
        loss = nn.CrossEntropyLoss()

        opt = torch.optim.Adam(
            params=self.model.parameters(),
            lr=self.config["learning_rate"],
            weight_decay=self.config["weight_decay"]
        )
        for epoch in range(self.config["epochs"]):
            log = dict()
            self.train(loss, opt, epoch, log)
            self.val(loss, epoch, log)

            wandb.log({
                "train_loss": log["train_avg_loss"],
                "val_loss": log["val_avg_loss"],
                "accuracy": log["accuracy"],
                "precision": log["precision"],
                "recall": log["recall"]
            })

    def _forward_loss(self, loss, x, y):
        """Run the model on one batch and compute the loss.

        The integer class indices `y` are passed to the loss directly. The
        previous hand-built one-hot (`target[:, y] = 1.0`) marked every label
        of the batch in every row and used float64, so the loss was computed
        against wrong targets.

        Returns:
            `(predicted, loss_value)`, where `predicted` is the model output
            for `x` and `loss_value` is `loss(predicted, y)`.
        """
        x = x.to(self.device)
        y = y.to(self.device)
        predicted = self.model(x)
        return predicted, loss(predicted, y)

    def train(self, loss, opt, epoch, log):
        """Train for one epoch and store the mean batch loss in `log["train_avg_loss"]`."""
        # Enable dropout; val() switches the model to eval mode.
        self.model.train()

        total_loss = 0.0
        with tqdm(self.train_dataloader, desc=f"Train {epoch}: ") as progress:
            for x, y in progress:
                _, l = self._forward_loss(loss, x, y)

                opt.zero_grad()
                l.backward()
                opt.step()

                total_loss += l.item()

        # max(..., 1) avoids a ZeroDivisionError on an empty loader.
        log["train_avg_loss"] = total_loss / max(len(self.train_dataloader), 1)

    def val(self, loss, epoch, log):
        """Evaluate on the test loader.

        Writes `val_avg_loss`, `accuracy`, `precision` and `recall` into `log`.
        """
        # Disable dropout so the metrics are deterministic for a given model.
        self.model.eval()

        total_loss = 0.0
        all_preds = []
        all_targets = []

        with tqdm(self.test_dataloader, desc=f"Val {epoch}: ") as progress:
            with torch.no_grad():
                for x, y in progress:
                    predicted, l = self._forward_loss(loss, x, y)

                    all_preds.append(predicted.argmax(dim=1).cpu().numpy())
                    all_targets.append(y.cpu().numpy())

                    total_loss += l.item()

        all_preds = np.concatenate(all_preds)
        all_targets = np.concatenate(all_targets)
        accuracy, precision, recall, _ = self.compute_metrics(all_preds, all_targets)

        log["val_avg_loss"] = total_loss / max(len(self.test_dataloader), 1)
        log["accuracy"] = accuracy
        log["precision"] = precision
        log["recall"] = recall

    def compute_metrics(self, all_preds: np.ndarray, all_targets: np.ndarray) -> tuple[float, float, float, np.ndarray]:
        """Compute accuracy, macro precision, macro recall and the confusion matrix.

        Args:
            all_preds: Predicted class index per sample.
            all_targets: True class index per sample.

        Returns:
            `(accuracy, precision, recall, conf)`, where `conf[t, p]` counts
            samples of true class `t` predicted as `p`. Precision and recall
            are unweighted means over all `n_out` classes; a class with no
            predictions (or no samples) contributes 0.0.
        """
        num_classes = self.config["n_out"]

        conf = np.zeros((num_classes, num_classes), dtype=np.int64)
        # np.add.at accumulates repeated (target, prediction) pairs correctly.
        np.add.at(conf, (np.asarray(all_targets, dtype=np.int64), np.asarray(all_preds, dtype=np.int64)), 1)

        total = conf.sum()
        accuracy = float(conf.trace() / total) if total > 0 else 0.0

        true_positives = np.diag(conf).astype(np.float64)
        predicted_per_class = conf.sum(axis=0)  # tp + fp
        actual_per_class = conf.sum(axis=1)     # tp + fn

        precision = np.divide(
            true_positives, predicted_per_class,
            out=np.zeros(num_classes), where=predicted_per_class > 0
        )
        recall = np.divide(
            true_positives, actual_per_class,
            out=np.zeros(num_classes), where=actual_per_class > 0
        )

        return (accuracy, float(precision.mean()), float(recall.mean()), conf)

In [None]:
# Hyper-parameters for the model architecture and the training run.
config = {
    # Convolutional stack
    'count_of_conv_layers': 4,
    'conv_kernel_size': 3,
    'l0_filters': 32,
    'padding': 1,
    'max_pool_kernel_size': 2,

    # Fully connected head
    'n_hidden': 512,
    'n_out': len(get_labels(os.path.join(path, 'Test'))),

    # Optimisation
    'batch_size': 52,
    'dropout': 0.3,
    'epochs': 200,
    # Same values as the former literals 10e-2 and 10e-4, written plainly.
    # NOTE(review): 0.1 is a large step size for Adam; confirm that 1e-2 /
    # 1e-4 were not what was intended.
    'learning_rate': 0.1,
    'weight_decay': 0.001
}

In [None]:
# Train the model inside a W&B run and upload the final weights.
wandb.init(project='Signs', config=config, reinit='finish_previous')
# Assume failure until the whole pipeline has completed.
exit_code = 1

try:
    agent = Agent(wandb.config)
    agent.fit()
    torch.save(agent.model.state_dict(), 'state.pt')
    wandb.save('state.pt')
    exit_code = 0
except Exception as e:
    # Show the exception type too; str(e) alone is often just a bare value
    # (e.g. a missing key).
    print(f'{type(e).__name__}: {e}')
finally:
    # Close the run even when the cell is interrupted by hand, so it is not
    # left open in W&B.
    wandb.finish(exit_code)