# Model

## Imports

In [None]:
import json
from gensim.models import KeyedVectors
from random import shuffle
from tqdm import tqdm
import numpy as np
import torch
from torch import nn
import torch.nn.functional as F
from torch.optim import Adam
from torch.nn.utils.rnn import pad_sequence
from sklearn.preprocessing import normalize
from sklearn.metrics import f1_score, precision_score, recall_score
import os

## Files

The JSON configuration file (see `config_path` below) is expected to provide the following keys:

- "train_x", "test_x", "valid_x"
- "train_y", "test_y", "valid_y"
- "train_vocab", "test_vocab", "valid_vocab"
- "train_sub", "test_sub", "valid_sub"
- "token_to_idx", "label_to_idx", "domain_to_idx", "hash_to_terminal"
- "domain_classes"
- "embeddings", "model_path"
- "kernel_size", "num_filters", "embedding_dim"
- "dropout", "weight_decay", "learning_rate", "batch_size", "num_epochs"

In [None]:
# Path to the JSON configuration file; it is parsed into `config`, which supplies
# the data file locations and the hyperparameters used throughout this notebook.
config_path = "../model_preferences.json"

## Data

In [None]:
class CnnDataset:
    """Token-sequence classification dataset backed by plain-text files.

    Each line of ``tokens_file`` is one sample (tokens separated by a single
    space) and the line with the same number in ``labels_file`` is its label.
    ``token_to_idx_file`` and ``label_to_idx_file`` list one vocabulary entry
    per line; the line order defines the index. Token indices 0 and 1 are
    reserved for the padding and unknown-token markers, so tokens from the
    file are numbered starting at 2. NUL characters and surrounding whitespace
    are removed from every line that is read.

    Args:
        tokens_file: Path to the file with one tokenised sample per line.
        labels_file: Path to the file with one label per line.
        token_to_idx_file: Path to the token vocabulary (one token per line).
        label_to_idx_file: Path to the label vocabulary (one label per line).
    """

    PAD_TOKEN = "@@PAD@@"
    UNK_TOKEN = "@@UNK@@"

    def __init__(self, tokens_file, labels_file, token_to_idx_file, label_to_idx_file):
        self.tokens_list = [line.split(" ") for line in self._read_lines(tokens_file)]
        self.labels = list(self._read_lines(labels_file))

        self.token_to_idx = {self.PAD_TOKEN: 0, self.UNK_TOKEN: 1}
        for idx, token in enumerate(self._read_lines(token_to_idx_file), start=2):
            self.token_to_idx[token] = idx

        self.label_to_idx = {}
        for idx, label in enumerate(self._read_lines(label_to_idx_file)):
            self.label_to_idx[label] = idx

    @staticmethod
    def _read_lines(path):
        """Yield the lines of ``path`` with NUL characters and outer whitespace removed."""
        with open(path, "r", encoding="UTF-8") as f:
            for raw in tqdm(f):
                yield raw.replace('\x00', '').strip()

    def __len__(self):
        """Return the number of samples (lines read from the tokens file)."""
        return len(self.tokens_list)

    def numericalize(self, tokens):
        """Map tokens to vocabulary indices.

        Tokens that are missing from the vocabulary are mapped to the index of
        the unknown-token marker instead of ``None`` (which cannot be stored in
        an int64 array).

        Args:
            tokens: Iterable of token strings.

        Returns:
            A 1-D ``np.int64`` array with one index per token.
        """
        unk_idx = self.token_to_idx[self.UNK_TOKEN]
        indices = [self.token_to_idx.get(token, unk_idx) for token in tokens]
        return np.array(indices, dtype=np.int64)

    def __getitem__(self, idx):
        """Return ``(token_indices, label_index)`` for sample ``idx`` as int64 tensors.

        Raises:
            KeyError: If the sample's label is not in the label vocabulary.
        """
        label = self.labels[idx]
        try:
            label_idx = self.label_to_idx[label]
        except KeyError:
            raise KeyError(
                f"label {label!r} of sample {idx} is missing from the label vocabulary"
            ) from None
        indices = self.numericalize(self.tokens_list[idx])
        return torch.tensor(indices), torch.tensor(label_idx, dtype=torch.int64)

