In [None]:
import os
import time
from tqdm import tqdm
import torch
import torch.nn as nn
from torch.utils.data import Dataset,DataLoader
import torch.optim as optim
from torch.optim import lr_scheduler
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence

In [None]:
# Notebook-wide configuration, looked up by key from the cells below.
CFG = dict(
    # Cross-validation and reproducibility settings.
    fold_num=5,
    seed=719,
    # Feature sizes: width of the face feature vector and the number of
    # lip / eye keypoints.
    vgg_embedding_features=8631,
    lip_kpts=40,
    eye_kpts=16,
    # First CUDA device when one is available, otherwise the CPU.
    device=torch.device("cuda:0" if torch.cuda.is_available() else "cpu"),
    # Batch sizes, epoch count and loader worker count.
    train_bs=32,
    test_bs=32,
    epochs=25,
    num_workers=2,
)

# Dataset

In [None]:
class MyDataset(Dataset):
    """In-memory dataset of pre-extracted face, lip and eye features.

    For every requested fold directory under ``data_root`` the files
    ``X_face.pt``, ``X_lip.pt``, ``X_l_eye.pt``, ``X_r_eye.pt`` and ``Y.pt``
    are loaded onto ``CFG['device']`` and concatenated along dimension 0.

    Args:
        fold: A fold directory name, or an iterable of fold directory names.
        data_root: Directory that contains the fold sub-directories. The
            ``None`` default is kept for signature compatibility only; a
            real path is required.

    Raises:
        ValueError: If ``data_root`` is ``None``, if ``fold`` is empty, or if
            the loaded tensors do not all share the same size along
            dimension 0.
    """

    # (attribute name, file name) for every tensor stored per fold.
    _FILES = (
        ('face_feats', 'X_face.pt'),
        ('lip_feats', 'X_lip.pt'),
        ('l_eye_feats', 'X_l_eye.pt'),
        ('r_eye_feats', 'X_r_eye.pt'),
        ('labels', 'Y.pt'),
    )

    def __init__(self, fold,
                 data_root=None,
                ):
        super().__init__()
        if data_root is None:
            raise ValueError('data_root must be provided')
        # A bare string would otherwise be iterated character by character.
        if isinstance(fold, str):
            fold = [fold]
        fold_names = list(fold)
        if not fold_names:
            raise ValueError('fold must contain at least one fold name')

        for attr_name, file_name in self._FILES:
            # NOTE: torch.load unpickles the file; only point data_root at
            # trusted data.
            parts = [
                torch.load(
                    os.path.join(data_root, fold_name, file_name),
                    map_location=CFG['device'],
                )
                for fold_name in fold_names
            ]
            setattr(self, attr_name, torch.cat(parts, dim=0))

        # __getitem__ uses one index for all five tensors, so they must agree
        # on the number of samples.
        expected = self.face_feats.shape[0]
        for attr_name, file_name in self._FILES:
            found = getattr(self, attr_name).shape[0]
            if found != expected:
                raise ValueError(
                    '{} holds {} samples but X_face.pt holds {}'.format(
                        file_name, found, expected
                    )
                )

    def __len__(self):
        """Return the number of samples (size of dimension 0 of the face features)."""
        return self.face_feats.shape[0]

    def __getitem__(self, index: int):
        """Return ``(face, lip, left eye, right eye, label)`` for sample ``index``."""
        return (
            self.face_feats[index],
            self.lip_feats[index],
            self.l_eye_feats[index],
            self.r_eye_feats[index],
            self.labels[index],
        )

In [None]:
# Smoke test: iterate over the first few batches of one fold and print the
# feature shapes together with the labels.
dataset = MyDataset(
    data_root='/kaggle/input/sust-feature-data/data/data',
    fold=['fold2'],
)
loader = DataLoader(
    dataset,
    batch_size=CFG['train_bs'],
    shuffle=False,
    drop_last=False,
    pin_memory=False,
)
pbar = tqdm(enumerate(loader), total=len(loader))
for step, (face_feat, lip_feat, l_eye_feat, r_eye_feat, label) in pbar:
    print(face_feat.shape, lip_feat.shape, l_eye_feat.shape, r_eye_feat.shape, sep='\n')
    print(label)
    # Stop once batches 0..4 have been shown.
    if step >= 4:
        break

# Model

