In [None]:
# Source: https://www.kaggle.com/code/polomarco/ecg-classification-cnn-lstm-attention-mechanism

In [None]:
import os
#import itertools
import time
import random

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import matplotlib.pyplot as plt
#import matplotlib.colors as mcolors
#import seaborn as sns

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.optim import AdamW
from torch.optim.lr_scheduler import (CosineAnnealingLR,
                                      CosineAnnealingWarmRestarts,
                                      StepLR,
                                      ExponentialLR)
import sklearn.utils
from sklearn.model_selection import train_test_split
#from sklearn.metrics import precision_recall_curve
#from sklearn.metrics import classification_report, confusion_matrix
from sklearn.metrics import accuracy_score, auc, f1_score, precision_score, recall_score

In [None]:
class Config:
    """Notebook-wide settings: RNG seed, compute device, data locations and
    the fixed sentence length used when encoding phrases."""

    # not referenced by the visible cells
    csv_path = ''
    seed = 69
    # first CUDA device when one is available, otherwise CPU
    device = 'cuda:0' if torch.cuda.is_available() else 'cpu'

    # every encoded sentence is zero-padded to this many entries
    max_words_in_sentence = 10
    # root folder holding one sub-folder of videos per sentence id
    video_folder = 'video'

    # Saved model weights and training log read by the model-restore cell.
    # These must be defined: that cell accesses config.attn_state_path and
    # config.attn_logs and would otherwise fail with AttributeError.
    attn_state_path = 'attn.pth'
    attn_logs = 'attn.csv'

    train_csv_path = 'sentences.csv'
    # test_csv_path = '.csv'

def seed_everything(seed: int):
    """Seed every random number generator the notebook relies on.

    Applies `seed` to Python's `random`, to NumPy's global generator and to
    PyTorch; when CUDA is available the CUDA generator is seeded as well.
    """
    for apply_seed in (random.seed, np.random.seed, torch.manual_seed):
        apply_seed(seed)

    if not torch.cuda.is_available():
        return
    torch.cuda.manual_seed(seed)

# Single shared settings object used by the cells below.
config = Config()
# Seed all RNGs up front so shuffles and splits are repeatable across runs.
seed_everything(config.seed)

In [None]:
# read csv file (path comes from the shared config instead of a second hard-coded copy)
sentences = pd.read_csv(config.train_csv_path)

# unique words: lower-case the sentence column (third column), split on
# whitespace and flatten; missing/empty sentences contribute no words
word_set = set(
    sentences.iloc[:, 2]
    .dropna()
    .str.lower()
    .str.split()
    .explode()
    .dropna()
)
sorted_word_set = sorted(word_set)
print('Unique words',sorted_word_set)

# create word encoding
# ids start at 1: 0 is the padding value used by get_sentence_encoded, so it
# must not also stand for a real word
encodings = {word: index for index, word in enumerate(sorted_word_set, start=1)}
print('Word encodings',encodings)

# converts a sentence with zero padded encoding list
def get_sentence_encoded(sentence):
    """Encode a whitespace-separated sentence as a fixed-length list of word ids.

    Each word is looked up in the module-level `encodings` dict and the result
    is right-padded with zeros up to `config.max_words_in_sentence`.

    Parameters:
        sentence: string of words separated by whitespace.
    Returns:
        list of ints with exactly `config.max_words_in_sentence` entries.
    Raises:
        KeyError: a word is not present in `encodings`.
        ValueError: the sentence has more words than
            `config.max_words_in_sentence` (it could not be represented at the
            fixed length without dropping words).
    """
    encoded = [encodings[word] for word in sentence.split()]
    max_len = config.max_words_in_sentence
    if len(encoded) > max_len:
        # Fail early: returning a longer list would break the fixed-length
        # contract that batching relies on.
        raise ValueError(
            f"sentence has {len(encoded)} words, more than the maximum of {max_len}"
        )
    return encoded + [0] * (max_len - len(encoded))

