In [None]:
import pickle as pkl
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import json
import re
import datetime
import os

In [None]:
# Directory holding the notebook's input data files; used as the default for load_data().
DATA_FOLDER = "../../data"

### Import

In [None]:
def load_data(dir_path=DATA_FOLDER):
    """Load the four pickled datasets from a directory.

    Keyword arguments:
    dir_path -- directory containing the pickle files (a trailing slash is optional)
    Return: tuple (features_class, features_num, votes_class, votes_num) holding
        whatever object each pickle file contains
    """
    # NOTE(security): pickle.load can execute arbitrary code; only point this at
    # files produced by a trusted source.
    file_names = (
        "output_data_features_class",
        "output_data_features_num",
        "output_data_votes_class",
        "output_data_votes_num",
    )

    loaded = []
    for file_name in file_names:
        # os.path.join copes with dir_path given with or without a trailing slash.
        with open(os.path.join(dir_path, file_name), "rb") as f:
            loaded.append(pkl.load(f))

    features_class, features_num, votes_class, votes_num = loaded
    return features_class, features_num, votes_class, votes_num


In [None]:
# Pickled findings (f*) and votes (v*) for the categorical (*c) and numeric (*n) variants.
fc, fn, vc, vn = load_data(DATA_FOLDER)

# KEV catalogue: keep only the CVE ids, as a set, for fast membership tests.
with open(os.path.join(DATA_FOLDER, "kev.json"), "r") as f:
    kev = json.load(f)["vulnerabilities"]
kev_set = {entry["cveID"] for entry in kev}

with open(os.path.join(DATA_FOLDER, "users.json"), "r") as f:
    users = json.load(f)

# The roster CSVs are read without a header row, so column names are supplied here.
df_interns = pd.read_csv(os.path.join(DATA_FOLDER, "interns.csv"), names=["Nome", "email", "team", "project"])
df_mentors = pd.read_csv(os.path.join(DATA_FOLDER, "mentors.csv"), names=["Nome", "email", "team"])
df_team = pd.read_csv(os.path.join(DATA_FOLDER, "crivo_team.csv"), names=["Nome", "email", "team", "project"])
df_interns["role"] = "intern"
df_mentors["role"] = "mentor"

df_interns = df_interns[["Nome", "email", "role"]]
df_mentors = df_mentors[["Nome", "email", "role"]]

# NOTE(review): df_team is never given a "role" and keeps its "team"/"project"
# columns, so its rows have a missing role after the concat — confirm this is intended.
df_users = pd.concat([df_interns, df_mentors, df_team], axis=0).reset_index(drop=True)
df_users = df_users.astype({"Nome": "string", "email": "string", "role": "string"})

# Per-instance user mappings as stored in users.json.
cat_users = users["crivo"]
num_users = users["crivo-num"]
dev_users = users["crivo-dev"]

In [None]:
# Settings shared by both credential files: they are read without a header row,
# and only the e-mail and password columns are kept (as nullable strings).
cred_columns = ["Nome", "email", "pass"]
cred_dtypes = {"email": "string", "pass": "string"}

df_cred_cat = pd.read_csv(
    "../../data/cred_cat.csv", names=cred_columns, skipinitialspace=True
).loc[:, ["email", "pass"]].astype(cred_dtypes)

df_cred_num = pd.read_csv(
    "../../data/cred_num.csv", names=cred_columns, skipinitialspace=True
).loc[:, ["email", "pass"]].astype(cred_dtypes)

df_cred_cat["email"].head(40)

In [None]:
# Invert the "crivo" mapping from users.json so its values become the lookup key
# (they end up in the "email" column, the original keys in "user_id").
cat_users = {str(value): key for key, value in users["crivo"].items()}

# Enrich each account with roster data (name, role) and its credentials.
cat_users_df = (
    pd.DataFrame(list(cat_users.items()), columns=["email", "user_id"])
    .convert_dtypes()
    .merge(df_users, on="email", how="left")
    .merge(df_cred_cat, on="email", how="left")
    .loc[:, ["user_id", "Nome", "email", "pass", "role"]]
    .astype({"user_id": "int64", "email": "string", "role": "string"})
)