In [None]:
class CnnDataloader:
    """Minimal batch iterator over an indexable dataset of ``(tokens, label)`` tensor pairs.

    Every batch is a pair ``(tokens, labels)``: the token sequences are padded
    with 0 to the longest sequence in the batch (batch-first layout) and the
    labels are stacked into one tensor. The final batch may be smaller than
    ``batch_size``.

    Args:
        dataset: Object supporting ``len()`` and integer indexing.
        batch_size: Maximum number of samples per batch.
        shuffle: When true, the visiting order is reshuffled in place (with
            ``np.random``) at the start of every iteration.
    """

    def __init__(self, dataset, batch_size=1024, shuffle=True):
        self.dataset = dataset
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.indices = np.arange(len(dataset))

    def __len__(self):
        """Return the number of batches produced by one pass."""
        return int(np.ceil(len(self.dataset) / self.batch_size))

    def __iter__(self):
        """Yield ``(padded_tokens, labels)`` batches covering the dataset once."""
        if self.shuffle:
            np.random.shuffle(self.indices)

        total = len(self.dataset)
        for offset in range(0, total, self.batch_size):
            chosen = self.indices[offset:offset + self.batch_size]
            samples = [self.dataset[position] for position in chosen]
            sequences = [tokens for tokens, _ in samples]
            targets = [label for _, label in samples]

            padded = pad_sequence(sequences, batch_first=True, padding_value=0)
            yield padded, torch.stack(targets)

In [None]:
# Parse the run configuration (data file locations and hyperparameters).
# The encoding is given explicitly, like every other open() in this notebook,
# so the result does not depend on the platform's default text encoding.
with open(config_path, encoding="utf-8") as fp:
    config = json.load(fp)

In [None]:
# All three splits are encoded with the same token and label vocabularies.
vocab_files = (config["token_to_idx"], config["label_to_idx"])
train_dataset = CnnDataset(config["train_x"], config["train_y"], *vocab_files)
valid_dataset = CnnDataset(config["valid_x"], config["valid_y"], *vocab_files)
test_dataset = CnnDataset(config["test_x"], config["test_y"], *vocab_files)

In [None]:
# Only the training split is reshuffled on each pass; validation keeps the
# file order, and the test split is served one sample per batch.
train_loader = CnnDataloader(
    dataset=train_dataset,
    batch_size=config["batch_size"],
    shuffle=True,
)
val_loader = CnnDataloader(
    dataset=valid_dataset,
    batch_size=config["batch_size"],
    shuffle=False,
)
test_loader = CnnDataloader(dataset=test_dataset, batch_size=1, shuffle=False)

## Embeddings

In [None]:
# Optionally build an initial embedding matrix from pretrained word2vec vectors.
# Rows follow train_dataset.token_to_idx; tokens without a pretrained vector
# keep an all-zero row, @@UNK@@ gets a random unit-length vector and @@PAD@@
# stays zero.
# NOTE(review): in the visible code `embeddings` is not passed to CodeCNN
# (pretrained_embeddings is left at None) - confirm whether that is intended.
if config["embeddings"] != "":
    # load_word2vec_format() returns a KeyedVectors object, which is queried
    # directly; the `.wv` indirection is not available on it in gensim >= 4.
    w2v = KeyedVectors.load_word2vec_format(config["embeddings"], binary=True)
    vocab_size = len(train_dataset.token_to_idx)
    embeddings = np.zeros((vocab_size, config["embedding_dim"]))
    embeddings[train_dataset.token_to_idx['@@UNK@@']] = normalize(np.random.uniform(-0.25, 0.25, (1, config["embedding_dim"])), norm='l2', axis=1)
    embeddings[train_dataset.token_to_idx['@@PAD@@']] = np.zeros((1, config["embedding_dim"]))
    for word, idx in train_dataset.token_to_idx.items():
        if word in w2v:
            embeddings[idx] = w2v[word]
    embeddings = torch.tensor(embeddings, dtype=torch.float64)