# print(get_sentence_encoded('mən hansı sənəd vermək'))
# print(get_sentence_encoded('mən bakı yaşamaq'))

# generate (video file name, encoding list)
# One sentence maps to many videos: every file found in the folder named after
# the sentence id becomes one row. Rows are collected in a plain list and the
# DataFrame is built once at the end, rather than growing it with pd.concat on
# every file (which copies the whole frame each time).
records = []

# NOTE: head(2) restricts processing to the first two sentences.
for row in sentences.head(2).itertuples(index=False):
    sentence_id = row[0]          # first column: sentence id
    phrase = row[2].lower()       # third column: sentence text
    encoded = get_sentence_encoded(phrase)
    # iterate over video folders
    video_dir = os.path.join(config.video_folder, str(sentence_id))
    # sorted() because os.listdir returns entries in arbitrary order
    for filename in sorted(os.listdir(video_dir)):
        f = os.path.join(video_dir, filename)
        # checking if it is a file
        if os.path.isfile(f):
            records.append({"id": sentence_id, "video_file": f, "encoding": encoded})

df = pd.DataFrame(records, columns=["id", "video_file", "encoding"])

print(df)

Unique words ['1', '2', 'ad', 'ana', 'ata', 'azərbaycan', 'bakı', 'bu', 'dünən', 'getmək', 'hansı', 'iş', 'mən', 'necə', 'nə', 'olmaq', 'ora', 'orda', 'oğul', 'paytaxt', 'qız', 'siz', 'subay', 'sənəd', 'var', 'vermək', 'yaşamaq', 'yox']
Word encodings {'1': 0, '2': 1, 'ad': 2, 'ana': 3, 'ata': 4, 'azərbaycan': 5, 'bakı': 6, 'bu': 7, 'dünən': 8, 'getmək': 9, 'hansı': 10, 'iş': 11, 'mən': 12, 'necə': 13, 'nə': 14, 'olmaq': 15, 'ora': 16, 'orda': 17, 'oğul': 18, 'paytaxt': 19, 'qız': 20, 'siz': 21, 'subay': 22, 'sənəd': 23, 'var': 24, 'vermək': 25, 'yaşamaq': 26, 'yox': 27}
  id                       video_file                          encoding
0  1  video/1/2022-04-19 15-44-38.mp4  [21, 2, 14, 0, 0, 0, 0, 0, 0, 0]
1  1  video/1/2022-04-22 11-15-55.mp4  [21, 2, 14, 0, 0, 0, 0, 0, 0, 0]
2  1  video/1/2022-04-21 17-23-55.mp4  [21, 2, 14, 0, 0, 0, 0, 0, 0, 0]
3  2  video/2/2022-04-19 15-17-13.mp4  [12, 6, 26, 0, 0, 0, 0, 0, 0, 0]


In [None]:
class SLDataset(Dataset):
    """Dataset over a frame of (id, video_file, encoding) rows.

    Each item is the pair (video file path, encoded sentence), read from the
    second and third columns of the frame handed to the constructor.
    """

    def __init__(self, df):
        # shuffle and save
        self.df = sklearn.utils.shuffle(df)

    def __getitem__(self, idx):
        # Read from this dataset's own (shuffled) frame. Using the notebook-level
        # `df` here would ignore both the shuffle and the train/val split.
        video = self.df.iloc[idx, 1]
        encoding = self.df.iloc[idx, 2]
        # TODO: keep frames with hands

        # TODO: get last convolutional layer for each

        return video, encoding

    def __len__(self):
        return len(self.df)

