In [None]:
import torch as torch
import torch.nn as nn
import torch.nn.functional as F

import pandas as pd
import pickle

# PyTorch TensorBoard support
from torch.utils.tensorboard import SummaryWriter
from datetime import datetime

from sklearn.model_selection import KFold
from tqdm.notebook import tqdm

from sklearn import svm

In [None]:
# Use the first CUDA GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Unpack the two archives from the mounted Drive, then copy tokenizer.py and
# everything under MyDrive/models into the current working directory.
!tar xvzf /content/drive/MyDrive/final.tar.gz
!tar xvzf /content/drive/MyDrive/vectors.tar.gz
!cp  /content/drive/MyDrive/tokenizer.py tokenizer.py
!cp /content/drive/MyDrive/models/* .

In [None]:
from tokenizer import Tokenizer

In [None]:
# Vocabulary of 178 tokens: the empty string at index 0, then the placeholder
# identifiers variableN / functionN, the keyword list, and single characters.
# NOTE(review): "’", "⌃", "‘" and "∼" are typographic look-alikes rather than the
# ASCII characters ' ^ ` ~ -- confirm they match what the tokenizer emits.
tokenlist = ["", *[f"variable{x}" for x in range(20)], *[f"function{x}" for x in range(30)],  "continue", "unsigned", "default", "typedef", "define", "double", "extern", "signed", "sizeof", "static", "struct", "switch", "return", "break", "const", "float", "short", "union", "while", "auto", "case", "char", "else", "enum", "goto", "long", "main", "void", "for", "int", "do", "if", " ", "!", "?", "_", "\"", "#", "$", "%", "&", "’", "(", ")", "*", "+", ",", "-", ".", "/", "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", ":", ";", "<", "=", ">", "@", "A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", "N", "O", "P", "Q", "R", "S", "T", "U", "V", "W", "X", "Y", "Z", "[", "\\", "]", "⌃", "‘", "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z", "{", "|", "}", "∼"]
# The integer key 0 also maps to index 0; every token string then maps to its
# position in `tokenlist`.
token_to_ix = {0: 0}
token_to_ix.update((token, index) for index, token in enumerate(tokenlist))
def prepare_sequence(seq, to_ix):
    """Encode a token sequence as a 1-D ``torch.long`` tensor of indices.

    Args:
        seq: Iterable of tokens.
        to_ix: Mapping from token to integer index.

    Returns:
        Tensor with one index per token; tokens missing from ``to_ix`` become 0.
    """
    indices = []
    for token in seq:
        indices.append(to_ix.get(token, 0))
    return torch.tensor(indices, dtype=torch.long)
def get_group(ix):
    """Map a vocabulary index to its coarse token category.

    Args:
        ix: Index into ``tokenlist``.

    Returns:
        One of "variable", "function", "keyword", "alphabet", "numbers" or
        "punctuation" (the fallback for every index outside the listed ranges).
    """
    # (first token, last token, label); bounds are inclusive and checked in order.
    inclusive_ranges = (
        ("variable0", "variable19", "variable"),
        ("function0", "function29", "function"),
        ("continue", "if", "keyword"),
        ("A", "Z", "alphabet"),
        ("a", "z", "alphabet"),
        ("0", "9", "numbers"),
    )
    for first, last, label in inclusive_ranges:
        if token_to_ix[first] <= ix <= token_to_ix[last]:
            return label
    return "punctuation"

In [None]:
class CDataset(torch.utils.data.Dataset):
    """In-memory dataset of ``(input indices, target indices)`` tensor pairs.

    Args:
        file: Path to a pickle holding an iterable of ``(tokens, tags)`` pairs.
        limit: If given, only the first ``limit`` records are encoded. Works for
            pandas objects (via ``.head``) and for plain sequences (via slicing).
    """
    def __init__(self, file, limit=None):
        with open(file, 'rb') as f:
            # NOTE: unpickling can execute arbitrary code; only load trusted files.
            records = pickle.load(f)
        if limit is not None:
            # pandas objects expose .head(); lists/tuples are sliced instead.
            records = records.head(limit) if hasattr(records, "head") else records[:limit]
        # Both elements of each pair are encoded with the shared vocabulary.
        self.data = [
            (prepare_sequence(tk, token_to_ix), prepare_sequence(tg, token_to_ix))
            for tk, tg in records
        ]
    def __len__(self):
        """Number of encoded records."""
        return len(self.data)
    def __getitem__(self, idxs):
        """Return the encoded pair(s) at ``idxs`` (anything a list accepts as an index)."""
        return self.data[idxs]

# Load and encode every record from final.pckl (no limit); this is the dataset
# that the k-fold training loop splits.
dataset = CDataset('final.pckl')

In [None]:
class CCM(nn.Module):
    """Token-level LSTM tagger: embedding -> dropout -> LSTM -> linear -> softmax.

    Args:
        embedding_dim: Size of each token embedding.
        hidden_dim: LSTM hidden size.
        num_layers: Number of stacked LSTM layers (dropout 0.5 between layers).
        vocab_size: Number of input tokens and of output classes.
    """
    def __init__(self, embedding_dim=32, hidden_dim=64, num_layers=4, vocab_size=178):
        super().__init__()
        self.token_embeddings = nn.Embedding(vocab_size, embedding_dim)
        self.dropout = nn.Dropout(p=0.5)
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            dropout=0.5,
            batch_first=True,
        )
        self.hidden2tag = nn.Linear(hidden_dim, vocab_size)

    def forward(self, x):
        """Return class probabilities shaped ``(batch, vocab_size, seq_len)``.

        Args:
            x: Index tensor whose first two dimensions are (batch, seq_len).
        """
        batch, steps = x.size(0), x.size(1)
        embedded = self.dropout(self.token_embeddings(x).float())
        hidden_states, _ = self.lstm(embedded.view(batch, steps, -1))
        scores = self.hidden2tag(hidden_states.view(batch, steps, -1))
        # Softmax over the class axis, then move that axis to dim 1.
        probabilities = F.softmax(scores, dim=2)
        return probabilities.transpose(1, 2)
# Build the network with its default hyper-parameters and move it to `device`.
model=CCM()
model.to(device)

In [None]:
# Restore weights from a checkpoint file in /content; map_location places the
# loaded tensors on `device`. Training cells run after this one start from
# these weights rather than from a fresh initialisation.
model.load_state_dict(torch.load("/content/deep_model_20230411_121014_fold_4_epoch_0", map_location=device))

In [None]:
def loss_fn(outputs, labels):
    """Mean negative log-likelihood of ``labels`` under the probabilities in ``outputs``.

    ``CCM.forward`` already applies softmax, so ``outputs`` holds probabilities.
    ``nn.CrossEntropyLoss`` expects unnormalised logits and would apply a second
    softmax on top, which flattens the distribution and bounds the loss away
    from zero. Taking the log of the probabilities and using NLL gives the
    standard cross-entropy instead.

    NOTE: loss values are therefore not comparable with numbers logged using
    the previous ``nn.CrossEntropyLoss()`` on probabilities.

    Args:
        outputs: Class probabilities with the class axis at dim 1.
        labels: Long tensor of target class indices (``outputs`` shape without dim 1).

    Returns:
        Scalar tensor with the mean loss.
    """
    # Clamp so a probability that underflowed to 0 yields a large finite loss
    # instead of inf/NaN.
    return F.nll_loss(torch.log(outputs.clamp_min(1e-12)), labels)

In [None]:
# Adam over all model parameters with explicitly spelled-out lr, betas and eps.
# Created once here, so its state persists across every later training call.
optimizer = torch.optim.Adam(model.parameters(), lr=0.001, betas=(0.9, 0.999), eps=1e-8)

In [None]:
def train_one_epoch(epoch_index, tb_writer):
    """Run one pass over the global ``training_loader``, updating ``model`` in place.

    Relies on the notebook globals ``training_loader``, ``model``, ``optimizer``,
    ``loss_fn`` and ``device``.

    Args:
        epoch_index: Zero-based epoch counter, used to compute the TensorBoard step.
        tb_writer: Writer exposing ``add_scalar`` (e.g. a ``SummaryWriter``).

    Returns:
        Mean loss of the most recent complete window of 1000 batches, or 0.0
        when fewer than 1000 batches were processed.
    """
    running_loss = 0.
    last_loss = 0.
    # enumerate() gives the batch index needed for intra-epoch reporting.
    for i, data in enumerate(training_loader):
        # Every data instance is an input + label pair
        inputs, labels = data

        # Gradients accumulate by default, so clear them for every batch.
        optimizer.zero_grad()

        outputs = model(inputs.to(device))
        loss = loss_fn(outputs, labels.to(device))

        if torch.isfinite(loss):
            loss.backward()
            # Clip the global gradient norm to 5 before the update.
            nn.utils.clip_grad_norm_(model.parameters(), 5)
            optimizer.step()
            running_loss += loss.item()
        else:
            # Back-propagating a NaN/inf loss would write NaN into every
            # weight, so the update is skipped. The batch is still counted
            # with a fixed penalty of 10 so it shows up in the reported mean.
            running_loss += 10

        if i % 1000 == 999:
            last_loss = running_loss / 1000 # loss per batch
            print('  batch {} loss: {}'.format(i + 1, last_loss))
            tb_x = epoch_index * len(training_loader) + i + 1
            tb_writer.add_scalar('Loss/train', last_loss, tb_x)
            running_loss = 0.
            # The CUDA memory helpers raise on a CPU-only runtime.
            if device.type == "cuda":
                torch.cuda.empty_cache()
                mem = torch.cuda.mem_get_info(device)
                print('  CUDA Memory: free {} / total {}'.format(mem[0], mem[1]))
    return last_loss

In [None]:
%load_ext tensorboard

In [None]:
%tensorboard --logdir runs

In [None]:
# Deletes every existing TensorBoard log under runs/ -- this cannot be undone.
!rm -rf runs/*

In [None]:
# Initializing in a separate cell so we can easily add more epochs to the same run?
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
writer = SummaryWriter('runs/error_detector_trainer_{}'.format(timestamp))
run_number = 0

batch_size=1
folds=5
epochs=1

# shuffle=True without a random_state: the split differs on every run.
kfold=KFold(n_splits=folds, shuffle=True)

best_vloss = 1_000_000.

# NOTE(review): `model` and `optimizer` are created once outside this loop, so
# each fold continues from the previous fold's weights and a fold's validation
# rows have been trained on in earlier folds. Re-create both per fold if
# independent cross-validation estimates are wanted.
for fold,(train_idx,test_idx) in enumerate(kfold.split(dataset)):
    print('FOLD {}:'.format(fold+1))
    train_subsampler = torch.utils.data.SubsetRandomSampler(train_idx)
    test_subsampler = torch.utils.data.SubsetRandomSampler(test_idx)
    # `training_loader` is read as a global by train_one_epoch.
    training_loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, num_workers=2, sampler=train_subsampler)
    validation_loader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, num_workers=2, sampler=test_subsampler)

    for epoch_number in range(epochs):
        # Make sure gradient tracking is on, and do a pass over the data
        model.train(True)
        avg_loss = train_one_epoch(fold*epochs+epoch_number, writer)

        # Evaluation mode (dropout off) for validation.
        model.train(False)

        running_vloss = 0.0
        valid_batches = 0
        # No gradients are needed for reporting; no_grad avoids building a
        # graph for every validation batch.
        with torch.no_grad():
            for vinputs, vlabels in validation_loader:
                voutputs = model(vinputs.to(device))
                vloss = loss_fn(voutputs, vlabels.to(device))
                if torch.isfinite(vloss):
                    running_vloss += vloss.item()
                    valid_batches += 1

        # Average only over the batches that contributed to the sum; NaN when
        # there were none, so it can never be recorded as the best loss.
        avg_vloss = running_vloss / valid_batches if valid_batches else float('nan')
        print('LOSS train {} valid {}'.format(avg_loss, avg_vloss))

        # Log the running loss averaged per batch
        # for both training and validation
        writer.add_scalars('Training vs. Validation Loss',
                        { 'Training' : avg_loss, 'Validation' : avg_vloss },
                        fold*epochs+epoch_number + 1)
        writer.flush()

        # Track the best validation loss seen so far.
        if avg_vloss < best_vloss:
            best_vloss = avg_vloss
        # A checkpoint is written for every fold/epoch (not only the best one),
        # both locally and to the mounted Drive.
        model_path = 'deep_model_{}_fold_{}_epoch_{}'.format(timestamp, fold, epoch_number)
        g_model_path = '/content/drive/MyDrive/models/deep_model_{}_fold_{}_epoch_{}'.format(timestamp, fold, epoch_number)

        torch.save(model.state_dict(), model_path)
        torch.save(model.state_dict(), g_model_path)

In [None]:
# Print the tokenizer's output for a small C program.
# "\\n" keeps a literal backslash-n inside the C string literal.
tokenizer = Tokenizer(c_str='''
#include <stdio.h>
int printf();
int main(){
  for (int i=0; i<200; i++){
    printf("%d\\n", i);
  }
}''')
print(tokenizer.full_tokenize())

In [None]:
def get_tokens(s):
    """Tokenize C source ``s`` and build the shifted-by-one target sequence.

    Args:
        s: C source code as a string.

    Returns:
        ``(tokens, targets)``: ``tokens`` is the first element of
        ``Tokenizer.full_tokenize()``; ``targets`` is ``tokens`` without its
        first item and with ``""`` appended.
    """
    tokens = Tokenizer(c_str=s).full_tokenize()[0]
    shifted = list(tokens)
    shifted.append("")
    targets = shifted[1:]
    return (tokens, targets)
def run_model(model, x):
    """Run ``model`` on a single token sequence in evaluation mode.

    Args:
        model: Network to evaluate; it is switched to eval mode as a side effect.
        x: Sequence of tokens, encoded with the global ``token_to_ix``.

    Returns:
        The model output for a batch of one sequence.
    """
    model.train(False)
    inputs = prepare_sequence(x, token_to_ix).to(device).view(1,-1)
    # Pure inference: skip autograd bookkeeping so no graph is kept alive.
    with torch.no_grad():
        token_probs = model(inputs)
    return token_probs

In [None]:
def localise(model, x):
    """List positions where the model's top prediction disagrees with the next token.

    Args:
        model: Network passed through to ``run_model``.
        x: Sequence of tokens.

    Returns:
        A list of ``[score, token, actual_next, predicted_next, t, t+1, t+2]``
        entries, one per disagreeing position ``t``, sorted ascending by
        ``score``. Empty when ``x`` has fewer than two tokens.
    """
    if len(x) < 2:
        # No token has a successor to compare against.
        return []
    p = run_model(model, x).transpose(0,2).view(len(x), len(tokenlist))
    listCorrections = []
    # The last token has no successor, so stop one short instead of relying on
    # a swallowed IndexError.
    for t in range(len(x) - 1):
        xBtn = tokenlist[p[t].argmax()]
        if x[t+1] != xBtn:
            # NOTE(review): the sort key is the *index* of the least likely
            # class (argmin), not a probability -- confirm this is intended.
            listCorrections.append([p[t].argmin().item(), x[t], x[t+1], xBtn, t, t+1, t+2])
    listCorrections.sort(key=lambda entry: entry[0])
    return listCorrections
def feature_vectors(model, x):
    """Build the flat feature vector for the first location returned by ``localise``.

    Layout (each part is ``len(tokenlist)`` long, concatenated in this order):
      * v1: position ``t`` stored at the index of the token at ``t``
      * v2: position ``t+1`` stored at the index of the actual next token
      * v3: position ``t+2`` stored at the index of the predicted next token
      * count: occurrences of every vocabulary token in ``x``

    Tokens outside the vocabulary are counted under index 0. When ``localise``
    reports nothing, v1..v3 stay all-zero.

    NOTE(review): the three vectors used to be one aliased list
    (``v1 = v2 = v3 = [0]*178``), so all three writes landed in every part.
    Confirm the stored training vectors were built with independent vectors.

    Args:
        model: Network passed through to ``localise``.
        x: Sequence of tokens.

    Returns:
        A list of ``4 * len(tokenlist)`` integers.
    """
    vocab_size = len(tokenlist)
    count = [0]*vocab_size
    for token in x:
        count[token_to_ix.get(token, 0)] += 1
    # Three independent lists -- chained assignment would alias a single list.
    v1, v2, v3 = [0]*vocab_size, [0]*vocab_size, [0]*vocab_size
    loc = localise(model, x)
    if loc:
        _, current, actual_next, predicted_next, pos, pos_next, pos_after = loc[0]
        v1[token_to_ix.get(current, 0)] = pos
        v2[token_to_ix.get(actual_next, 0)] = pos_next
        v3[token_to_ix.get(predicted_next, 0)] = pos_after
    return v1+v2+v3+count
def probabalise(model, x):
    """Report, per position, the probability of the actual next token.

    Args:
        model: Network passed through to ``run_model``.
        x: Sequence of tokens.

    Returns:
        One tuple per position ``t``:
        ``(x[t], actual_next, p(actual_next), p(best), best_token)``.
        For the last position ``actual_next`` is the integer sentinel 0, which
        ``token_to_ix`` maps to index 0. Out-of-vocabulary tokens are also
        looked up at index 0, matching ``prepare_sequence``.
    """
    p = run_model(model, x).transpose(0,2).view(len(x), len(tokenlist))
    # Targets are the inputs shifted left by one, padded with the sentinel 0.
    y = [*x, 0][1:]
    listCorrections = []
    for t, pt in enumerate(p):
        best = pt.argmax()
        listCorrections.append((x[t], y[t], pt[token_to_ix.get(y[t], 0)].item(), pt[best].item(), tokenlist[best]))
    return listCorrections

In [None]:
# Sample C program whose printf statement has no trailing ';' (presumably the
# error the model should flag -- confirm).
# "\\n" keeps the backslash-n escape inside the C string literal; a bare "\n"
# in this non-raw Python string would put a real line break inside the literal.
s="""
#include <stdio.h>

int ret(int x){
  return --x;
}

int main(){
  printf("Hello World!\\n")
  int x = ret(1);
  return x;
}
"""
" ".join(Tokenizer(c_str=s).full_tokenize()[0])

In [None]:
probabalise(model, get_tokens(s)[0])

In [None]:
localise(model, get_tokens(s)[0])

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

# Evaluate the model on the first 200 records and collect, per token:
#   y_test        true class indices (the labels, not the inputs)
#   y_pred        arg-max predicted class indices
#   y_preb_probs  flattened class probabilities, `nb_classes` values per token
# plus a (true, predicted) confusion matrix.
nb_classes = 178
head_dataset = CDataset('final.pckl',200)
data_loader = torch.utils.data.DataLoader(head_dataset, batch_size=1, num_workers=2)
confusion_matrix = np.zeros((nb_classes, nb_classes))
true_chunks, prob_chunks, pred_chunks = [], [], []
# Evaluation mode so dropout does not perturb the predictions.
model.train(False)
with torch.no_grad():
    for inputs, classes in tqdm(data_loader, desc="data", total=len(data_loader)):
        outputs = model(inputs.to(device))
        # The class axis is dim 1 of the model output.
        _, preds = torch.max(outputs, 1)
        true_ix = classes.view(-1).cpu().numpy()
        pred_ix = preds.view(-1).cpu().numpy()
        true_chunks.append(true_ix)
        pred_chunks.append(pred_ix)
        # Move the class axis last so each consecutive run of `nb_classes`
        # values is one token's distribution, then flatten.
        prob_chunks.append(
            outputs.transpose(1, 2).reshape(-1, nb_classes).cpu().numpy().astype(np.float64).reshape(-1)
        )
        # Unbuffered add: repeated (true, pred) pairs in a batch all count.
        np.add.at(confusion_matrix, (true_ix, pred_ix), 1)

# Concatenate once instead of growing arrays with np.append inside the loop.
y_test = np.concatenate(true_chunks) if true_chunks else np.array([])
y_preb_probs = np.concatenate(prob_chunks) if prob_chunks else np.array([])
y_pred = np.concatenate(pred_chunks) if pred_chunks else np.array([])

# Collapse the per-token confusion matrix into the six coarse groups returned
# by get_group and draw it as an annotated heatmap. (The figure is created
# only by plt.subplots below; an extra plt.figure call would render a stray
# empty figure.)
gmap = {"variable":0, "function":1, "keyword": 2, "alphabet": 3, "numbers": 4, "punctuation": 5}
gcm=np.zeros((6, 6))
for ix, i in enumerate(confusion_matrix):
    for jx, j in enumerate(i):
        # Rows are true classes, columns are predicted classes.
        gcm[gmap[get_group(ix)]][gmap[get_group(jx)]]+=j
class_names = ["variable", "function", "keyword", "alphabet", "numbers", "punctuation"]
df_cm = pd.DataFrame(gcm, index=class_names, columns=class_names).astype(int)
fig, ax = plt.subplots(figsize=(10,10))
sns.set(font_scale=1.3)
heatmap = sns.heatmap(df_cm, annot=True, fmt="d", cmap="Blues",ax=ax,annot_kws={'size': 20})


heatmap.yaxis.set_ticklabels(heatmap.yaxis.get_ticklabels(), rotation=0, ha='right',fontsize=20)
heatmap.xaxis.set_ticklabels(heatmap.xaxis.get_ticklabels(), rotation=45, ha='right',fontsize=20)
plt.ylabel('True label',fontsize=20)
plt.xlabel('Predicted label',fontsize=20)
plt.show()

In [None]:
plt.show()

In [None]:
%matplotlib inline

In [None]:
confusion_matrix

In [None]:
gcm

In [None]:
y_test.shape

In [None]:
y_preb_probs.shape

In [None]:
# Reshape to one row per token and renormalise each row to sum to 1
# (vectorised; re-running the cell on the already 2-D array is harmless).
y_preb_probs = np.asarray(y_preb_probs, dtype=np.float64).reshape((-1, nb_classes))
y_preb_probs = y_preb_probs / y_preb_probs.sum(axis=1, keepdims=True)

In [None]:
sum(y_preb_probs[0])

In [None]:
y_pred.shape

In [None]:
from sklearn.metrics import roc_auc_score
from sklearn.metrics import cohen_kappa_score
from sklearn.metrics import matthews_corrcoef
from sklearn.metrics import log_loss
# Weighted one-vs-rest ROC AUC from the per-class probabilities, then Cohen's
# kappa and Matthews correlation from the hard predictions.
# NOTE(review): y_pred is reassigned by the SVM cell further down; run this
# cell before that one.
print(roc_auc_score(y_test, y_preb_probs, average="weighted", multi_class="ovr"))
print(cohen_kappa_score(y_test, y_pred))
print(matthews_corrcoef(y_test, y_pred))
#print(log_loss(y_test, y_preb_probs))

In [None]:
# Overall accuracy: diagonal entries (true == predicted) over all counts.
total = confusion_matrix.sum()
correct = np.trace(confusion_matrix)
accuracy = correct/total
print(f"Accuracy: {accuracy:.2%}")


In [None]:
# Per-class counts from the confusion matrix (rows = true, columns = predicted).
TP = np.diag(confusion_matrix)
FP = confusion_matrix.sum(axis=0) - TP
FN = confusion_matrix.sum(axis=1) - TP

# Per-class precision / recall / F1; any ratio with a zero denominator is 0.
predicted_totals = TP + FP
actual_totals = TP + FN
precision = np.divide(TP, predicted_totals, out=np.zeros(nb_classes), where=predicted_totals > 0)
recall = np.divide(TP, actual_totals, out=np.zeros(nb_classes), where=actual_totals > 0)
pr_sum = precision + recall
f1_score = np.divide(2 * (precision * recall), pr_sum, out=np.zeros(nb_classes), where=pr_sum > 0)

# Report unweighted means over all nb_classes classes, as percentages.
print("Accuracy: {:.2%}".format(accuracy))
print("Precision: {:.2%}".format(np.mean(precision)))
print("Recall: {:.2%}".format(np.mean(recall)))
print("F1-score: {:.2%}".format(np.mean(f1_score)))


In [None]:
# Free-text box whose value is tokenized and analysed by the next cells.
# NOTE(review): the name `slider` is misleading -- this is a Textarea widget;
# results from the cells below depend on whatever was typed into it.
import ipywidgets as widgets
slider = widgets.Textarea()
display(slider)

In [None]:
print(slider.value)

In [None]:
probabalise(model, get_tokens(slider.value)[0])

In [None]:
localise(model, get_tokens(slider.value)[0])

In [None]:
# Load the pre-computed feature records from the working directory.
# NOTE: unpickling can execute arbitrary code; only load trusted files.
with open("vectors.pckl", 'rb') as f:
    vectors=pickle.load(f)

In [None]:
vectors=pd.DataFrame(vectors)

In [None]:
vectors

In [None]:
# Features: per row, concatenate vec1 + vec2 + vec3 + count for the first
# 10000 rows. "count" is read with [] because attribute access would return
# the Series.count method. Target: the `op` column of the same rows.
X = vectors.head(10000).apply(lambda x: x.vec1+x.vec2+x.vec3+x["count"], axis=1)
y = vectors.head(10000).op

In [None]:
# Fit a support-vector classifier with scikit-learn's default settings on all
# selected rows (no hold-out split is made here).
clf = svm.SVC()
clf.fit(list(X), y)

In [None]:
# Predict on the same rows the classifier was fitted on.
# NOTE(review): this reuses the name `y_pred` from the token-level evaluation
# above, so that earlier array is overwritten.
y_pred = clf.predict(list(X))

In [None]:
# Accuracy of the SVM on its own training rows (X was used for both fit and
# predict), so this is a training-set score, not a generalisation estimate.
# It also overwrites the `accuracy` value computed from the confusion matrix.
from sklearn import metrics
accuracy = metrics.accuracy_score(y,y_pred)
accuracy
print("Accuracy: {:.2%}".format(accuracy))

In [None]:
# Replace `clf` with the classifier stored in svc.pckl. This discards the
# model fitted above and requires the file to exist already.
# NOTE: unpickling can execute arbitrary code; only load trusted files.
with open("svc.pckl", 'rb') as f:
    clf = pickle.load(f)

In [None]:
X

In [None]:
y

In [None]:
# Write the current `clf` to svc.pckl, overwriting any existing file.
with open("svc.pckl", 'wb') as f:
    pickle.dump(clf, f)

In [None]:
# Sample C program whose printf statement has no trailing ';' (presumably the
# error the model should flag -- confirm).
# "\\n" keeps the backslash-n escape inside the C string literal; a bare "\n"
# in this non-raw Python string would put a real line break inside the literal.
s="""
#include <stdio.h>

int ret(int x){
  return --x;
}

int main(){
  printf("Hello World!\\n")
  int x = ret(1);
  return x;
}
"""
# Token sequence (first element of the tokenizer output) used by the cells below.
x=Tokenizer(c_str=s).full_tokenize()[0]
x

In [None]:
# Reload the checkpoint from Drive, then take the first entry returned by
# localise (`l`) and build the feature vector for the sample tokens `x`.
# `fv` is wrapped in a list so it forms a one-row batch for clf.predict.
model.load_state_dict(torch.load("/content/drive/MyDrive/models/deep_model_20230411_121014_fold_4_epoch_0", map_location=device))
l= localise(model, x)[0]
fv= [feature_vectors(model, x)]
l

In [None]:
print(fv)

In [None]:
clf.predict(fv)