In [None]:
class KeypointFeatureExtraction(nn.Module):
    """Map a keypoint vector to a feature vector with a Linear layer followed by SELU.

    Args:
        input_shape: Size of the last dimension of the input.
        output_shape: Size of the last dimension of the output.
        drop_prob: Accepted for interface compatibility; no layer in this
            module reads it.
    """

    def __init__(self, input_shape, output_shape, drop_prob=0.25):
        super().__init__()
        projection = nn.Linear(input_shape, output_shape)
        activation = nn.SELU()
        self.keypoint_feature_extractor = nn.Sequential(projection, activation)

    def forward(self, x):
        """Apply the Linear + SELU stack to ``x``."""
        features = self.keypoint_feature_extractor(x)
        return features

# class ImageFeatureExtraction(nn.Module):
#     def __init__(self,in_feature,hidden_feature,output_feature,num_layers):
#         super(ImageFeatureExtraction, self).__init__()
#         self.ln1 = nn.Linear(in_feature,hidden_feature)
#         self.bn = nn.LayerNorm(hidden_feature)
#         self.ln2 = nn.ModuleList([nn.Linear(hidden_feature,hidden_feature) for _ in range(num_layers)])
#         self.bn2 = nn.ModuleList([nn.LayerNorm(hidden_feature) for _ in range(num_layers)])

#         self.ln3 = nn.Linear(hidden_feature,output_feature)
#         self.relu = nn.ReLU()
#     def forward(self, x):
#         x = self.relu(self.bn(self.ln1(x)))
#         for ln,bn in zip(self.ln2,self.bn2):
#              x = self.relu(bn(ln(x)))
#         return self.ln3(x)
    
class FeatureExtraction(nn.Module):
    """Embed face, lip and eye inputs to a common width and fuse them by a learned weighted sum.

    The face input goes through Linear + ELU; the lip input and both eye
    inputs go through ``KeypointFeatureExtraction``. The left and right eye
    share a single extractor. The four embeddings are combined with four
    learnable scalar weights stored in ``self.W`` (shape ``(4, 1)``).

    Args:
        output_shape: Width of every embedding and of the fused output.
        img_drop_prob: Accepted for interface compatibility; not used here.
        kpt_drop_prob: Forwarded to the keypoint extractors.
    """

    def __init__(self, output_shape, img_drop_prob=0.5, kpt_drop_prob=0.3):
        super().__init__()

        # Face input width comes from CFG (same value as the former literal
        # 8631) so the model and the configuration cannot drift apart.
        self.face_feature_extractor = nn.Sequential(
            nn.Linear(CFG['vgg_embedding_features'], output_shape),
            nn.ELU(),
        )

        # Keypoint inputs are twice the keypoint count wide
        # (presumably one x and one y value per keypoint - TODO confirm).
        self.lip_extractor = KeypointFeatureExtraction(CFG['lip_kpts'] * 2, output_shape, kpt_drop_prob)
        self.eye_extractor = KeypointFeatureExtraction(CFG['eye_kpts'] * 2, output_shape, kpt_drop_prob)

        # One learnable fusion weight per stream: face, lip, left eye, right eye.
        fusion_weights = torch.empty(4, 1)
        nn.init.xavier_uniform_(fusion_weights)
        self.W = nn.Parameter(fusion_weights)

    def forward(self, x_face, x_lip, x_l_eye, x_r_eye):
        """Return the weighted sum of the four stream embeddings."""
        face_feat = self.face_feature_extractor(x_face)
        lip_feat = self.lip_extractor(x_lip)
        l_eye_feat = self.eye_extractor(x_l_eye)
        r_eye_feat = self.eye_extractor(x_r_eye)

        w_face, w_lip, w_l_eye, w_r_eye = self.W[:, 0]
        return w_face * face_feat + w_lip * lip_feat + w_l_eye * l_eye_feat + w_r_eye * r_eye_feat