def get_dataloader(df, phase: str, batch_size: int = 96,
                   val_size: float = 0.15, num_workers: int = 4) -> DataLoader:
    '''
    Dataset and DataLoader.
    The frame is split into train/validation parts with a fixed random state
    (config.seed), so separate calls for the two phases see the same split.
    Parameters:
        df: frame with the rows to split and serve.
        phase: 'train' selects the training part; any other value selects
            the validation part.
        batch_size: data per iteration.
        val_size: fraction of rows held out for validation.
        num_workers: number of DataLoader worker processes (0 loads in the
            main process).
    Returns:
        data generator
    '''
    train_df, val_df = train_test_split(df, test_size=val_size, random_state=config.seed)#, stratify=df['id'])
    train_df, val_df = train_df.reset_index(drop=True), val_df.reset_index(drop=True)
    phase_df = train_df if phase == 'train' else val_df
    dataset = SLDataset(phase_df)
    dataloader = DataLoader(dataset=dataset, batch_size=batch_size, num_workers=num_workers)
    return dataloader

# Smoke test: build the training loader with batch size 4 and show its first batch.
dl = get_dataloader(df,'train',4)
next(iter(dl))

[('video/1/2022-04-19 15-44-38.mp4',
  'video/1/2022-04-22 11-15-55.mp4',
  'video/1/2022-04-21 17-23-55.mp4'),
 [tensor([21, 21, 21]), tensor([2, 2, 2]), tensor([14, 14, 14])]]

In [None]:
class RNNAttentionModel(nn.Module):
    """Two ConvNormPool stages followed by an LSTM, an attention-style
    re-weighting of the LSTM outputs, adaptive max pooling and a linear
    classifier whose scores are passed through softmax.

    NOTE(review): `ConvNormPool` is not defined in the visible cells; it must
    be provided before this class can be instantiated.
    """
    def __init__(
        self,
        input_size,
        hid_size,
        rnn_type,
        bidirectional,
        n_classes=5,
        kernel_size=5,
    ):
        """
        Parameters:
            input_size: passed to the first ConvNormPool as its input_size.
            hid_size: hidden size shared by the conv stages, the LSTM, the
                attention projection and the classifier input.
            rnn_type: accepted but not used anywhere in this class.
            bidirectional: forwarded to nn.LSTM.
            n_classes: number of output scores produced by the classifier.
            kernel_size: forwarded to both ConvNormPool stages.
        """
        super().__init__()
 
        # NOTE(review): the LSTM feature size is hard-coded to 46 instead of
        # being derived from the arguments -- presumably the length left after
        # the two conv stages for the original data; confirm for new inputs.
        self.rnn_layer = nn.LSTM(
                input_size = 46, #hid_size * 2 if bidirectional else hid_size,
                hidden_size = hid_size,
                num_layers = 1,
                dropout = 0,
                bidirectional = bidirectional,
                batch_first = True)
        
        self.conv1 = ConvNormPool(
            input_size=input_size,
            hidden_size=hid_size,
            kernel_size=kernel_size,
        )
        self.conv2 = ConvNormPool(
            input_size=hid_size,
            hidden_size=hid_size,
            kernel_size=kernel_size,
        )
        # despite the attribute name this is a *max* pool reducing to length 1
        self.avgpool = nn.AdaptiveMaxPool1d((1))
        self.attn = nn.Linear(hid_size, hid_size, bias=False)
        self.fc = nn.Linear(in_features=hid_size, out_features=n_classes)
        
    def forward(self, input):
        """Run the conv stages, the LSTM and the attention head on `input`
        and return softmax-normalised class scores (last dim sums to 1).

        NOTE(review): because the output is already softmax-normalised, feeding
        it to nn.CrossEntropyLoss (which applies log-softmax itself) applies
        softmax twice -- confirm this is intended.
        """
        x = self.conv1(input)
        x = self.conv2(x)
        # nn.LSTM returns (outputs, (h_n, c_n))
        x_out, hid_states = self.rnn_layer(x)
        # stack the final hidden state and final cell state along dim 0, then
        # swap the first two dims (assumes this puts batch first -- TODO confirm)
        x = torch.cat([hid_states[0], hid_states[1]], dim=0).transpose(0, 1)
        x_attn = torch.tanh(self.attn(x))
        # batched matrix product of the projected states with the LSTM outputs
        x = x_attn.bmm(x_out)
        x = x.transpose(2, 1)
        x = self.avgpool(x)
        # flatten everything after the batch dimension
        x = x.view(-1, x.size(1) * x.size(2))
        x = F.softmax(self.fc(x), dim=-1)
        return x

