# Homework 1


## Import libraries and Mount drive

In [None]:
# standard library
import csv
import os
import time
from collections import Counter

# utils
from google.colab import drive
from tqdm import tqdm

# process data
import pandas as pd

# plot
import matplotlib.pyplot as plt
import plotly.graph_objects as go
import graphviz 

# bag-of-words
from sklearn.feature_extraction.text import CountVectorizer, HashingVectorizer, TfidfVectorizer

# metrics
from sklearn.metrics import classification_report, plot_confusion_matrix, PrecisionRecallDisplay, accuracy_score, plot_roc_curve
from sklearn.metrics import ConfusionMatrixDisplay, RocCurveDisplay

# dataset splitter
from sklearn.model_selection import train_test_split

# models
from sklearn import svm, tree
from sklearn.naive_bayes import GaussianNB, BernoulliNB, MultinomialNB
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.linear_model import Perceptron
from sklearn.neural_network import MLPClassifier
from sklearn.ensemble import RandomForestClassifier

# preprocessing
from sklearn.preprocessing import StandardScaler

In [None]:
# Mount Google Drive and move into the folder that holds the dataset files.
drive.mount('/content/drive')

# Absolute path under the mount point: a relative path only resolves on the
# first run, and breaks when the cell is re-run after the chdir below.
path = "/content/drive/My Drive/Homework_dataset"
os.chdir(path)

In [None]:
# Labelled dataset used for training and evaluation. Despite the .csv
# extension, every reader in this notebook parses it with a tab delimiter.
dataset_file = "mapping_traces_O0.csv"

# Uncomment to preview the raw table:
# read_file = pd.read_csv(dataset_file, delimiter="\t")
# df = pd.DataFrame(read_file, columns=["instructions", "source_line", "bug"])
# df

## Dataset