# Preview without the password column.
cat_users_df[["user_id", "Nome", "email", "role"]].head(40)

In [None]:
# Invert the "crivo-num" mapping from users.json so its values become the lookup key
# (they end up in the "email" column, the original keys in "user_id").
num_users = {str(value): key for key, value in users["crivo-num"].items()}

# Enrich each account with roster data (name, role) and its credentials.
num_users_df = (
    pd.DataFrame(list(num_users.items()), columns=["email", "user_id"])
    .convert_dtypes()
    .merge(df_users, on="email", how="left")
    .merge(df_cred_num, on="email", how="left")
    .loc[:, ["user_id", "Nome", "email", "role", "pass"]]
    .astype({"user_id": "int64", "Nome": "string", "email": "string", "role": "string"})
)

# Preview without the password column.
num_users_df[["user_id", "Nome", "email", "role"]].head(40)

### Exploratory Data Analysis

In [None]:
def get_cvelist(text):
    """Extract the CVE identifiers listed on a ``**CVEs**: ...`` line of a description.

    Keyword arguments:
    text -- finding description; non-string values (e.g. None / NaN) yield None
    Return: sorted list of unique CVE ids, or None when the line is missing or empty
    """
    if not isinstance(text, str):
        # A missing description may arrive as None/NaN; treat it as "no CVE line".
        return None

    match = re.search(r"^\*\*CVEs\*\*: (.*)$", text, re.MULTILINE)
    if match is None:
        return None

    # Strip every entry so stray spaces or a trailing "\r" (CRLF input) do not
    # change the identifier, and drop empty entries.
    cves = {cve.strip() for cve in match.group(1).split(",")}
    cves.discard("")

    # Sorted for a deterministic result (set iteration order is arbitrary).
    return sorted(cves) or None

In [None]:
# Finding classes assigned from a finding's CVE list: at least one CVE in the KEV
# set, CVEs but none of them in the KEV set, or no CVEs at all.
IN_KEV, HAS_CVE, NO_CVE = 1, 2, 3

def process_finding_df(iter):
    """Build a findings table with 0-based ids, parsed CVE lists and a CVE class.

    Keyword arguments:
    iter -- records accepted by pd.DataFrame that provide "id", "title" and "description"
    Return: DataFrame with columns id, title, description, cves, class
    """
    def classify(cve_list):
        # No CVEs -> NO_CVE; any CVE present in kev_set -> IN_KEV; otherwise HAS_CVE.
        if not cve_list:
            return NO_CVE
        if any(cve in kev_set for cve in cve_list):
            return IN_KEV
        return HAS_CVE

    findings = pd.DataFrame(iter)[["id", "title", "description"]]
    findings["id"] = findings["id"] - 1  # shift ids by one to make them 0-based
    findings["cves"] = findings["description"].apply(get_cvelist)
    findings["class"] = findings["cves"].apply(classify)
    return findings

# Findings tables for the numeric (fn) and categorical (fc) datasets, in that order.
df_fn, df_fc = (process_finding_df(findings) for findings in (fn, fc))

In [None]:
# Preview the first two processed findings of the numeric dataset.
df_fn.head(2)

In [None]:
# Preview the first two processed findings of the categorical dataset.
df_fc.head(2)

In [None]:
# NUMERIC VOTES
df_vn = pd.DataFrame(vn)

# Converting column to numeric
df_vn["vote"] = pd.to_numeric(df_vn["vote_num"])

# Renaming ambiguous columns
df_vn = df_vn.rename(columns={"id": "finding_id"})

# 0-based indexing for findings
df_vn["finding_id"] = df_vn["finding_id"] - 1

# Findings indexed by their 0-based id, used for the per-vote lookups below
fn_by_id = df_fn.set_index("id")

# Adding finding class between IN_KEV, HAS_CVE & NO_CVE
df_vn["finding_class"] = df_vn["finding_id"].map(fn_by_id["class"])