## Model

In [None]:
class CodeCNN(nn.Module):
    """CNN classifier with domain-conditioned attention pooling and one linear head per domain.

    Pipeline (see ``forward``): token ids -> embedding -> 1-D convolution over
    windows of ``kernel_size`` tokens -> batch norm -> ReLU -> dropout ->
    attention pooling weighted by a learned per-domain vector -> the fully
    connected layer that belongs to the sample's domain.

    Args:
        kernel_size: Number of consecutive tokens covered by one filter.
        embedding_dim: Size of one token embedding.
        num_filters: Number of convolution filters.
        padding_idx: Token id used to pad sequences; also the padding row of
            the token embedding when it is created from scratch.
        num_labels: Total number of labels; the number of domains is
            ``num_labels // num_classes``.
        num_classes: Number of classes predicted inside each domain.
        vocab_size: Number of rows of the token embedding when it is created
            from scratch.
        dropout: Dropout probability applied to the activations.
        pretrained_embeddings: Optional 2-D tensor used as the token embedding
            table instead of a freshly initialised one.
        pretrained_domains: Optional 2-D tensor of domain vectors; its rows
            are L2-normalised before use.
        freeze_embedding: Passed as ``freeze`` when loading ``pretrained_embeddings``.
        freeze_domains: Passed as ``freeze`` when loading ``pretrained_domains``.
    """

    def __init__(self,
                 kernel_size,
                 embedding_dim,
                 num_filters,
                 padding_idx,
                 num_labels,
                 num_classes,
                 vocab_size,
                 dropout=0.5,
                 pretrained_embeddings=None,
                 pretrained_domains=None,
                 freeze_embedding=False,
                 freeze_domains=False):
        super().__init__()

        self.kernel_size = kernel_size
        self.embedding_dim = embedding_dim
        self.num_filters = num_filters
        self.padding_idx = padding_idx
        self.num_labels = num_labels
        self.num_classes = num_classes
        self.vocab_size = vocab_size
        self.dropout = nn.Dropout(dropout)
        self.num_domains = num_labels // num_classes

        # Pads the token-id tensor with the padding id. An int argument pads
        # BOTH ends of the last dimension by kernel_size - 1 positions.
        self.pad = nn.ConstantPad1d(self.kernel_size - 1, self.padding_idx)

        # Embedding layer
        # weight: [vocab_size, emb_dim]
        if pretrained_embeddings is not None:
            self.embedding = nn.Embedding.from_pretrained(pretrained_embeddings,
                                                          freeze=freeze_embedding)
        else:
            self.embedding = nn.Embedding(num_embeddings=self.vocab_size,
                                          embedding_dim=self.embedding_dim,
                                          padding_idx=self.padding_idx,
                                          max_norm=1.0)

        # 1-D convolution over the flattened embedding sequence: each filter
        # spans kernel_size embeddings and moves one embedding per step.
        # weight: [num_filters, 1, emb_dim * kernel_size]
        self.conv = nn.Conv1d(1, self.num_filters, self.embedding_dim * self.kernel_size,
                              stride=self.embedding_dim, bias=True)

        # Matrix that represents domains
        # weight: [num_domains, num_filters]
        if pretrained_domains is not None:
            pretrained_domains = F.normalize(pretrained_domains, p=2, dim=1)
            self.attention_domains = nn.Embedding.from_pretrained(pretrained_domains,
                                                                  freeze=freeze_domains)
        else:
            # No padding_idx here: every row is a real domain. Marking row
            # `padding_idx` as padding would exclude that domain's attention
            # vector from gradient updates.
            self.attention_domains = nn.Embedding(num_embeddings=self.num_domains,
                                                  embedding_dim=self.num_filters,
                                                  max_norm=1.0)
            nn.init.xavier_uniform_(self.attention_domains.weight)

        # List of fc-layers: one for each domain
        # each maps [num_filters] -> [num_classes]
        self.fcs = nn.ModuleList([
            nn.Linear(self.num_filters, self.num_classes)
            for _ in range(self.num_domains)
        ])

        # Batch normalization over the filter channel
        # [num_filters]
        self.bn_conv = nn.BatchNorm1d(self.num_filters)

    def forward(self, inputs):
        """Classify a batch of token sequences.

        Args:
            inputs: Pair ``(data, domain)`` where ``data`` holds token ids of
                shape [batch_size, seq_len] and ``domain`` holds one domain
                index per sample, shape [batch_size].

        Returns:
            Dict with ``'features'`` (raw convolution output), ``'activations'``
            (after batch norm, ReLU and dropout), ``'domains'`` (the selected
            domain vectors) and ``'logits'`` of shape [batch_size, num_classes].
        """
        data, domain = inputs

        # Pad both ends so that windows overlapping the sequence borders exist
        # [batch_size, padded_len], padded_len = seq_len + 2 * (kernel_size - 1)
        x = self.pad(data)

        # Apply embedding layer to each token
        # [batch_size, padded_len, emb_dim]
        embedded_x = self.embedding(x).float()

        # Flatten the embeddings of each sequence into a single channel
        # [batch_size, 1, padded_len * emb_dim]
        embedded_x = embedded_x.view(-1, 1, x.shape[1] * self.embedding_dim)

        # Apply all convolution filters in parallel
        # [batch_size, num_filters, out_len], out_len = padded_len - kernel_size + 1
        features = self.conv(embedded_x)

        activations = self.bn_conv(features)

        # ReLU
        activations = F.relu(activations)

        # Dropout
        activations = self.dropout(activations)

        # Choose domains
        # [batch_size, num_filters]
        domains = self.attention_domains(domain)

        # Get attention weights: score of every position against the domain vector
        # [batch_size, out_len]
        attention_weights = torch.einsum('bf,bfs->bs', domains, activations)
        attention_weights = F.softmax(attention_weights, dim=1)

        # Compute the weighted sum over positions
        # [batch_size, num_filters]
        result = torch.einsum('bs,bfs->bf', attention_weights, activations)

        # Get logits from the head that belongs to each sample's domain
        # [batch_size, num_classes]
        # tolist() converts all domain indices in one step instead of one
        # tensor-to-index conversion per sample.
        logits = torch.stack([
            self.fcs[domain_idx](pooled)
            for domain_idx, pooled in zip(domain.tolist(), result)
        ])

        return {'features': features,
                'activations': activations,
                'domains': domains,
                'logits': logits}

    def get_embeddings(self):
        """Return the token embedding weight."""
        return self.embedding.weight

    def get_filters(self):
        """Return ``(weight, bias)`` of the convolution; size-1 dimensions of the weight are squeezed."""
        return self.conv.weight.squeeze(), self.conv.bias

    def get_attention_domains(self):
        """Return the domain attention weight."""
        return self.attention_domains.weight

    def get_fc_weights(self):
        """Return a list of ``(weight, bias)`` data tensors, one pair per domain head."""
        return [(fc.weight.data, fc.bias.data) for fc in self.fcs]