In [None]:
class MyDataset():
    """Load a tab-separated dataset and turn each row into a feature vector.

    The file must provide the columns "instructions" and "source_line", plus
    "bug" (the label) unless ``blind`` is True.

    After construction the instance exposes:
      * ``X``: the vectorized rows (list of lists for "MyBoW", sparse matrix
        for the scikit-learn vectorizers);
      * ``X_train``, ``X_test``, ``labels_train``, ``labels_test`` when
        ``blind`` is False (80/20 split, random_state=0);
      * only ``X_test`` (equal to ``X``) when ``blind`` is True.

    Args:
        data_file: path of the tab-separated file to read.
        words_vocabulary: token -> index mapping used by "MyBoW"; it must
            contain "<unk>" if unknown tokens can occur.
        max_lenght_instructions: fixed length of the instructions part of a
            "MyBoW" vector.
        max_lenght_source_lines: fixed length of the source-line part of a
            "MyBoW" vector.
        blind: True when the file has no labels.
        vectorizer_name: one of "MyBoW", "TfidfVectorizer",
            "CountVectorizer", "HashingVectorizer".

    Raises:
        ValueError: if ``vectorizer_name`` is not one of the supported names.
    """

    # Names handled by a scikit-learn vectorizer (everything except "MyBoW").
    _SKLEARN_VECTORIZERS = ("TfidfVectorizer", "CountVectorizer", "HashingVectorizer")

    def __init__(self, 
                 data_file:str, 
                 words_vocabulary:dict, 
                 max_lenght_instructions=27,
                 max_lenght_source_lines=37,
                 blind=False,
                 vectorizer_name:str = "MyBoW") -> None:

        # Fail fast: without a valid vectorizer no features can be built, so
        # there is no point in loading the file first.
        if vectorizer_name != "MyBoW" and vectorizer_name not in self._SKLEARN_VECTORIZERS:
            raise ValueError("Wrong bag-of-word. Choose from the following: \"MyBoW\" (default), \"TfidfVectorizer\", \"CountVectorizer\", \"HashingVectorizer\"")

        self.data_file = data_file
        self.words_vocabulary = words_vocabulary
        self.max_lenght_instructions = max_lenght_instructions
        self.max_lenght_source_lines = max_lenght_source_lines
        self.blind = blind
        self.vectorizer_name = vectorizer_name

        self.instructions, self.source_lines, self.labels = self.__load_data__()

        # create vectorized representation of the dataset
        if self.vectorizer_name == "MyBoW":
            self.X = self.__vectorize_data_MyBoW__()

            n_columns = len(self.X[0]) if self.X else 0
            print("\nX shape: (", len(self.X), ",", n_columns, ")")
        else:
            # Join the two columns with a space so the last instruction token
            # and the first source-line token are not fused into one token.
            self.data = [ins + " " + sl for (ins, sl) in tqdm(zip(self.instructions, self.source_lines))]

            self.vectorizer = self._build_vectorizer()
            self.X = self.vectorizer.fit_transform(self.data)

            print("\nX shape: ", self.X.shape)

        if not self.blind:
            print("labels shape: ", self.labels.shape)

        # split dataset into train and test
        if not self.blind:
            self.X_train, self.X_test, self.labels_train, self.labels_test = train_test_split(self.X, self.labels, test_size=0.2, random_state=0)
        else:
            self.X_test = self.X


    def __load_data__(self):
        """Read the data file and return (instructions, source_lines, labels).

        ``labels`` is the "bug" column, or an empty list in blind mode.
        """
        reader = pd.read_csv(self.data_file, delimiter="\t")
        df = pd.DataFrame(reader, columns=["instructions", "source_line", "bug"])

        labels = [] if self.blind else df.bug

        return df.instructions, df.source_line, labels


    def _build_vectorizer(self):
        """Return a new, unfitted scikit-learn vectorizer for ``vectorizer_name``."""
        if self.vectorizer_name == "TfidfVectorizer":
            return TfidfVectorizer(min_df=200, ngram_range=(1,2))
        if self.vectorizer_name == "CountVectorizer":
            return CountVectorizer(min_df=200, ngram_range=(1,2))
        return HashingVectorizer(decode_error="ignore", n_features=2 ** 7, alternate_sign=False, ngram_range=(1,2))


    def _encode_tokens(self, text, max_length):
        """Map the whitespace tokens of ``text`` to vocabulary indices.

        Double quotes are removed from each token, unknown tokens map to the
        "<unk>" index, and the result is truncated or padded with 0 so that it
        has exactly ``max_length`` entries.
        """
        indices = []
        for token in text.strip().split():
            token = token.replace('"', "")
            if token in self.words_vocabulary:
                indices.append(self.words_vocabulary[token])
            else:
                indices.append(self.words_vocabulary["<unk>"])

        indices = indices[:max_length]
        indices += [0]*(max_length - len(indices))
        return indices


    def __vectorize_data_MyBoW__(self):
        """Return one fixed-length index vector per row (instructions part followed by source-line part)."""
        X = []

        for (ins, sl) in tqdm(zip(self.instructions, self.source_lines)):
            X.append(self._encode_tokens(ins, self.max_lenght_instructions)
                     + self._encode_tokens(sl, self.max_lenght_source_lines))

        return X


    # def compress(self, vector, lenght):
    #     compressed_vector = []
    #     div = len(vector)//lenght
    #     init = 0

    #     if div == 0:
    #         compressed_vector += vector
    #         compressed_vector += [0]*(lenght - len(vector))
    #     else:
    #         for i in range(lenght-1):
    #             compressed_vector.append(np.mean(vector[init:init+div]))
    #             init += div
    #         if init <= len(vector):
    #             compressed_vector.append(np.mean(vector[init:]))
                
    #     return compressed_vector

    def dataset_info(self):
        """Print the split sizes and the first sample of each available split.

        In blind mode there is no training split and there are no labels, so
        only the test-set size and the first test vector are printed.
        """
        sample_id = 0
        is_my_bow = self.vectorizer_name == "MyBoW"

        def first_row(X):
            # MyBoW rows are plain lists; the scikit-learn vectorizers yield
            # sparse rows that are densified to be printable.
            return X[sample_id] if is_my_bow else X[sample_id].todense()

        if not self.blind:
            if is_my_bow:
                print("\nSize of training set: %d" %len(self.X_train))
            else:
                print("\nSize of training set: ", self.X_train.shape)

        if is_my_bow:
            print("Size of test set: %d" %len(self.X_test))
        else:
            print("Size of test set: ", self.X_test.shape)

        if not self.blind:
            print('First training sample')
            print("    x_train_%d = %r" %(sample_id, first_row(self.X_train)))
            print("    y_train_%d = %r" %(sample_id, list(self.labels_train)[sample_id]))

        print('First test sample')
        print("    x_test_%d = %r" %(sample_id, first_row(self.X_test)))
        if not self.blind:
            print("    y_test_%d = %r\n" %(sample_id, list(self.labels_test)[sample_id]))


