In [None]:
# Set the notebook autosave interval to 0 (per the Jupyter docs this turns periodic autosave off).
%autosave 0

In [None]:
import os
import pandas as pd
import numpy as np
from tqdm.notebook import tqdm
tqdm.pandas()
import json

from transformers import RobertaModel, AutoTokenizer

from sklearn.metrics import precision_score
from sklearn.metrics import f1_score
from sklearn.metrics import roc_curve
from sklearn.metrics import auc
from sklearn.multiclass import OneVsRestClassifier
from sklearn import svm

from itertools import cycle

from matplotlib import pyplot as plt

import math
import torch
import random

In [None]:
# Seed every random number generator used in this notebook so that repeated
# runs start from the same RNG state. The three libraries keep independent
# generators, so each one is seeded explicitly.
NUMPY_SEED = 42
TORCH_SEED = 0
PYTHON_SEED = 0

torch.manual_seed(TORCH_SEED)
np.random.seed(NUMPY_SEED)
# Kept last on purpose: it returns None, so the cell produces no output.
random.seed(PYTHON_SEED)

In [None]:
# CSV files holding the train and test splits. The paths are relative, so they
# resolve against the directory the notebook kernel is started from.
TRAIN = "prepared_data/train.csv"
TEST = "prepared_data/test.csv"

In [None]:
# Load the training split into a DataFrame.
train = pd.read_csv(TRAIN)

In [None]:
# Display the frame for a quick visual check.
train

In [None]:
# Sum of each label column in the training split. If the columns are 0/1
# indicators this is the number of positive rows per label (TODO confirm encoding).
train.is_uart.sum()

In [None]:
train.is_i2c.sum()

In [None]:
train.is_none.sum()

In [None]:
train.is_spi.sum()

In [None]:
# Load the test split into a DataFrame and display it for a quick visual check.
test = pd.read_csv(TEST)
test

In [None]:
# Sum of each label column in the test split. If the columns are 0/1
# indicators this is the number of positive rows per label (TODO confirm encoding).
test.is_uart.sum()

In [None]:
test.is_i2c.sum()

In [None]:
test.is_none.sum()

In [None]:
test.is_spi.sum()

# Load the CodeBERT checkpoint and its tokenizer

In [None]:
# Checkpoint directory; the encoder weights and the tokenizer are both loaded
# from this same location.
MODEL_PATH = "models/codebert_all_updated8oct/checkpoint-150"
# `model` and `tokenizer` are module-level globals read by convert_to_vectors().
model = RobertaModel.from_pretrained(MODEL_PATH)
tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH)

In [None]:
# sample = train.iloc[0].features
# sample

In [None]:
# model_input = tokenizer(sample, return_tensors="pt", truncation=True, max_length=512, padding='max_length')

In [None]:
# with torch.no_grad():
#     # print(model(**model_input).pooler_output.numpy().tolist()[0])

# Extract features, train the SVM, and evaluate on the test split

In [None]:
def get_labels(df):
    """Build the multi-label target matrix from a DataFrame's indicator columns.

    Args:
        df: DataFrame containing the columns ``is_uart``, ``is_spi``,
            ``is_i2c`` and ``is_none``.

    Returns:
        A 2-D NumPy array with one row per row of ``df`` and four columns,
        ordered ``(is_uart, is_spi, is_i2c, is_none)``.
    """
    # The order here fixes the class index used everywhere downstream
    # (0 = uart, 1 = spi, 2 = i2c, 3 = none).
    label_columns = ("is_uart", "is_spi", "is_i2c", "is_none")
    return np.column_stack([df[name].values for name in label_columns])

def convert_to_vectors(x):
    """Embed one input with the module-level ``model`` and return it as a list.

    The input is tokenized with the module-level ``tokenizer``, truncated and
    padded to exactly 512 tokens, and passed through ``model`` with gradient
    tracking disabled.

    Args:
        x: Text handed to the tokenizer (callers in this notebook pass one
            DataFrame cell at a time).

    Returns:
        The first row of the model's ``pooler_output`` as a plain Python list
        of floats.
    """
    encoded = tokenizer(
        x,
        max_length=512,
        padding='max_length',
        truncation=True,
        return_tensors="pt",
    )
    with torch.no_grad():
        pooled = model(**encoded).pooler_output
        # Only the first row of the batch is returned.
        return pooled[0].numpy().tolist()

In [None]:
train_labels = get_labels(train)

In [None]:
train_features_raw = train.features.progress_apply(lambda x: convert_to_vectors(x))
train_features = np.vstack(train_features_raw.values)

In [None]:
train_features.shape, train_labels.shape

In [None]:
test_labels = get_labels(test)

In [None]:
test_features_raw = test.features.progress_apply(lambda x: convert_to_vectors(x))
test_features = np.vstack(test_features_raw.values)

In [None]:
test_features.shape, test_labels.shape

In [None]:
classifier = OneVsRestClassifier(
    svm.SVC(kernel="linear", probability=True, random_state=0)
)

In [None]:
y_score = classifier.fit(train_features, train_labels).decision_function(test_features)

In [None]:
y_score.shape

In [None]:
def compute_ROC(y_scores, y_labels, n_classes):
    """Compute a ROC curve and its area for each class column.

    Args:
        y_scores: 2-D array of classifier scores; column ``i`` holds the
            scores for class ``i``.
        y_labels: 2-D array of ground-truth labels laid out like ``y_scores``.
        n_classes: Number of leading columns to evaluate.

    Returns:
        A tuple ``(fpr, tpr, roc_auc)`` of dicts keyed by class index
        ``0 .. n_classes - 1``: false-positive rates, true-positive rates and
        the area under each curve.
    """
    fpr, tpr, roc_auc = {}, {}, {}
    for cls in range(n_classes):
        # roc_curve also returns the thresholds, which are not needed here.
        false_pos, true_pos, _thresholds = roc_curve(y_labels[:, cls], y_scores[:, cls])
        fpr[cls] = false_pos
        tpr[cls] = true_pos
        roc_auc[cls] = auc(false_pos, true_pos)
    return fpr, tpr, roc_auc