# Converting timestamp to pandas datetime
df_vn["timestamp"] = pd.to_datetime(df_vn["timestamp"], format="ISO8601")

# Setting up avg related metrics
vn_votes_by_finding = df_vn.groupby("finding_id")["vote"]
df_vn["avg"] = df_vn["finding_id"].map(vn_votes_by_finding.mean())
df_vn["avg_error"] = (df_vn["vote"] - df_vn["avg"]).abs()

# Setting up title variable.
# A lookup by finding_id keeps every vote row aligned with its own title; taking the
# column from a pd.merge result relies on the merged frame's fresh 0..n-1 index and
# shifts titles whenever the inner join drops or reorders rows.
df_vn["finding_title"] = df_vn["finding_id"].map(fn_by_id["title"])

# Setting up variance related metrics (sample variance of the votes per finding)
vn_var_by_finding = vn_votes_by_finding.var()
df_vn["var"] = df_vn["finding_id"].map(vn_var_by_finding)
df_vn["var_percentile"] = df_vn["finding_id"].map(vn_var_by_finding.rank(pct=True))

df_vn.head(10)

In [None]:
# CLASS VOTES
df_vc = pd.DataFrame(vc)

# Explicit ordinal encoding of the vote classes. It is spelled out (rather than
# derived from the order in which classes appear in the data) so the numeric
# order always follows severity.
class_map = {
    "Mild": 1,
    "Moderate": 2,
    "Severe": 3,
    "Critical": 4,
}

# Converting classes to numeric (labels missing from class_map become NaN)
df_vc["vote"] = df_vc["vote_class"].map(class_map)

# Renaming ambiguous columns
df_vc = df_vc.rename(columns={"id": "finding_id"})

# 0-based indexing for findings
df_vc["finding_id"] = df_vc["finding_id"] - 1

# Findings indexed by their 0-based id, used for the per-vote lookups below
fc_by_id = df_fc.set_index("id")

# Adding finding class between IN_KEV, HAS_CVE & NO_CVE
df_vc["finding_class"] = df_vc["finding_id"].map(fc_by_id["class"])

# Converting timestamp to pandas datetime
df_vc["timestamp"] = pd.to_datetime(df_vc["timestamp"], format="ISO8601")

# Setting up avg related metrics
vc_votes_by_finding = df_vc.groupby("finding_id")["vote"]
df_vc["avg"] = df_vc["finding_id"].map(vc_votes_by_finding.mean())
df_vc["avg_error"] = (df_vc["vote"] - df_vc["avg"]).abs()

# Setting up title variable.
# A lookup by finding_id keeps every vote row aligned with its own title; taking the
# column from a pd.merge result relies on the merged frame's fresh 0..n-1 index and
# shifts titles whenever the inner join drops or reorders rows.
df_vc["finding_title"] = df_vc["finding_id"].map(fc_by_id["title"])

# Setting up variance related metrics (sample variance of the votes per finding)
vc_var_by_finding = vc_votes_by_finding.var()
df_vc["var"] = df_vc["finding_id"].map(vc_var_by_finding)
df_vc["var_percentile"] = df_vc["finding_id"].map(vc_var_by_finding.rank(pct=True))

df_vc.head(10)

In [None]:
# Ad-hoc inspection: every categorical vote cast by the user with id 15.
df_vc[df_vc["user_id"] == 15]

In [None]:

def plot_votes_vs_mean(user, votes_user: pd.DataFrame, usermail, filename, categorical=False):
    """Scatter one user's votes against the all-user mean per finding and save the figure.

    Points are drawn per finding class (IN_KEV, HAS_CVE, NO_CVE) and a least-squares
    line of vote on mean is overlaid, with its R2 shown in the legend. The image is
    written twice: to the shared ``votebymean`` directory of the selected mode and to
    the ``byuser/<usermail>`` directory.

    Keyword arguments:
    user -- user identifier (not used by this function; kept for interface compatibility)
    votes_user -- the user's votes with columns "avg", "vote" and "finding_class"
    usermail -- name of the per-user output directory
    filename -- output file name including its extension (e.g. "scatterplot.png")
    categorical -- True lays the axes out for the four vote classes (1-4),
        False for integer ticks from 0 to 10
    """
    x = np.array(votes_user["avg"])
    y = np.array(votes_user["vote"])
    slope, intercept = np.polyfit(x, y, 1)

    residual_ss = np.sum((y - (slope * x + intercept)) ** 2)
    total_ss = np.sum((y - np.mean(y)) ** 2)
    # Identical votes give total_ss == 0; report NaN instead of dividing by zero.
    r2 = 1 - (residual_ss / total_ss) if total_ss > 0 else float("nan")

    # Everything that differs between the two modes lives here.
    if categorical:
        class_labels = ["Mild", "Moderate", "Severe", "Critical"]
        line_x = np.array([-0.5, 4])
        axis_limits = (0.8, 4.2)
        x_tick_pos, x_tick_labels = [1.375, 2.125, 2.875, 3.625], class_labels
        y_tick_pos, y_tick_labels = [1, 2, 3, 4], class_labels
        shared_dir = "../../data/votebymean/categorical/"
    else:
        line_x = np.array([-1, 10])
        axis_limits = (-1, 11)
        x_tick_pos, x_tick_labels = np.arange(0, 11, 1), None
        y_tick_pos, y_tick_labels = np.arange(0, 11, 1), None
        shared_dir = "../../data/votebymean/numerical/"

    series_styles = (
        (IN_KEV, "coral", "Vulnerabilidades com CVE no KEV", "v"),
        (HAS_CVE, "yellowgreen", "Vulnerabilidades com CVEs", "o"),
        (NO_CVE, "cornflowerblue", "Vulnerabilidades sem CVEs", "s"),
    )

    fig, ax = plt.subplots(figsize=(9, 6))
    try:
        for finding_class, color, label, marker in series_styles:
            subset = votes_user[votes_user["finding_class"] == finding_class]
            ax.scatter(subset["avg"], subset["vote"], color=color, label=label, marker=marker)
        ax.plot(line_x, slope * line_x + intercept, color="red", label=f"Regressão Linear (R2 = {r2:.2f})")
        ax.legend()
        ax.grid(True, linestyle=":", linewidth=0.5, color="black", alpha=0.2)

        # Limits first, then ticks, so the fixed view is what the ticks are placed in.
        ax.set_xlim(*axis_limits)
        ax.set_ylim(*axis_limits)
        ax.set_yticks(y_tick_pos)
        if y_tick_labels is not None:
            ax.set_yticklabels(y_tick_labels)
        ax.set_xticks(x_tick_pos)
        if x_tick_labels is not None:
            ax.set_xticklabels(x_tick_labels)
        ax.set_ylabel("Análises do Residente")
        ax.set_xlabel("Média das Análises de Todos os Residentes")

        # NOTE(review): the shared copy is named only by `filename`, so calls that
        # pass the same file name overwrite each other — confirm whether a
        # per-user name is wanted there.
        user_dir = f"../../data/byuser/{usermail}/"
        for out_dir in (shared_dir, user_dir):
            os.makedirs(out_dir, exist_ok=True)
            # `filename` already carries its extension; both modes use it unchanged.
            fig.savefig(os.path.join(out_dir, filename))
    finally:
        # Always release the figure, even if plotting or saving fails.
        plt.close(fig)