### Analysis

In [None]:
# compute the average length of instructions and source lines
# in order to choose sensible vector lengths for MyBoW

def average_len(dataset_file):
    """Print the average token counts and return the per-row token counts.

    The file is read as tab-separated text; each "instructions" and
    "source_line" cell is split on whitespace and its tokens are counted.

    Args:
        dataset_file: path of the tab-separated dataset.

    Returns:
        A pair ``(list_len_instructions, list_len_source_lines)`` with one
        token count per row of the file.
    """
    reader = pd.read_csv(dataset_file, delimiter="\t")
    df = pd.DataFrame(reader, columns=["instructions", "source_line", "bug"])

    list_len_instructions = []
    list_len_source_lines = []

    for (ins, sl) in tqdm(zip(df.instructions, df.source_line)):
        list_len_instructions.append(len(ins.strip().split()))
        list_len_source_lines.append(len(sl.strip().split()))

    # Averages are truncated to whole tokens; an empty file reports 0
    # instead of dividing by zero.
    n_rows = len(df)
    len_instructions = sum(list_len_instructions) // n_rows if n_rows else 0
    len_source_lines = sum(list_len_source_lines) // n_rows if n_rows else 0

    print("\nAverage len instructions: ", len_instructions)
    print("Average len source lines: ", len_source_lines, "\n")

    return list_len_instructions, list_len_source_lines

# Per-row token counts for both columns of the dataset.
ins, s_l = average_len(dataset_file)

# Distribution of source-line lengths, restricted to the 0-100 token range.
plt.hist(s_l, bins=100, range=(0, 100))
# plt.savefig('histogram_zoom.png')

### Words vocabulary for BoW

In [None]:
def words_vocabulary(file_name, min_freq=100):
    """Build a token -> index vocabulary from a tab-separated dataset.

    Tokens are the whitespace-separated pieces of the "instructions" and
    "source_line" columns after double quotes are removed. Tokens seen at
    least ``min_freq`` times get an index equal to their frequency rank plus
    two; "<pad>" is mapped to 0 and "<unk>" to 1.

    Args:
        file_name: path of the tab-separated dataset.
        min_freq: minimum number of occurrences for a token to be kept.

    Returns:
        The vocabulary dictionary.
    """
    frame = pd.DataFrame(
        pd.read_csv(file_name, delimiter="\t"),
        columns=["instructions", "source_line", "bug"],
    )

    token_counts = Counter()
    for instruction_text, source_text in tqdm(zip(frame.instructions, frame.source_line)):
        # Instruction tokens are counted before source-line tokens, row by row.
        for text in (instruction_text, source_text):
            token_counts.update(text.replace('"', "").strip().split())

    vocab = {
        token: rank + 2
        for rank, (token, freq) in enumerate(token_counts.most_common())
        if freq >= min_freq
    }

    # Reserved entries are written last.
    vocab["<pad>"] = 0
    vocab["<unk>"] = 1

    return vocab

# Vocabulary shared by the datasets created below.
words_vocab = words_vocabulary(dataset_file)

# Report its size, then the full mapping.
print("\nlenght: ", len(words_vocab))
print("\nwords vocab: ", words_vocab)

### Dataset