In [None]:
class Trainer:
    """Runs training and evaluation passes of a model over the given loaders.

    A combined label ``y`` encodes the domain as ``y // num_classes`` and the
    class inside that domain as ``y % num_classes``, where ``num_classes`` is
    ``len(config["domain_classes"])``. The model is called with
    ``(batch_x, domain)`` and must return a dict containing ``'logits'``.

    Args:
        model: The ``nn.Module`` to optimise.
        train_loader: Iterable of ``(batch_x, batch_y)`` used by ``train_epoch``.
        valid_loader: Iterable of ``(batch_x, batch_y)`` used by ``validate``.
        test_loader: Iterable of ``(batch_x, batch_y)`` used by ``test``.
        config: Dict read for ``"cuda"``, ``"learning_rate"``,
            ``"weight_decay"`` (all optional) and ``"domain_classes"``.
    """

    def __init__(self, model, train_loader, valid_loader, test_loader, config):
        self.model = model
        self.train_loader = train_loader
        self.valid_loader = valid_loader
        self.test_loader = test_loader
        self.config = config

        use_cuda = bool(config.get("cuda", False))
        if use_cuda and not torch.cuda.is_available():
            # Requesting CUDA on a machine without it would otherwise fail
            # while moving the model; warn and continue on the CPU instead.
            import warnings
            warnings.warn("config['cuda'] is set but CUDA is not available; using the CPU",
                          RuntimeWarning)
            use_cuda = False
        self.device = torch.device("cuda" if use_cuda else "cpu")

        self.model.to(self.device)
        self.optimizer = Adam(self.model.parameters(),
                              lr=config.get("learning_rate", 1e-3),
                              weight_decay=config.get("weight_decay", 1e-4))
        self.criterion = nn.CrossEntropyLoss()

    def _run_epoch(self, loader, desc="Evaluating", train=False):
        """Run one pass over ``loader`` and collect metrics.

        The caller is responsible for selecting ``model.train()`` /
        ``model.eval()`` and for wrapping evaluation in ``torch.no_grad()``.

        Args:
            loader: Iterable of ``(batch_x, batch_y)``; must support ``len()``
                and expose ``.dataset``.
            desc: Progress-bar caption.
            train: When true, an optimiser step is taken after every batch.

        Returns:
            Tuple ``(avg_loss, accuracy, f1, precision, recall)``: the loss is
            averaged over batches, the accuracy is a percentage, and the other
            scores are macro averages as returned by scikit-learn.
        """
        epoch_loss = 0
        corrects = 0
        all_preds = []
        all_labels = []
        num_classes = len(self.config["domain_classes"])

        for batch_x, batch_y in tqdm(loader, desc=desc):
            batch_x = torch.as_tensor(batch_x, dtype=torch.long).to(self.device)
            batch_y = torch.as_tensor(batch_y, dtype=torch.long).to(self.device)

            # Split the combined label into its domain and in-domain class.
            domain = batch_y // num_classes
            target = batch_y % num_classes

            if train:
                self.optimizer.zero_grad()

            outputs = self.model((batch_x, domain))
            logits = outputs['logits']
            loss = self.criterion(logits, target)

            if train:
                loss.backward()
                self.optimizer.step()

            epoch_loss += loss.item()

            pred_labels = torch.max(logits, 1)[1]
            corrects += (pred_labels == target).sum().item()

            all_preds.extend(pred_labels.cpu().tolist())
            all_labels.extend(target.cpu().tolist())

        avg_loss = epoch_loss / len(loader)
        accuracy = corrects / (len(loader.dataset)) * 100
        f1 = f1_score(all_labels, all_preds, average="macro")
        precision = precision_score(all_labels, all_preds, average="macro")
        recall = recall_score(all_labels, all_preds, average="macro")

        return avg_loss, accuracy, f1, precision, recall

    def train_epoch(self):
        """
        Run one training epoch and return ``(avg_loss, accuracy, f1, precision, recall)``.
        """
        self.model.train()
        return self._run_epoch(self.train_loader, desc="Training", train=True)

    def validate(self):
        """Evaluate on the validation loader without tracking gradients."""
        self.model.eval()
        with torch.no_grad():
            return self._run_epoch(self.valid_loader, desc="Validating")

    def test(self):
        """Evaluate on the test loader without tracking gradients."""
        self.model.eval()
        with torch.no_grad():
            return self._run_epoch(self.test_loader, desc="Testing")