In [None]:
# Line width shared by the ROC plots.
lw = 2
# Number of label columns produced by get_labels() (uart, spi, i2c, none).
n_classes = 4
# Pass n_classes instead of repeating the literal so the ROC computation and
# the plotting cell below can never disagree on the class count.
fpr, tpr, roc_auc = compute_ROC(y_score, test_labels, n_classes)

In [None]:
# --- Macro-average ROC ---
# Shared grid: the sorted union of every per-class false-positive-rate point.
all_fpr = np.unique(np.concatenate([fpr[cls] for cls in range(n_classes)]))

# Interpolate each class's TPR onto the shared grid, then average over classes.
interpolated_tprs = [np.interp(all_fpr, fpr[cls], tpr[cls]) for cls in range(n_classes)]
mean_tpr = np.zeros_like(all_fpr)
for class_tpr in interpolated_tprs:
    mean_tpr += class_tpr
mean_tpr /= n_classes

# Store the macro curve next to the per-class entries (this adds a "macro" key
# to the existing dicts).
fpr["macro"], tpr["macro"] = all_fpr, mean_tpr
roc_auc["macro"] = auc(fpr["macro"], tpr["macro"])

# --- Plot ---
fig, ax = plt.subplots(figsize=(10, 10))

ax.plot(
    fpr["macro"],
    tpr["macro"],
    color="navy",
    linestyle=":",
    linewidth=4,
    label="average ROC curve (area = {0:0.2f})".format(roc_auc["macro"]),
)

# One curve per class; the legend identifies classes by column index.
colors = cycle(["maroon", "darkorange", "darkviolet", 'darkslategray'])
for i, color in zip(range(n_classes), colors):
    ax.plot(
        fpr[i],
        tpr[i],
        lw=lw,
        color=color,
        label="ROC curve of class {0} (area = {1:0.2f})".format(i, roc_auc[i]),
    )

# Chance-level diagonal for reference.
ax.plot([0, 1], [0, 1], "k--", lw=lw)
ax.set_xlim([0.0, 1.0])
ax.set_ylim([0.0, 1.05])
ax.set_xlabel("False Positive Rate", fontsize=20)
ax.set_ylabel("True Positive Rate", fontsize=20)
# ax.set_title("SVM Results")
ax.legend(loc="lower right")
plt.show()

# Evaluate the classifier on real-world queries

In [None]:
json_path = "prepared_data/real_world_queries_input.json"

In [None]:
with open(json_path, 'r') as f:
    queries = json.load(f)
    
queries = pd.DataFrame(queries)

In [None]:
test_labels = np.vstack(queries.label.values)

In [None]:
test_features_raw = queries.feature.progress_apply(lambda x: convert_to_vectors(x))
test_features = np.vstack(test_features_raw.values)

In [None]:
test_features.shape, test_labels.shape

In [None]:
y_score = classifier.decision_function(test_features)

In [None]:
# The plotting cell below reads `n_classes`, so define it here rather than
# relying on a value left over from an earlier cell.
n_classes = 4
# Original name used by this cell, kept as an alias in case other cells refer to it.
n_labels = n_classes
# Line width shared by the ROC plots.
lw = 2
fpr, tpr, roc_auc = compute_ROC(y_score, test_labels, n_classes)

In [None]:
# NOTE(review): this cell repeats the test-set ROC plotting cell above; a
# shared plotting helper would remove the duplication. `n_classes` must
# already be defined by an earlier cell.

# --- Macro-average ROC ---
# Shared grid: the sorted union of every per-class false-positive-rate point.
all_fpr = np.unique(np.concatenate([fpr[cls] for cls in range(n_classes)]))

# Interpolate each class's TPR onto the shared grid, then average over classes.
interpolated_tprs = [np.interp(all_fpr, fpr[cls], tpr[cls]) for cls in range(n_classes)]
mean_tpr = np.zeros_like(all_fpr)
for class_tpr in interpolated_tprs:
    mean_tpr += class_tpr
mean_tpr /= n_classes

# Store the macro curve next to the per-class entries (this adds a "macro" key
# to the existing dicts).
fpr["macro"], tpr["macro"] = all_fpr, mean_tpr
roc_auc["macro"] = auc(fpr["macro"], tpr["macro"])

# --- Plot ---
fig, ax = plt.subplots(figsize=(10, 10))

ax.plot(
    fpr["macro"],
    tpr["macro"],
    color="navy",
    linestyle=":",
    linewidth=4,
    label="average ROC curve (area = {0:0.2f})".format(roc_auc["macro"]),
)

# One curve per class; the legend identifies classes by column index.
colors = cycle(["maroon", "darkorange", "darkviolet", 'darkslategray'])
for i, color in zip(range(n_classes), colors):
    ax.plot(
        fpr[i],
        tpr[i],
        lw=lw,
        color=color,
        label="ROC curve of class {0} (area = {1:0.2f})".format(i, roc_auc[i]),
    )

# Chance-level diagonal for reference.
ax.plot([0, 1], [0, 1], "k--", lw=lw)
ax.set_xlim([0.0, 1.0])
ax.set_ylim([0.0, 1.05])
ax.set_xlabel("False Positive Rate", fontsize=20)
ax.set_ylabel("True Positive Rate", fontsize=20)
# ax.set_title("SVM Results")
ax.legend(loc="lower right")
plt.show()