In [None]:
# Build the dataset.
# vectorizer_name accepts: "MyBoW" (default), "TfidfVectorizer", "CountVectorizer", "HashingVectorizer"
dataset = MyDataset(
    data_file=dataset_file,
    words_vocabulary=words_vocab,
    vectorizer_name="TfidfVectorizer",
)

# Show the split sizes and the first sample of each split.
dataset.dataset_info()

## Model

### Choose model

In [None]:
# Registry of candidate classifiers, keyed by a short name.
models = {
    "tree": tree.DecisionTreeClassifier(min_samples_split=12, random_state=0, class_weight="balanced"),
    "forest": RandomForestClassifier(n_estimators = 100, min_samples_split=10, n_jobs=-1, verbose=1, random_state=0, class_weight="balanced"),
    "svm": svm.SVC(max_iter = 10000),
    "linear-svm": svm.LinearSVC(max_iter = 10000, random_state=0, class_weight = "balanced"),
    "gaussian": GaussianNB(),
    "bernoulli": BernoulliNB(),
    "multinomial": MultinomialNB(alpha=0.1),
    "kneighbors": KNeighborsClassifier(n_neighbors=5, n_jobs=-1),
    "logistic-regression": LogisticRegression(max_iter=10000, verbose = 1, n_jobs = -1, random_state=0, class_weight="balanced"),
    "perceptron": Perceptron(early_stopping = True, verbose=3, max_iter=200, n_iter_no_change=5, random_state=0, class_weight="balanced"),
    # (1,) is a one-element tuple: a single hidden layer with one unit.
    # The previous "(1)" was just the integer 1 in parentheses.
    "mlp": MLPClassifier(hidden_layer_sizes = (1,), early_stopping = True, max_iter = 200, verbose=3, n_iter_no_change=5, random_state=0)
}

# Model used by the train / test / plot cells below.
model_name = "linear-svm"
model = models[model_name]
# From here on model_name also carries the vectorizer; it is used to name the saved images.
model_name = model_name + "_" + dataset.vectorizer_name

### Train model

In [None]:
# Optional variant (disabled): standardize a dense copy of the training
# features before fitting.
# scaler = StandardScaler()
# scaler.fit(dataset.X_train.toarray())
# data = scaler.transform(dataset.X_train.toarray())
# print(data)
# model.fit(data, dataset.labels_train)

# Fit the selected model on the training split.
model.fit(dataset.X_train, dataset.labels_train)

### Test model

In [None]:
# Predict the test split and summarise the per-class metrics.
y_pred = model.predict(dataset.X_test)

cr = classification_report(
    dataset.labels_test,
    y_pred,
    labels=None,
    target_names=["correct", "bugged"],
    output_dict=True,
)
# print(classification_report(dataset.labels_test, y_pred, labels=None, target_names = ["correct", "bugged"], digits=4))

# One row per class / aggregate, printed as LaTeX and displayed as a table.
df = pd.DataFrame(cr).T
print(df.to_latex())
df

In [None]:
# confusion matrix (rows normalized over the true labels)

plt.rcParams["figure.figsize"] = (7,7)
# ConfusionMatrixDisplay.from_estimator replaces the deprecated
# plot_confusion_matrix helper and matches the PrecisionRecallDisplay usage below.
ConfusionMatrixDisplay.from_estimator(model, dataset.X_test, dataset.labels_test, display_labels = ["correct", "bugged"], normalize="true")

# savefig does not create missing directories.
os.makedirs("images", exist_ok=True)
path = "images/cm_" + model_name + ".png"
plt.savefig(path)

In [None]:
# tree plot

# dot_data = tree.export_graphviz(model, out_file=None,  
#                                 class_names=["correct", "incorrect"],  
#                                 filled=True, rounded=True,  
#                                 special_characters=True)  
# graph = graphviz.Source(dot_data)  
# graph.render("my_tree") 

In [None]:
# roc curve
# Not every estimator offers predict_proba (the selected LinearSVC does not),
# so fall back to decision_function. Note the shapes differ: predict_proba
# returns one column per class, decision_function one score per sample.
if hasattr(model, "predict_proba"):
    y_score = model.predict_proba(dataset.X_test)