In [None]:
class DrowsinessClassifier(nn.Module):
    """Classifier built from fused features, an LSTM and a two-layer head.

    ``FeatureExtraction`` fuses the four input streams, a batch-first LSTM
    consumes the fused features, and the LSTM output at the last position
    of dimension 1 goes through dropout, Linear(hidden_dim, 64), SELU and
    Linear(64, output_size).

    Args:
        output_size: Number of outputs produced by the final Linear layer.
        feature_size: Width of the fused features fed to the LSTM.
        hidden_dim: LSTM hidden size.
        n_layers: Number of stacked LSTM layers.
        drop_prob: Accepted for interface compatibility; the dropout layer
            uses a fixed probability of 0.3.
    """

    def __init__(self, output_size=2, feature_size=512, hidden_dim=512, n_layers=1, drop_prob=0.5):
        super().__init__()
        self.feature_extraction = FeatureExtraction(feature_size)
        self.output_size = output_size
        self.n_layers = n_layers
        self.hidden_dim = hidden_dim
        self.lstm = nn.LSTM(feature_size, hidden_dim, n_layers, batch_first=True)
        self.fc1 = nn.Linear(self.hidden_dim, 64)
        self.selu = nn.SELU()
        self.dropout = nn.Dropout(0.3)
        self.fc2 = nn.Linear(64, output_size)
        self._reinitialize()

    @staticmethod
    def _init_lstm_param(name, data):
        """Initialise one LSTM tensor according to which weight or bias it is."""
        if 'weight_ih' in name:
            nn.init.xavier_uniform_(data)
        elif 'weight_hh' in name:
            nn.init.orthogonal_(data)
        elif 'bias_ih' in name:
            data.fill_(0)
            # The second quarter of the bias is the forget gate; start it at 1.
            size = data.size(0)
            data[(size // 4):(size // 2)].fill_(1)
        elif 'bias_hh' in name:
            data.fill_(0)

    @staticmethod
    def _init_fc_param(name, data):
        """Initialise one tensor of the fully connected head."""
        if 'weight' in name:
            nn.init.xavier_uniform_(data)
        elif 'bias' in name:
            data.fill_(0)

    def _reinitialize(self):
        """
        Tensorflow/Keras-like initialization of the LSTM and ``fc`` parameters.
        """
        for name, param in self.named_parameters():
            # 'lstm' takes precedence over 'fc'; every other parameter keeps
            # its default initialisation.
            if 'lstm' in name:
                self._init_lstm_param(name, param.data)
            elif 'fc' in name:
                self._init_fc_param(name, param.data)

    def forward(self, x_face, x_lip, x_l_eye, x_r_eye):
        """Return the head's output for the fused input streams."""
        fused = self.feature_extraction(x_face, x_lip, x_l_eye, x_r_eye)
        sequence_out, _ = self.lstm(fused)
        # Keep only the last position along dimension 1.
        last_step = sequence_out[:, -1, :]
        head = self.fc1(self.dropout(last_step))
        return self.fc2(self.selu(head))

# Training

In [None]:
def train_model(model, criterion, optimizer, scheduler, num_epochs=25):
    """Train ``model`` and return it loaded with the best-validation weights.

    Every epoch runs a ``'train'`` phase followed by a ``'val'`` phase over
    the module-level ``dataloaders`` dict. Whenever the validation accuracy
    is at least as high as the best seen so far, the weights are written to
    ``best_model_<ClassName>.pt`` inside the module-level ``savedir``.

    Args:
        model: Module called as ``model(face, lip, l_eye, r_eye)``.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped after every training batch.
        scheduler: LR scheduler stepped once after every training phase.
        num_epochs: Number of epochs to run.

    Returns:
        ``model`` with the best checkpoint loaded.

    Raises:
        ValueError: If a phase's dataloader yields no samples.
    """
    since = time.time()

    best_model_params_path = os.path.join(savedir, 'best_model_{}.pt'.format(model.__class__.__name__ ))

    # Save the starting weights so a checkpoint exists even if no epoch
    # replaces it.
    torch.save(model.state_dict(), best_model_params_path)
    best_acc = 0.0

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            is_train = phase == 'train'
            model.train(is_train)  # train mode for 'train', eval mode for 'val'

            running_loss = 0.0
            running_corrects = 0
            samples_seen = 0

            loader = dataloaders[phase]
            for face_feats, lip_feats, l_eye_feats, r_eye_feats, labels in tqdm(loader, total=len(loader)):
                labels = labels.to(CFG['device'])
                face_feats = face_feats.to(CFG['device'])
                lip_feats = lip_feats.to(CFG['device'])
                l_eye_feats = l_eye_feats.to(CFG['device'])
                r_eye_feats = r_eye_feats.to(CFG['device'])

                # zero the parameter gradients
                optimizer.zero_grad()

                # Track gradients only during the training phase.
                with torch.set_grad_enabled(is_train):
                    outputs = model(face_feats, lip_feats, l_eye_feats, r_eye_feats)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    if is_train:
                        loss.backward()
                        optimizer.step()

                # Statistics are accumulated as plain Python numbers and are
                # normalised by the samples actually seen, so they stay
                # correct even if the loader drops an incomplete batch.
                batch_size = labels.size(0)
                running_loss += loss.item() * batch_size
                running_corrects += (preds == labels).sum().item()
                samples_seen += batch_size

            if is_train:
                scheduler.step()

            if samples_seen == 0:
                raise ValueError(f"dataloader for phase '{phase}' yielded no samples")

            epoch_loss = running_loss / samples_seen
            epoch_acc = running_corrects / samples_seen

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            # Checkpoint on ties as well, so the latest equally good epoch wins.
            if phase == 'val' and epoch_acc >= best_acc:
                print('Save best epoch')
                best_acc = epoch_acc
                torch.save(model.state_dict(), best_model_params_path)

        print()

    time_elapsed = time.time() - since
    print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    # '.4f' (four decimals); the previous '4f' spec only set a minimum width.
    print(f'Best val Acc: {best_acc:.4f}')

    # load best model weights
    model.load_state_dict(torch.load(best_model_params_path, map_location=CFG['device']))
    return model

In [None]:
# Training split: folds 3-5, reshuffled by the loader.
training_data = MyDataset(
    data_root='/kaggle/input/sust-feature-data/data/data',
    fold=['fold3', 'fold4', 'fold5'],
)
train_dataloader = DataLoader(
    training_data,
    batch_size=CFG['train_bs'],
    shuffle=True,
    drop_last=False,
    pin_memory=False,
)

# Held-out split: fold 2, iterated in a fixed order.
testing_data = MyDataset(
    data_root='/kaggle/input/sust-feature-data/data/data',
    fold=['fold2'],
)
test_dataloader = DataLoader(
    testing_data,
    batch_size=CFG['test_bs'],
    shuffle=False,
    drop_last=False,
    pin_memory=False,
)

In [None]:
def seed_everything(seed):
    """Seed Python and PyTorch RNGs and put cuDNN in deterministic mode.

    Args:
        seed: Integer seed applied to ``random``, ``PYTHONHASHSEED``, the
            PyTorch CPU generator and every CUDA device.
    """
    import random  # local import: not part of the notebook's import cell

    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    # NumPy is not imported in this notebook, so it is not seeded here.
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # The cuDNN autotuner may pick different kernels from run to run, which
    # defeats the determinism requested above, so it has to be off.
    torch.backends.cudnn.benchmark = False

# NOTE(review): this seed (42) differs from CFG['seed'] (719). It is kept
# unchanged so results do not shift - confirm which value is intended.
seed_everything(42)

# Module-level state read by train_model.
dataloaders = {'train': train_dataloader, 'val': test_dataloader}
dataset_sizes = {'train': len(training_data), 'val': len(testing_data)}
savedir = '/kaggle/working/weights'
# exist_ok avoids the separate existence check and makes re-running the cell safe.
os.makedirs(savedir, exist_ok=True)

model = DrowsinessClassifier().to(CFG['device'])

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
# Multiply the learning rate by 0.1 every 7 epochs.
exp_lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)
# Epoch count comes from the shared configuration (same value as the former literal 25).
model = train_model(model, criterion, optimizer, exp_lr_scheduler, num_epochs=CFG['epochs'])

# Evaluation

In [None]:
# Fold 1 is the split scored in the evaluation cells below.
dataset = MyDataset(
    data_root='/kaggle/input/sust-feature-data/data/data',
    fold=['fold1'],
)

In [None]:
# Predict every sample of `dataset` one at a time, collecting the predicted
# and the true class index.
result = []
ground_truth = []

# Put the model in eval mode explicitly so dropout is disabled regardless of
# the mode an earlier cell left it in.
model.eval()
with torch.no_grad():
    for i in tqdm(range(len(dataset)), total=len(dataset)):
        face_feats, lip_feats, l_eye_feats, r_eye_feats, label = dataset[i]
        # Add a leading batch dimension of size 1 to every input.
        outputs = model(
            torch.unsqueeze(face_feats, 0),
            torch.unsqueeze(lip_feats, 0),
            torch.unsqueeze(l_eye_feats, 0),
            torch.unsqueeze(r_eye_feats, 0)
        )
        _, preds = torch.max(outputs, 1)
        result.append(preds.item())
        ground_truth.append(label.item())

In [None]:
from sklearn.metrics import (
    ConfusionMatrixDisplay,
    accuracy_score,
    classification_report,
    confusion_matrix,
    f1_score,
    precision_score,
    recall_score,
)

# Accuracy = (tp + tn) / (p + n)
accuracy = accuracy_score(ground_truth, result)
print('Accuracy: %f' % accuracy)

# Precision = tp / (tp + fp)
precision = precision_score(ground_truth, result)
print('Precision: %f' % precision)

# Recall = tp / (tp + fn)
recall = recall_score(ground_truth, result)
print('Recall: %f' % recall)

# F1 = 2 tp / (2 tp + fp + fn)
f1 = f1_score(ground_truth, result)
print('F1 score: %f' % f1)

# Per-class report, then the confusion matrix drawn with class labels 0 and 1.
print(classification_report(ground_truth, result))
matrix = confusion_matrix(ground_truth, result)
disp = ConfusionMatrixDisplay(confusion_matrix=matrix, display_labels=[0, 1])
disp.plot()