def plot_rank_error_time_series(intern, votes_intern, x_ticks=False):
    """Plot a user's absolute error against the mean vote over time and save it as PNG.

    Keyword arguments:
    intern -- identifier used as the output file name (``<intern>.png``)
    votes_intern -- votes with a datetime "timestamp" column and an "avg_error" column;
        the frame is not modified
    x_ticks -- when True, date ticks are chosen from the time span covered by the
        votes; when False the x axis has no ticks and is labelled "Tempo"
    """
    # Work on a sorted copy so the caller's DataFrame keeps its row order.
    votes = votes_intern.sort_values(by="timestamp", ascending=True)

    # (exclusive upper bound of the time span, locator factory, tick format);
    # the first rule whose bound exceeds the span is applied. Spans of a week or
    # more keep matplotlib's default ticks.
    tick_rules = (
        (datetime.timedelta(minutes=1), lambda: mdates.SecondLocator(bysecond=range(0, 60, 2)), "%d/%m\n%H:%M:%S"),
        (datetime.timedelta(hours=0.5), lambda: mdates.MinuteLocator(interval=1), "%d/%m\n%H:%M:%S"),
        (datetime.timedelta(hours=1), lambda: mdates.MinuteLocator(interval=5), "%d/%m\n%H:%M:%S"),
        (datetime.timedelta(hours=3), lambda: mdates.MinuteLocator(interval=10), "%d/%m\n%H:%M:%S"),
        (datetime.timedelta(hours=6), lambda: mdates.HourLocator(interval=1), "%d/%m\n%H:%M:%S"),
        (datetime.timedelta(days=1), lambda: mdates.HourLocator(interval=4), "%d/%m\n%H:%M"),
        (datetime.timedelta(days=7), lambda: mdates.DayLocator(interval=1), "%d/%m"),
    )

    fig, ax = plt.subplots(figsize=(12, 8))
    try:
        ax.scatter(votes["timestamp"], votes["avg_error"])

        ax.set_ylim(-0.5, 11)
        ax.set_yticks(np.arange(0, 11, 1))
        ax.set_ylabel("Erro absoluto em relação a média")
        ax.grid(True, linestyle=":", linewidth=0.5, color="black", alpha=0.2)
        ax.set_title("Série temporal do erro em relação ao valor médio do voto")

        if x_ticks:
            delta_timestamp = votes["timestamp"].max() - votes["timestamp"].min()
            for upper_bound, make_locator, tick_format in tick_rules:
                if delta_timestamp < upper_bound:
                    ax.xaxis.set_major_locator(make_locator())
                    ax.xaxis.set_major_formatter(mdates.DateFormatter(tick_format))
                    break
        else:
            ax.xaxis.set_ticks([])
            ax.set_xlabel("Tempo")

        # Create the output directory on first use so saving cannot fail on a
        # missing folder.
        out_dir = "../../data/vote_error_timeseries/"
        os.makedirs(out_dir, exist_ok=True)
        fig.savefig(os.path.join(out_dir, f"{intern}.png"))
    finally:
        # Release the figure so repeated calls do not accumulate open figures.
        plt.close(fig)

def generate_html_data(user, df_user, categorical=False):
    """Select the user's most divergent votes for the HTML report.

    The three votes with the largest "avg_error" are taken; those lying within one
    standard deviation of the finding's mean vote (bounds inclusive) are discarded.

    Keyword arguments:
    user -- user identifier (not used by this function; kept for interface compatibility)
    df_user -- the user's votes with columns finding_id, avg, vote, avg_error, var,
        finding_title and var_percentile
    categorical -- copied into the result as "is_categorical"
    Return: dict with keys "highlight_vuln" (list of record dicts with keys id,
        avg_rank, user_rank, avg_error, var, title, var_percentile), "is_empty"
        (always False) and "is_categorical"
    """
    source_columns = ["finding_id", "avg", "vote", "avg_error", "var", "finding_title", "var_percentile"]
    report_names = {"finding_id": "id", "avg": "avg_rank", "vote": "user_rank", "finding_title": "title"}

    top3_highest_diff = (
        df_user.sort_values(by="avg_error", ascending=False)
        .iloc[:3][source_columns]
        .rename(columns=report_names)
    )

    # Round the values shown in the report.
    for column in ("avg_rank", "user_rank", "avg_error"):
        top3_highest_diff[column] = top3_highest_diff[column].round(2)
    # Scale to percent before rounding so the result is a whole number
    # (rounding first and multiplying after can give e.g. 56.99999999999999).
    top3_highest_diff["var_percentile"] = (top3_highest_diff["var_percentile"] * 100).round()
    top3_highest_diff["title"] = top3_highest_diff["title"].astype("string")

    final_vulns = []
    for vuln in top3_highest_diff.to_dict("records"):
        std = vuln["var"] ** 0.5
        # A NaN variance makes both comparisons False, so such a vote is kept.
        within_one_std = vuln["avg_rank"] - std <= vuln["user_rank"] <= vuln["avg_rank"] + std
        if not within_one_std:
            final_vulns.append(vuln)

    html_data = {
        "highlight_vuln": final_vulns,
        "is_empty": False,
        "is_categorical": categorical,
    }

    return html_data