else:
    y_score = model.decision_function(dataset.X_test)

# RocCurveDisplay.from_estimator replaces the deprecated plot_roc_curve helper.
svc_disp = RocCurveDisplay.from_estimator(model, dataset.X_test, dataset.labels_test)
plt.show()

In [None]:
# total plot: fit every candidate model, then chart accuracy and fit time

accuracy = {}
times = {}
# The loop variable is deliberately not named `model`: that name holds the
# selected estimator used by the other cells and must not be overwritten.
for name in tqdm(models):
    print("Model: " + name)
    m = models[name]
    # GaussianNB is fed dense arrays; MyBoW features are plain lists and have
    # no toarray(), so they are passed through unchanged.
    densify = name == "gaussian" and hasattr(dataset.X_train, "toarray")
    tick = time.time()
    m.fit(dataset.X_train.toarray() if densify else dataset.X_train, dataset.labels_train)
    times[name] = time.time() - tick
    pred = m.predict(dataset.X_test.toarray() if densify else dataset.X_test)
    accuracy[name] = accuracy_score(dataset.labels_test, pred)


def autolabel(rectangles, axis=None):
    """Write each bar's height (4 decimals) just above the bar.

    Args:
        rectangles: the bars returned by a ``bar`` call.
        axis: axes to draw on; defaults to the current axes.
    """
    axis = plt.gca() if axis is None else axis
    for rect in rectangles:
        height = rect.get_height()
        axis.text(
            rect.get_x() + rect.get_width() / 2.0,
            1.05 * height,
            "%.4f" % height,
            ha="center",
            va="bottom",
        )


def plot_bars(labels, values, out_file):
    """Draw one labelled bar per entry and save the figure to ``out_file``.

    Returns:
        The ``(figure, axes)`` pair that was created.
    """
    fig, axis = plt.subplots(figsize=(18,5))
    # One bar every 4 units, however many models there are.
    x_pos = [4 * i for i in range(len(labels))]
    # Reuse the palette cyclically so every chart colours a model the same way.
    colors = [bar_colors[i % len(bar_colors)] for i in range(len(labels))]
    fig.subplots_adjust(bottom=0.15)
    rectangles = axis.bar(x_pos, values, width=2, color=colors)
    autolabel(rectangles, axis)
    # Tick labels are set once per chart.
    axis.set_xticks(x_pos)
    axis.set_xticklabels(labels, rotation=45)
    # savefig does not create missing directories.
    os.makedirs(os.path.dirname(out_file) or ".", exist_ok=True)
    fig.savefig(out_file)
    return fig, axis


bar_colors = ["limegreen", "greenyellow","gold", "orange", "coral","hotpink", "violet", "darkviolet", "mediumblue", "deepskyblue", "turquoise"]

keys = list(accuracy.keys())
acc = list(accuracy.values())
f, ax = plot_bars(keys, acc, "images/accuracy_my")

t = list(times.values())
f, ax = plot_bars(list(times.keys()), t, "images/times_count_my")

In [None]:
# Precision-recall curve of the selected model on the test split.
PrecisionRecallDisplay.from_estimator(
    model,
    dataset.X_test,
    dataset.labels_test,
    name=model_name,
)

# Save the figure under the model's name before showing it.
path = f"images/pdr_{model_name}.png"
plt.savefig(path)
plt.show()

## Blind Test

In [None]:
blind_test_file = "blind_test.csv"

# create the dataset
# The keyword is vectorizer_name; MyDataset.__init__ has no "bow" parameter.
# NOTE: the features only make sense to `model` if it was trained on a
# dataset built with the same vectorizer_name.
blind_dataset = MyDataset(blind_test_file, words_vocabulary = words_vocab, vectorizer_name = "MyBoW", blind = True)

# predict the blind test
y_pred_blind = model.predict(blind_dataset.X_test)


# save prediction on file
# with open("1834906.txt", "w+") as f:
#     for p in y_pred_blind:
#         f.write(str(p) + "\n")