## Train

In [None]:
#######################
model_name = "model"
# Select GPU 0 only when CUDA is present so the notebook also runs on
# CPU-only machines (set_device raises there).
if torch.cuda.is_available():
    torch.cuda.set_device(0)
#######################
concrete_model_path = config["model_path"] + "/" + model_name
log_path = concrete_model_path + "/logs"
weights_path = concrete_model_path + "/weights"

# makedirs also creates the missing parents (config["model_path"] and
# concrete_model_path); exist_ok makes re-running this cell harmless.
for directory in (log_path, weights_path):
    os.makedirs(directory, exist_ok=True)

# NOTE(review): no pretrained_embeddings are passed here, so the token
# embedding is trained from scratch even when config["embeddings"] is set -
# confirm this is intended.
model = CodeCNN(kernel_size=config["kernel_size"],
                embedding_dim=config["embedding_dim"],
                num_filters=config["num_filters"],
                padding_idx=0,
                num_labels=len(train_dataset.label_to_idx),
                num_classes=len(config["domain_classes"]),
                vocab_size=len(train_dataset.token_to_idx))
trainer = Trainer(model, train_loader, val_loader, test_loader, config)

In [None]:
# Tab-separated header for the per-epoch rows printed inside the loop.
print("\t".join(["Epoch", "Train Loss", "Train Acc", "Train F1", "Train Precision", "Train Recall", "Eval Loss", "Eval Acc", "Eval F1", "Eval Precision", "Eval Recall", "Best Eval Acc"]))
# Per-epoch history; "best" tracks the highest validation accuracy seen so far.
metrics = {"train_loss": [], "train_acc": [], "train_f1":[], "train_prec":[], "train_rec":[], "eval_loss": [], "eval_acc": [], "eval_f1":[], "eval_prec":[], "eval_rec":[], "best": -1}
best_val_loss = float('inf')
# Early stopping: stop after `patience` consecutive epochs without a new
# lowest validation loss.
patience = 5
patience_counter = 0
for epoch in range(config["num_epochs"]):
    train_loss, train_acc, train_f1, train_prec, train_rec = trainer.train_epoch()
    eval_loss, eval_acc, eval_f1, eval_prec, eval_rec = trainer.validate()
    metrics["train_loss"].append(train_loss)
    metrics["train_acc"].append(train_acc)
    metrics["train_f1"].append(train_f1)
    metrics["train_prec"].append(train_prec)
    metrics["train_rec"].append(train_rec)

    metrics["eval_loss"].append(eval_loss)
    metrics["eval_acc"].append(eval_acc)
    metrics["eval_f1"].append(eval_f1)
    metrics["eval_prec"].append(eval_prec)
    metrics["eval_rec"].append(eval_rec)

    # The checkpoint follows the best validation ACCURACY, while early
    # stopping below follows the validation LOSS.
    if eval_acc > metrics["best"]:
        metrics["best"] = eval_acc
        torch.save(model, concrete_model_path + "/model.pt")

    if eval_loss < best_val_loss:
        best_val_loss = eval_loss
        patience_counter = 0
    else:
        patience_counter += 1

    # The first column is the zero-based epoch number (the loop variable).
    print(f"{epoch}\t{train_loss:.5f}\t{train_acc:.2f}\t{train_f1:.2f}\t{train_prec:.2f}\t{train_rec:.2f}\t{eval_loss:.5f}\t{eval_acc:.2f}\t{eval_f1:.2f}\t{eval_prec:.2f}\t{eval_rec:.2f}\t{metrics['best']:.2f}")

    if (patience_counter >= patience):
        print("Early stopping triggered")
        break