In [None]:
from email_generator import generate_html

In [None]:
# Show the categorical user table without the "pass" column so that plaintext
# passwords are not rendered into (and saved with) the notebook output.
cat_users_df.drop(columns=["pass"])

In [None]:
# Full roster (interns, mentors and team rows concatenated earlier), ordered by name.
df_users.sort_values(by="Nome")

In [None]:
import shutil

def generate_user(user: int, is_categorical: bool):
    """Generate the report artefacts (HTML pages and images) for one user.

    Users with fewer than 11 votes only get the "empty" pages, whose data carries
    their e-mail and password. Everyone else gets the vote-vs-mean scatter plot,
    the highlighted findings and, if the file exists, a copy of their
    feature-importance image.

    Keyword arguments:
    user -- user id within the selected instance
    is_categorical -- True to use the categorical votes/users (df_vc, cat_users_df),
        False for the numeric ones (df_vn, num_users_df)
    Raises: IndexError when `user` is not present in the selected users table
    """
    min_votes_for_report = 11

    if is_categorical:
        votes, users_df = df_vc, cat_users_df
    else:
        votes, users_df = df_vn, num_users_df

    df_user = votes[votes["user_id"] == user].copy()

    # Look the user up once instead of filtering the table for every field.
    user_row = users_df[users_df["user_id"] == user].iloc[0]
    user_mail = user_row["email"]
    user_pass = user_row["pass"]

    # Accounts missing from the roster have no name after the left merge, and a
    # blank name has no first word. Fall back to the e-mail's local part so one
    # such account does not abort the whole batch.
    full_name = user_row["Nome"]
    name_parts = [] if pd.isna(full_name) else str(full_name).split()
    user_name = name_parts[0] if name_parts else str(user_mail).split("@")[0]

    html_data = {"intern_name": user_name}

    def render_pages():
        # Same data rendered twice: as the e-mail body and as the index page.
        for page_name, is_email in (("email.html", True), ("index.html", False)):
            html_data["is_email"] = is_email
            generate_html(html_data, usermail=user_mail, filename=page_name, categorical=is_categorical)

    if len(df_user) < min_votes_for_report:
        html_data["is_empty"] = True
        html_data["EMAIL_INTERN"] = user_mail
        html_data["SENHA_INTERN"] = user_pass
        render_pages()
        return

    #1 - Scatterplot with one point per vulnerability (x = average rank, y = intern's rank)
    plot_votes_vs_mean(user, df_user, user_mail, "scatterplot.png", is_categorical)

    #2 - Get 3 vulnerabilities with the biggest difference between average and intern's rank
    html_data.update(generate_html_data(user, df_user, categorical=is_categorical))
    render_pages()

    #3 - Copy the user's feature-importance image next to the pages, when one exists
    src = f"../../data/importance/feat_importance_user_{user_mail}_2x1.png"
    if os.path.exists(src):
        dst_dir = f"../../data/byuser/{user_mail}/"
        os.makedirs(dst_dir, exist_ok=True)
        shutil.copy(src, os.path.join(dst_dir, "importance.png"))

In [None]:
# Build the report artefacts for every account of the numeric ("crivo-num") instance.
numeric_user_ids = num_users_df["user_id"].unique()
for user in numeric_user_ids:
    generate_user(user, is_categorical=False)


In [None]:
# Build the report artefacts for every account of the categorical ("crivo") instance.
categorical_user_ids = cat_users_df["user_id"].unique()
for user in categorical_user_ids:
    generate_user(user, is_categorical=True)