In [None]:
class Meter:
    """Accumulates per-batch loss and classification metrics plus a confusion
    matrix over one epoch.

    `metrics` holds running *sums*; the caller divides by the number of
    batches. `confusion[t][p]` counts samples with true class `t` that were
    predicted as class `p`.
    """

    def __init__(self, n_classes=5):
        self.metrics = {}
        self.confusion = torch.zeros((n_classes, n_classes))
        # Start from zeroed sums so update() works even when the caller does
        # not call init_metrics() explicitly.
        self.init_metrics()

    def update(self, x, y, loss):
        """Add one batch to the running sums.

        Parameters:
            x: tensor of per-class scores, one row per sample; the predicted
               class is the argmax over axis 1.
            y: tensor of true class indices.
            loss: scalar loss value of the batch.
        """
        y_pred = np.argmax(x.detach().cpu().numpy(), axis=1)
        y_true = y.detach().cpu().numpy()
        self.metrics['loss'] += loss
        # scikit-learn metrics take (y_true, y_pred); passing them the other
        # way round swaps precision and recall.
        self.metrics['accuracy'] += accuracy_score(y_true, y_pred)
        self.metrics['f1'] += f1_score(y_true, y_pred, average='macro')
        self.metrics['precision'] += precision_score(y_true, y_pred, average='macro', zero_division=1)
        self.metrics['recall'] += recall_score(y_true, y_pred, average='macro', zero_division=1)

        self._compute_cm(y_pred, y_true)

    def _compute_cm(self, x, y):
        """Count each (prediction `x`, target `y`) pair in the confusion matrix."""
        for prob, target in zip(x, y):
            # row = true class, column = predicted class; a correct prediction
            # lands on the diagonal, so no special case is needed
            self.confusion[int(target), int(prob)] += 1

    def init_metrics(self):
        """Reset all running sums to zero (the confusion matrix is kept)."""
        self.metrics['loss'] = 0
        self.metrics['accuracy'] = 0
        self.metrics['f1'] = 0
        self.metrics['precision'] = 0
        self.metrics['recall'] = 0

    def get_metrics(self):
        """Return the dict of running sums."""
        return self.metrics

    def get_confusion_matrix(self):
        """Return the (n_classes, n_classes) confusion-count tensor."""
        return self.confusion