with open(log_path + "/metrics.json", "w") as fp:
    json.dump(metrics, fp, indent=4)

## Test

In [None]:
# Evaluate on the test loader and print the scores as one tab-separated row
# under a matching header (the loss is computed but not printed).
test_header = ("Test Acc", "Test F1", "Test Precision", "Test Recall")
print("\t".join(test_header))
loss, acc, f1, prec, rec = trainer.test()
print("\t".join(f"{score:.2f}" for score in (acc, f1, prec, rec)))

## Save weights

In [None]:
def save_embeddings(model, word2idx, file_path):
    """Write the model's embedding table to ``file_path`` in a binary layout.

    The file starts with the text header ``"<len(word2idx)> <emb_dim>\\n"``.
    Then, for every ``(word, idx)`` of ``word2idx`` in iteration order, it
    holds the UTF-8 word, a single space and the raw float32 bytes of
    embedding row ``idx``.

    Args:
        model: Object whose ``get_embeddings()`` returns a 2-D tensor.
        word2idx: Mapping from word to row index in the embedding table.
        file_path: Destination path; an existing file is overwritten.
    """
    table = model.get_embeddings().detach().cpu().numpy().astype(np.float32)
    header = f"{len(word2idx)} {table.shape[1]}\n"

    with open(file_path, 'wb') as out:
        out.write(header.encode('utf-8'))
        for token, row in word2idx.items():
            out.write(token.encode('utf-8') + b' ')
            out.write(table[row].tobytes())

    