In [None]:
class Trainer:
    """Runs the train/validation loop for a network and keeps per-epoch logs.

    Uses AdamW with a cosine-annealed learning rate and nn.CrossEntropyLoss.
    The weights are saved whenever the mean validation loss improves.

    NOTE(review): nn.CrossEntropyLoss applies log-softmax itself, so the
    network is expected to output unnormalised scores -- confirm against the
    model being trained.
    """

    def __init__(self, net, lr, batch_size, num_epochs, data=None):
        """
        Parameters:
            net: the nn.Module to train; moved to config.device.
            lr: initial learning rate for AdamW.
            batch_size: batch size for both dataloaders.
            num_epochs: number of epochs run() performs (also the cosine period).
            data: frame passed to get_dataloader; when omitted the
                notebook-level `df` is used.
        """
        self.net = net.to(config.device)
        self.num_epochs = num_epochs
        self.criterion = nn.CrossEntropyLoss()
        self.optimizer = AdamW(self.net.parameters(), lr=lr)
        self.scheduler = CosineAnnealingLR(self.optimizer, T_max=num_epochs, eta_min=5e-6)
        self.best_loss = float('inf')
        self.phases = ['train', 'val']
        if data is None:
            data = df
        # get_dataloader takes the frame as its first argument
        self.dataloaders = {
            phase: get_dataloader(data, phase, batch_size) for phase in self.phases
        }
        self.train_df_logs = pd.DataFrame()
        self.val_df_logs = pd.DataFrame()

    def _train_epoch(self, phase):
        """Run one pass over the `phase` dataloader.

        Weights are updated only when phase == 'train'. The batch-averaged
        metrics are appended to the matching log frame, printed, and the
        confusion matrix is plotted.

        Returns:
            float: mean loss over the batches of this pass.
        Raises:
            ValueError: the dataloader produced no batches.
        """
        print(f"{phase} mode | time: {time.strftime('%H:%M:%S')}")

        is_train = phase == 'train'
        self.net.train(is_train)
        meter = Meter()
        meter.init_metrics()

        n_batches = 0
        # gradients are only needed while training
        with torch.set_grad_enabled(is_train):
            for data, target in self.dataloaders[phase]:
                data = data.to(config.device)
                target = target.to(config.device)

                output = self.net(data)
                loss = self.criterion(output, target)

                if is_train:
                    self.optimizer.zero_grad()
                    loss.backward()
                    self.optimizer.step()

                meter.update(output, target, loss.item())
                n_batches += 1

        if n_batches == 0:
            raise ValueError(f"the '{phase}' dataloader produced no batches")

        # average the running sums over the number of batches (not the last
        # loop index, which is one too small and zero for a single batch)
        metrics = {k: v / n_batches for k, v in meter.get_metrics().items()}
        df_logs = pd.DataFrame([metrics])

        if is_train:
            self.train_df_logs = pd.concat([self.train_df_logs, df_logs], axis=0)
        else:
            self.val_df_logs = pd.concat([self.val_df_logs, df_logs], axis=0)

        # show logs
        print('{}: {}, {}: {}, {}: {}, {}: {}, {}: {}'
              .format(*(x for kv in metrics.items() for x in kv))
             )
        self._plot_confusion_matrix(meter.get_confusion_matrix())

        # epoch mean, so checkpointing does not depend on the last batch only
        return metrics['loss']

    def _plot_confusion_matrix(self, confusion_matrix):
        """Show the confusion matrix as a heat map."""
        fig, ax = plt.subplots(figsize=(5, 5))
        cm_ = ax.imshow(confusion_matrix, cmap='hot')
        ax.set_title('Confusion matrix', fontsize=15)
        # Meter stores counts as [true class][predicted class]: matrix rows are
        # drawn along the y axis and columns along the x axis.
        ax.set_xlabel('Predicted', fontsize=13)
        ax.set_ylabel('Actual', fontsize=13)
        plt.colorbar(cm_)
        plt.show()
        # release the figure so repeated epochs do not accumulate open figures
        plt.close(fig)

    def run(self):
        """Train for num_epochs epochs, validating after each one and saving
        the weights whenever the mean validation loss improves."""
        for epoch in range(self.num_epochs):
            self._train_epoch(phase='train')
            val_loss = self._train_epoch(phase='val')
            self.scheduler.step()

            if val_loss < self.best_loss:
                print('\nNew checkpoint\n')
                self.best_loss = val_loss
                torch.save(self.net.state_dict(), f"best_model_epoc{epoch}.pth")
            #clear_output()

In [None]:
# Rebuild the model (input_size=1, hid_size=64, rnn_type='lstm',
# bidirectional=False) and restore previously saved weights for inspection.
# Requires Config to define `attn_state_path` and `attn_logs`.
attn_model = RNNAttentionModel(1, 64, 'lstm', False).to(config.device)
# map_location loads the saved tensors onto the configured device
attn_model.load_state_dict(
    torch.load(config.attn_state_path,
               map_location=config.device)
);
# switch to inference mode; the trailing ';' suppresses the cell output
attn_model.eval();
# training log stored as CSV
logs = pd.read_csv(config.attn_logs)