def save_mat(file_path, tens):
    """Write the raw bytes of the transpose of ``tens`` to ``file_path``.

    The tensor is detached, moved to the CPU and converted to NumPy; its
    transpose is then serialised in row-major order with no header.

    Args:
        file_path: Destination path; an existing file is overwritten.
        tens: Tensor to store.
    """
    with open(file_path, "wb") as sink:
        array = tens.detach().cpu().numpy()
        payload = array.transpose().tobytes()
        sink.write(payload)

In [None]:
# Collect every tensor that is exported below.

# Convolution filters and bias
# [num_filters, kernel_size * emb_dim], [num_filters]
convs, bias = model.get_filters()

# Domain attention vectors
# [num_domains, num_filters]
at_dom = model.get_attention_domains()

# Token embeddings
# [vocab_size, emb_dim]
emb = model.get_embeddings()

# Batch-norm parameters and running statistics, each [num_filters]
bn = model.bn_conv
alpha, beta = bn.weight, bn.bias
mean, var = bn.running_mean, bn.running_var

# Fully connected heads: split the (weight, bias) pairs into two lists that
# keep the order of the heads
lst = model.get_fc_weights()
w_lst = [weight for weight, _ in lst]
b_lst = [offset for _, offset in lst]

# Heads concatenated along the first dimension
# [num_domains * num_classes, num_filters]
fc_mat = torch.cat(w_lst, dim=0)
# [num_domains * num_classes]
fc_bias = torch.cat(b_lst, dim=0)

In [None]:
# Export the model's token embedding table, keyed by the training vocabulary,
# to <weights_path>/embeddings.bin.
save_embeddings(model, train_dataset.token_to_idx, weights_path + "/embeddings.bin")

In [None]:
# Write every remaining weight tensor to its own headerless binary file under
# weights_path. save_mat stores the transpose of each tensor.
save_mat(f"{weights_path}/conv_matrix.bin", convs)
save_mat(f"{weights_path}/conv_bias.bin", bias)
save_mat(f"{weights_path}/attention_domains.bin", at_dom)
save_mat(f"{weights_path}/bn_alpha.bin", alpha)
save_mat(f"{weights_path}/bn_beta.bin", beta)
save_mat(f"{weights_path}/bn_mean.bin", mean)
save_mat(f"{weights_path}/bn_var.bin", var)
save_mat(f"{weights_path}/fc_matrices.bin", fc_mat)
save_mat(f"{weights_path}/fc_biases.bin", fc_bias)