## CVE Analysis Engine

### Global Setup

In [None]:
import pandas as pd
import numpy as np
import cvss
import cvss.exceptions
import nltk

nltk.download("stopwords", quiet=True, raise_on_error=True)
nltk.download("wordnet", quiet=True, raise_on_error=True)


In [None]:
df = pd.read_json("../data/cve/cves.json")

df.columns

In [None]:
def _calc_cvss_score(v: str) -> float:
    try:
        return cvss.CVSS3(v).scores()[0]
    except cvss.exceptions.CVSS3MalformedError:
        return -1.0


df["parsed_scores"] = df["XYZ_CVSS_VECTOR"].dropna().apply(_calc_cvss_score)
df["failed_to_parse"] = df["XYZ_CVSS_SCORE"].notna() & (df["XYZ_CVSS_SCORE"] != df["parsed_scores"])
# df[["XYZ_CVSS_SCORE", "parsed_scores", "failed_to_parse"]].to_csv("../unparseable_vectors.csv")


In [None]:
df.head()

### Attempt 1: Logistic Regression

Logistic Regression is often referred to as the _discriminative_
counterpart of Naive Bayes.

Model $P(y | \mathbf{x}_i)$ for labels $y \in \{-1, +1\}$ and assume it takes exactly the form

$$
    P(y | \mathbf{x}_i) = \frac{1}{1 + e^{-y(\mathbf{w}^T\mathbf{x}_i + b)}}
$$

while making few assumptions about $P(\mathbf{x}_i | y)$.
Ultimately the form of $P(\mathbf{x}_i | y)$ doesn't matter, because we estimate
$\mathbf{w}$ and $b$ directly with MLE or MAP by maximizing the conditional likelihood

$$
    \prod_i P(y_i | \mathbf{x}_i; \mathbf{w}, b)
$$

#### MLE

Choose parameters that maximize the conditional likelihood.
The conditional data likelihood $P(\mathbf{y} | X, \mathbf{w})$
is the probability of the observed labels $\mathbf{y} \in \{-1, +1\}^n$
in the training data conditioned on the feature values $\mathbf{x}_i$
(the bias $b$ is absorbed into $\mathbf{w}$ by adding a constant feature).
Note that $X = [\mathbf{x}_1,\dots,\mathbf{x}_n] \in \mathbb{R}^{d \times n}$.
We choose the parameters that maximize this function, and we assume that
the $y_i$ are independent given the input features $\mathbf{x}_i$ and $\mathbf{w}$.

> In my view, for CVE vectors, this assumption is perfectly valid to make

$$
\begin{aligned}
    P(\mathbf{y} | X, \mathbf{w}) &= \prod_{i=1}^{n} P(y_i | \mathbf{x}_i, \mathbf{w}) \\
    \hat{\mathbf{w}}_{\text{MLE}}
    &= \underset{\mathbf{w}}{\arg\max}
    - \sum_{i=1}^{n}\log(1 + e^{-y_i\mathbf{w}^T\mathbf{x}_i}) \\
    &= \underset{\mathbf{w}}{\arg\min} \sum_{i=1}^{n}\log(1 + e^{-y_i\mathbf{w}^T\mathbf{x}_i})
\end{aligned}
$$

Use gradient descent on the _negative log likelihood_.

$$
    \ell(\mathbf{w}) = \sum_{i=1}^{n}\log(1 + e^{-y_i\mathbf{w}^T\mathbf{x}_i})
$$
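
As a worked step, the gradient of this loss follows from the chain rule. This is a sketch that assumes labels $y_i \in \{-1, +1\}$, which is what the form of the loss implies; the step size $\eta$ is a symbol introduced here for illustration.

$$
    \nabla_{\mathbf{w}} \ell(\mathbf{w})
    = -\sum_{i=1}^{n} \frac{y_i \mathbf{x}_i}{1 + e^{y_i\mathbf{w}^T\mathbf{x}_i}},
    \qquad
    \mathbf{w} \leftarrow \mathbf{w} - \eta \nabla_{\mathbf{w}} \ell(\mathbf{w})
$$

Each example pulls $\mathbf{w}$ toward $y_i \mathbf{x}_i$, with a weight that shrinks toward zero once the example is confidently classified.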

### Text preprocessing

1. lowercase all text
1. remove punctuation
1. tokenize
1. remove stop words
1. lemmatization

In [None]:
import string

import nltk

# Build these once; re-creating them on every call is slow
STOPWORDS = set(nltk.corpus.stopwords.words("english"))
LEMMATIZER = nltk.stem.WordNetLemmatizer()
# Translation table that maps every punctuation character to a space
PUNCT_TABLE = str.maketrans(string.punctuation, " " * len(string.punctuation))


def desc_preprocess(d: str) -> str:
    # lowercase
    d = d.lower()
    # remove punctuation
    d = d.translate(PUNCT_TABLE)
    # tokenize on whitespace
    tokens = d.split()
    # remove stop words
    tokens = [t for t in tokens if t not in STOPWORDS]
    # lemmatize
    tokens = [LEMMATIZER.lemmatize(t) for t in tokens]
    return " ".join(tokens)


In [None]:
df["processed_desc"] = df["DESCRIPTION"].apply(desc_preprocess)

In [None]:
from sklearn.feature_extraction.text import CountVectorizer

def create_bow(descs: pd.Series) -> np.ndarray:
    return CountVectorizer().fit_transform(descs).toarray()

X = create_bow(df["processed_desc"])

X.shape

#### Problem statement

There are two sides to my problem:

1. **Given input descriptions, predict the CVSS vector.**
   This is a multi-label, multi-class classification problem.
   Some potential strategies are defined below.
1. **Given input descriptions, suggest a CVSS score directly.**
   This is probably a regression problem, although it can
   be converted into a classification problem by grouping
   scores into buckets of some discrete size.

- Independent labels: train a separate classifier for each label, probably using softmax regression
- Dependent labels: classifier chains, where the input to each classifier includes the output of another
- Dependent labels: label powerset, which transforms the problem into a single multi-class problem
  in which one classifier is trained on all unique label combinations
  found in the training data. This deals efficiently with label correlations.
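
To make two of these strategies concrete, here is a minimal sketch on made-up toy data (`X_toy` and `Y_toy` are invented for illustration and are not the CVE data). It assumes scikit-learn's `MultiOutputClassifier` for the independent-label case and builds the label powerset by hand by joining each row of labels into one class string.

```python
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.multioutput import MultiOutputClassifier

# Toy data (made up): 6 examples, 3 bag-of-words features, 2 label columns
X_toy = np.array(
    [[1, 0, 0], [1, 1, 0], [0, 1, 0], [0, 1, 1], [0, 0, 1], [1, 0, 1]]
)
Y_toy = np.array(
    [["N", "H"], ["N", "H"], ["L", "L"], ["L", "L"], ["P", "N"], ["P", "N"]]
)

# Independent labels: fit one classifier per label column
independent = MultiOutputClassifier(LogisticRegression(max_iter=1000))
independent.fit(X_toy, Y_toy)
pred_independent = independent.predict(X_toy)

# Label powerset: treat each observed label combination as a single class
powerset_labels = np.array(["/".join(row) for row in Y_toy])
powerset = LogisticRegression(max_iter=1000).fit(X_toy, powerset_labels)
pred_powerset = np.array([p.split("/") for p in powerset.predict(X_toy)])

print(pred_independent.shape, pred_powerset.shape)
```

The powerset model can only ever predict combinations it saw during training, which is the price paid for capturing label correlations directly.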

I need to make a decision regarding the independence assumption of my labels.
I find it intellectually interesting to explore the correlation statistics
between the category + label combinations. Two methods for establishing
correlation between categories are
- Chi-square test of independence
- Cramér's V
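
As a rough sketch of the second method, Cramér's V can be computed from a contingency table as $\sqrt{\chi^2 / (n \cdot (\min(r, c) - 1))}$. The helper name `cramers_v` and the two toy tables are assumptions made for illustration; the chi-square statistic comes from `scipy.stats.chi2_contingency` with the continuity correction turned off.

```python
import numpy as np
import pandas as pd
import scipy.stats


def cramers_v(xtab: pd.DataFrame) -> float:
    """Cramér's V for a contingency table: 0 = no association, 1 = perfect."""
    chi2 = scipy.stats.chi2_contingency(xtab, correction=False)[0]
    n = xtab.to_numpy().sum()
    min_dim = min(xtab.shape) - 1
    return float(np.sqrt(chi2 / (n * min_dim)))


# Toy tables (made up): one perfectly associated, one perfectly independent
perfect = pd.DataFrame([[50, 0], [0, 50]])
independent_table = pd.DataFrame([[25, 25], [25, 25]])

v_perfect = cramers_v(perfect)
v_independent = cramers_v(independent_table)
print(v_perfect, v_independent)
```

Unlike the raw chi-square statistic, V is normalized, so values can be compared across metric pairs with different numbers of categories.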

#### Next steps

- Look at documentation to make sure I've got my problem statements right.
  Does vector suggestion deliver value?
- Perform a *Cramér's V* analysis on training examples
- Based on the output of this, decide on an ML strategy


In [None]:
from typing import Union

metrics = {
    "AV": "Attack Vector",
    "AC": "Attack Complexity",
    "PR": "Privileges Required",
    "UI": "User Interaction",
    "S": "Scope",
    "C": "Confidentiality",
    "I": "Integrity",
    "A": "Availability",
}


def _vec_parse(vec: str, metric: str):
    return cvss.CVSS3(vec).get_value_description(metric)

def clean_cvss_vector(vec: Union[str, float]) -> Union[str, float]:
    if pd.isna(vec): return np.nan
    try:
        return cvss.CVSS3(vec).clean_vector()
    except cvss.exceptions.CVSS3MalformedError:
        pass

    # fix common problems
    assert type(vec) is str
    vec = vec.upper()
    vec = vec.rstrip(".")
    vec = vec.replace(" ", "")
    vec = vec.rstrip("/")
    try:
        vec = "CVSS:3.1/" + vec[vec.index("AV:"):]
    except ValueError:
        pass
    # vec = vec.removeprefix("VECTOR:")
    # if vec.startswith("AV"): vec = "CVSS:3.1/" + vec
    # if vec.startswith("/AV"): vec = "CVSS:3.1" + vec

    # try again
    try:
        return cvss.CVSS3(vec).clean_vector()
    except cvss.exceptions.CVSS3MalformedError:
        return np.nan
    

def extract_cvss_vector_components(df: pd.DataFrame, vector: pd.Series):
    for metric in metrics.keys():
        df[metric] = vector.dropna().apply(lambda v: _vec_parse(v, metric))
    return df


In [None]:
df["vector"] = df.XYZ_CVSS_VECTOR.apply(clean_cvss_vector)

In [None]:
df = extract_cvss_vector_components(df, df["vector"])
print(f"rows with valid vectors: {df['vector'].count()}")

df.to_csv("../df.csv")

In [None]:
df[["processed_desc", "AV"]].to_csv("../for_autogluon.csv", index=False)

#### Pearson's $\chi^2$ test

- [Pearson's chi-squared test - Wikipedia](https://en.wikipedia.org/wiki/Pearson%27s_chi-squared_test)

Based on the Pearson residuals computed below, some of the strongest associations are:

- AV:L & PR:L highly correlated
- AV:N & PR:L highly negatively correlated
- PR:L & UI:R highly negatively correlated
- PR:N & UI:R highly correlated
- PR:H & S:C highly correlated
- I:L & S:C very highly correlated
- A:N & S:C very highly correlated
- C:* & I:* extremely correlated, both positively and negatively
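
For reading these numbers: a Pearson residual is `(observed - expected) / sqrt(expected)`, where the expected count assumes the two metrics are independent. The sketch below computes it by hand for a made-up 2x2 table (the counts are invented for illustration, not taken from the CVE data).

```python
import numpy as np

# Made-up 2x2 contingency table of observed counts
observed = np.array([[30, 10], [10, 50]])

# Expected counts under independence: (row total * column total) / grand total
row_totals = observed.sum(axis=1, keepdims=True)
col_totals = observed.sum(axis=0, keepdims=True)
expected = row_totals @ col_totals / observed.sum()

# Positive residual: the pair occurs more often than independence predicts;
# negative residual: less often
pearson_resid = (observed - expected) / np.sqrt(expected)
print(pearson_resid)
```

Cells with a large positive residual correspond to the "highly correlated" pairs in the list, and large negative residuals to the "negatively correlated" ones.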

In [None]:
import statsmodels.api as sm
import itertools

crosstabs = {}

def perform_independence_test(df: pd.DataFrame):
    for c0, c1 in itertools.combinations(metrics.keys(), 2):
        xtab = pd.crosstab(df[c0], df[c1])
        crosstabs[":".join((c0,c1))] = xtab
        print(f"\n=== {c0} & {c1}")
        print(sm.stats.Table(xtab).resid_pearson)

perform_independence_test(df)

### Testing for statistical significance of cross-category dependence

$H_{0_{a, b}}$: metric $a$ and metric $b$ are independent

Use the standard significance level $\alpha = 0.05$.

In [None]:
import scipy.stats

xtab = crosstabs["C:I"]
alpha = 0.05
chi2stat, pvalue, dof, expected_frequency = scipy.stats.chi2_contingency(xtab)
chi2stat, pvalue, dof, expected_frequency, pvalue <= alpha

### Understanding CVSS Vectors

The following table lists the base metrics that make up a CVSS vector.
More info can be found in the [CVSS v3.1 Specification Document](https://www.first.org/cvss/specification-document).
Each metric value contributes a predefined weight to the CVSS base score,
and the base score is completely determined by the values of these metrics.
Values in the table below are ordered from most to least severe.

| **Base metric** | **Base metric type** | **Description** | **Possible values** |
|---|---|---|---|
| Attack Vector (AV) | Exploitability | This metric reflects the context by which vulnerability exploitation is possible. | Network, Adjacent, Local, Physical |
| Attack Complexity (AC) | Exploitability | This metric describes the conditions beyond the attacker’s control that must exist in order to exploit the vulnerability. | Low, High |
| Privileges Required (PR) | Exploitability | This metric describes the level of privileges an attacker must possess before successfully exploiting the vulnerability. | None, Low, High |
| User Interaction (UI) | Exploitability | This metric captures the requirement for a human user, other than the attacker, to participate in the successful compromise of the vulnerable component. | None, Required |
| Scope (S) | Scope¹ | The Scope metric captures whether a vulnerability in one vulnerable component impacts resources in components beyond its security scope. | Changed, Unchanged |
| Confidentiality (C) | Impact | This metric measures the impact to the confidentiality of the information resources managed by a software component due to a successfully exploited vulnerability. | High, Low, None |
| Integrity (I) | Impact | This metric measures the impact to integrity of a successfully exploited vulnerability. | High, Low, None |
| Availability (A) | Impact | This metric measures the impact to the availability of the impacted component resulting from a successfully exploited vulnerability. | High, Low, None |

<br>

> ¹Scope was introduced in CVSS v3.0.
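
To illustrate that the score is fully determined by the metric values, here is a simplified sketch of the v3.1 base score formula. It handles only Scope Unchanged vectors; the function names are made up for this example, and the weights and rounding rule are transcribed from the specification as I understand it. The `cvss` library used elsewhere in this notebook remains the authoritative implementation.

```python
import math

# Metric weights from the CVSS v3.1 specification (Scope Unchanged only)
WEIGHTS = {
    "AV": {"N": 0.85, "A": 0.62, "L": 0.55, "P": 0.2},
    "AC": {"L": 0.77, "H": 0.44},
    "PR": {"N": 0.85, "L": 0.62, "H": 0.27},
    "UI": {"N": 0.85, "R": 0.62},
    "C": {"H": 0.56, "L": 0.22, "N": 0.0},
    "I": {"H": 0.56, "L": 0.22, "N": 0.0},
    "A": {"H": 0.56, "L": 0.22, "N": 0.0},
}


def roundup(x: float) -> float:
    """Round up to one decimal place using the spec's integer-based rule."""
    i = round(x * 100000)
    if i % 10000 == 0:
        return i / 100000.0
    return (math.floor(i / 10000) + 1) / 10.0


def base_score_scope_unchanged(vector: str) -> float:
    """Base score for a vector such as 'AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H'."""
    values = dict(part.split(":") for part in vector.split("/"))
    assert values["S"] == "U", "this sketch only handles Scope Unchanged"
    w = {metric: WEIGHTS[metric][values[metric]] for metric in WEIGHTS}
    iss = 1 - (1 - w["C"]) * (1 - w["I"]) * (1 - w["A"])
    impact = 6.42 * iss
    exploitability = 8.22 * w["AV"] * w["AC"] * w["PR"] * w["UI"]
    if impact <= 0:
        return 0.0
    return roundup(min(impact + exploitability, 10))


print(base_score_scope_unchanged("AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H"))  # 9.8
print(base_score_scope_unchanged("AV:L/AC:L/PR:L/UI:N/S:U/C:H/I:H/A:H"))  # 7.8
```

Because the mapping from vector to score is deterministic, predicting the vector correctly is sufficient to recover the score.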

For our model, the precise meaning of these metrics and their subcategories is unimportant.
The relevant question is: _how independent are these metrics and subcategories?_

Assumption of label independence certainly makes our job easier,
as it leaves the door open for more basic machine learning algorithms
like **multi-category logistic regression**.

In [None]:
import numpy as np
import scipy.stats

# Generate two sets of data
data1 = np.random.normal(0, 1, 1000)
data2 = np.random.normal(0.1, 1, 1000)

# Perform a t-test
t_stat, p_value = scipy.stats.ttest_ind(data1, data2)

print(f"t-statistic: {t_stat}")
print(f"p-value: {p_value}")


Need to understand if the results in the crosstable are statistically significant.
Then, select an approach for training (multi-class log reg?).

Then, try a basic run.

Ok, I understand `pvalue` enough to proceed.
Let's find the p-values of all combinations
and put them in a table.

In [None]:
pvalues = {}
def calculate_p_values(df: pd.DataFrame):
    for c0, c1 in itertools.combinations(metrics.keys(), 2):
        xtab = pd.crosstab(df[c0], df[c1])
        chi2stat, pvalue, _, _ = scipy.stats.chi2_contingency(xtab)
        pvalues[".".join((c0,c1))] = pvalue

calculate_p_values(df)

In [None]:
from prettytable import PrettyTable

pt = PrettyTable()
pt.field_names = ["Metric Combination", "Independence p-value"]
pt.align = "l"

for mc, pval in pvalues.items():
    if pval < 0.05:
        pval = "<0.05 (!)"
    pt.add_row((mc, pval))

pt

In [None]:
# p > 0.05 means we fail to reject independence; it does not prove independence
fail_to_reject = sum(p > 0.05 for p in pvalues.values())
print(
    f"Independence cannot be rejected (p > 0.05) for only {fail_to_reject} out"
    f" of {len(pvalues)} combinations of CVSS metrics"
)


---

### Selecting the data and algorithm

To start, I will use the UI metric because
1. it is binary
1. it has a reasonably balanced split between the two values

I will use a simple MLE logistic regression with gradient descent.
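
One way to sanity-check the update rule for this: with labels encoded as 0/1 (as `Y` is in this notebook), the gradient of the log likelihood is $X^T(\mathbf{y} - \sigma(X\mathbf{w}))$. The sketch below compares that expression against finite differences on random data. All names and the random data are made up for illustration, and examples are stored as rows here, unlike the transposed layout used in the training cells.

```python
import numpy as np


def sigmoid_demo(z: np.ndarray) -> np.ndarray:
    return 1 / (1 + np.exp(-z))


def log_likelihood(w: np.ndarray, X: np.ndarray, y: np.ndarray) -> float:
    """Bernoulli log likelihood for labels y in {0, 1}."""
    p = sigmoid_demo(X @ w)
    return float(np.sum(y * np.log(p) + (1 - y) * np.log(1 - p)))


def analytic_grad(w: np.ndarray, X: np.ndarray, y: np.ndarray) -> np.ndarray:
    """Closed-form gradient of the log likelihood with respect to w."""
    return X.T @ (y - sigmoid_demo(X @ w))


rng = np.random.default_rng(0)
X_demo = rng.normal(size=(20, 3))
y_demo = rng.integers(0, 2, size=20).astype(float)
w_demo = rng.normal(size=3)

# Central finite differences, one coordinate of w at a time
eps = 1e-6
numeric_grad = np.array([
    (log_likelihood(w_demo + eps * e, X_demo, y_demo)
     - log_likelihood(w_demo - eps * e, X_demo, y_demo)) / (2 * eps)
    for e in np.eye(3)
])
max_gap = np.max(np.abs(numeric_grad - analytic_grad(w_demo, X_demo, y_demo)))
print(max_gap)
```

Stepping along this gradient (ascent on the log likelihood) is the same as gradient descent on the negative log likelihood.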

In [None]:
df["UI"].hist()

In [None]:
df_clean = df.dropna(subset=["UI"])
# The column created above is named "processed_desc"
X = create_bow(df_clean["processed_desc"].dropna())
# Absorb bias into X
X = np.insert(X, 0, 1, axis=1)
X.shape

In [None]:
Y = np.where(df["UI"].dropna() == "Required", 1, 0)
Y.shape

In [None]:
tt_split = 1000
# Transpose X such that examples are column vectors
X_train, X_test = X[tt_split:].T, X[:tt_split].T
Y_train, Y_test = Y[tt_split:], Y[:tt_split]
X_train.shape, Y_train.shape

In [None]:
def sigmoid(z: np.ndarray) -> np.ndarray:
    return 1 / (1 + np.exp(-z))


def init_params(n_features):
    return np.zeros(n_features)


def correct_mask(labels: np.ndarray, predictions: np.ndarray) -> np.ndarray:
    """Boolean mask that is True where the thresholded prediction matches the label."""
    c1 = (predictions > 0.5) & (labels == 1)
    c2 = (predictions < 0.5) & (labels == 0)

    return np.logical_or(c1, c2)


In [None]:
alpha = 0.1
grad_desc_cycles = 300
w = init_params(len(X_train))

In [None]:
for i in range(grad_desc_cycles):
    z = w.dot(X_train)
    predictions = sigmoid(z)
    errors = Y_train - predictions
    # Gradient of the mean log likelihood; ascending it is equivalent
    # to descending the negative log likelihood
    grad = errors.dot(X_train.T) / len(Y_train)
    w = w + alpha * grad

    if i % 10: continue
    predictions_test = sigmoid(w.dot(X_test))
    n_correct = np.count_nonzero(correct_mask(Y_test, predictions_test))
    print(np.around(predictions, 3), np.sum(errors), f"({n_correct} / {len(Y_test)})")


#### Multinomial Logistic Regression
Now let's try metrics with more than two possible values, such as
`C` (Confidentiality, 3 values) or `AV` (Attack Vector, 4 values).
These need Multinomial Logistic Regression.
The cell below is currently set to `AV`.
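
For reference, a sketch of the model in the document's notation. The symbols $K$ (number of categories), $\mathbf{w}_k$ (weight vector for category $k$), and $y_{ik}$ (one-hot label) are introduced here for illustration.

$$
    P(y = k | \mathbf{x}_i) = \frac{e^{\mathbf{w}_k^T\mathbf{x}_i}}{\sum_{j=1}^{K} e^{\mathbf{w}_j^T\mathbf{x}_i}},
    \qquad
    \ell(W) = -\frac{1}{n}\sum_{i=1}^{n}\sum_{k=1}^{K} y_{ik}\log P(y = k | \mathbf{x}_i)
$$

$$
    \nabla_{\mathbf{w}_k}\ell(W) = -\frac{1}{n}\sum_{i=1}^{n}\left(y_{ik} - P(y = k | \mathbf{x}_i)\right)\mathbf{x}_i
$$

With $K = 2$ this reduces to the binary case, and the gradient has the same "label minus prediction, times features" shape.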

In [None]:
from sklearn.preprocessing import OneHotEncoder

metric = "AV"

df_clean = df.dropna(subset=[metric])
X = create_bow(df_clean["processed_desc"].dropna())
# Absorb bias into X
X = np.insert(X, 0, 1, axis=1)

Y = OneHotEncoder(sparse_output=False).fit_transform(
    df[metric].dropna().to_numpy().reshape(-1, 1)
)

X.shape, Y.shape


In [None]:
df[metric].hist()

In [None]:
tt_split = 1000
# Transpose X such that examples are column vectors
X_train, X_test = X[tt_split:].T, X[:tt_split].T
Y_train, Y_test = Y[tt_split:], Y[:tt_split]
X_train.shape, Y_train.shape

In [None]:
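def softmax(Z: np.ndarray) -> np.ndarray:
    # Subtract the column-wise max before exponentiating for numerical stability
    expZ = np.exp(Z - Z.max(axis=0, keepdims=True))
    return expZ / expZ.sum(axis=0, keepdims=True)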


def sigmoid(z: np.ndarray) -> np.ndarray:
    return 1 / (1 + np.exp(-z))


def init_params(n_categories, n_features):
    # One row of weights per category, matching how W is multiplied with X_train
    return np.zeros((n_categories, n_features))

def categorical_cross_entropy_loss(ohY: np.ndarray, A: np.ndarray) -> float:
    assert np.all(
        (ohY.sum(axis=1) == 1) & np.all((ohY == 0) | (ohY == 1), axis=1)
    )  # one-hot encoding
    EPSILON = 1e-8 # avoid math exceptions if A happens to contain 0
    return -np.mean(np.sum(ohY * np.log(A + EPSILON), axis=1), dtype=float)

def num_correct(Y: np.ndarray, A: np.ndarray) -> int:
    return np.sum(np.argmax(Y, axis=1) == np.argmax(A, axis=1))

In [None]:
W = init_params(len(Y_train.T), len(X_train))
alpha = 0.11
grad_desc_cycles = 200

for i in range(grad_desc_cycles):
    Z = W.dot(X_train)
    # Softmax is applied to the raw scores; the gradient below assumes this
    predictions = softmax(Z)
    errors = Y_train.T - predictions
    grad = errors.dot(X_train.T) / len(Y_train)
    W = W + alpha * grad

    if i % 10: continue
    print(num_correct(Y_train, predictions.T), len(Y_train))
    print(categorical_cross_entropy_loss(Y_train, predictions.T))

| Metric | Best result (correct training predictions) |
|---|---|
| C | 1608 |
| I | 1581 |
| AV | 1897 |
| UI | 1895 |
| S | 2211 |

---

### PyTorch

I now want to explore using the same techniques,
but with a purpose-built library like PyTorch.

In [None]:
from sklearn.preprocessing import OneHotEncoder

metric = "AV"

df_clean = df.dropna(subset=[metric])
X = create_bow(df_clean["processed_desc"].dropna())
# Absorb bias into X
# X = np.insert(X, 0, 1, axis=1)

Y = OneHotEncoder(sparse_output=False).fit_transform(
    df[metric].dropna().to_numpy().reshape(-1, 1)
).argmax(axis=1)

X.shape, Y.shape


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

# float32 and long are the usual data types
# for features and labels in PyTorch
X_train_torch = torch.from_numpy(X).float()
Y_train_torch = torch.from_numpy(Y).long()
X_train_torch.shape, Y_train_torch.shape

In [None]:
X_train_torch.dtype, Y_train_torch.dtype

In [None]:
model = nn.Linear(X_train_torch.size(1), len(Y_train_torch.unique()))
model

In [None]:
optimizer = optim.SGD(model.parameters(), lr=0.11)
# Combines LogSoftmax and NLLLoss, so the model outputs raw logits
loss_fn = nn.CrossEntropyLoss()

In [None]:
epochs = 228

for i in range(epochs):
    optimizer.zero_grad()
    outputs = model(X_train_torch)

    loss = loss_fn(outputs, Y_train_torch)
    loss.backward()

    optimizer.step()

    if i % 10: continue
    _, predicted = torch.max(outputs, 1)
    correct = (predicted == Y_train_torch).sum().item()
    print(f"Epoch {i} Accuracy: {correct/len(Y_train_torch)} Loss: {loss.item()}")


In [None]:
from dataclasses import dataclass
from enum import Enum
import logging

logging.basicConfig(
    format="[%(levelname)-8s] (%(name)s) %(message)s",
    level=logging.DEBUG,
)

log = logging.getLogger(__name__)


class CVSSMetricType(Enum):
    Exploit = "Exploit"
    Impact = "Impact"
    Scope = "Scope"


@dataclass
class CVSSMetricMeta:
    type: CVSSMetricType
    abbrev: str
    name: str
    # ordered from higher to lower CVSS Score weighting
    categories: list[str]


CVSS_BASE_METRICS = {
    "AV": CVSSMetricMeta(
        CVSSMetricType.Exploit,
        "AV",
        "Attack Vector",
        ["Network", "Adjacent", "Local", "Physical"],
    ),
    "AC": CVSSMetricMeta(
        CVSSMetricType.Exploit, "AC", "Attack Complexity", ["Low", "High"]
    ),
    "PR": CVSSMetricMeta(
        CVSSMetricType.Exploit, "PR", "Privileges Required", ["None", "Low", "High"]
    ),
    "UI": CVSSMetricMeta(
        CVSSMetricType.Exploit, "UI", "User Interaction", ["None", "Required"]
    ),
    "S": CVSSMetricMeta(CVSSMetricType.Scope, "S", "Scope", ["Changed", "Unchanged"]),
    "C": CVSSMetricMeta(
        CVSSMetricType.Impact, "C", "Confidentiality", ["High", "Low", "None"]
    ),
    "I": CVSSMetricMeta(
        CVSSMetricType.Impact, "I", "Integrity", ["High", "Low", "None"]
    ),
    "A": CVSSMetricMeta(
        CVSSMetricType.Impact, "A", "Availability", ["High", "Low", "None"]
    ),
}


class CVEEngineModel:
    def __init__(self, n_features: int):
        # assume same learn rate for each metric
        self.learn_rate = 0.10
        self.loss_fn = nn.CrossEntropyLoss()
        self.training_epochs = 1000

        self.models = {}
        self.optimizers = {}

        for metric_meta in CVSS_BASE_METRICS.values():
            model = nn.Linear(n_features, len(metric_meta.categories))
            self.models[metric_meta.abbrev] = model
            self.optimizers[metric_meta.abbrev] = optim.SGD(
                model.parameters(), lr=self.learn_rate
            )
        assert self.models.keys() == self.optimizers.keys()

    def display_parameters(self):
        print("== models ==")
        for metric, model in self.models.items():
            print(f"metric: {metric}\tnumber of categories: {model.out_features}")

        print("\n== parameters ==")
        print(f"learn rate: {self.learn_rate}")
        print(f"num features: {list(self.models.values())[0].in_features}")

    def _train_metric(self, X_train: torch.Tensor, Y_train: torch.Tensor, metric: str):
        for i in range(self.training_epochs):
            self.optimizers[metric].zero_grad()
            outputs = self.models[metric](X_train)

            loss = self.loss_fn(outputs, Y_train)
            loss.backward()

            self.optimizers[metric].step()

            if i % 100:
                continue
            log.debug(f"metric: {metric}\tepoch: {i:2}\tloss: {loss}")

    def _validate_Y_properties(self, Y_train: torch.Tensor):
        # One label column per metric, each encoded as 0..n_categories-1
        assert Y_train.shape[1] == len(self.models)
        values, _ = torch.max(Y_train, dim=0)
        assert values.tolist() == [
            len(metric_meta.categories) - 1 for metric_meta in CVSS_BASE_METRICS.values()
        ]

    def train_all(self, X_train: torch.Tensor, Y_train: torch.Tensor):
        """Trains all models on the provided training data.
        :param X_train: feature matrix of shape (n_examples, n_features).
        :param Y_train: integer labels of shape (n_examples, n_metrics), one
                        column per CVSS metric abbreviation, in the same
                        order as CVSS_BASE_METRICS.
        """
        self._validate_Y_properties(Y_train)

        for i, metric in enumerate(self.models.keys()):
            log.debug(f"++ training metric {i}: {metric}")
            self._train_metric(X_train, Y_train[:,i], metric)

    def predict(self, X: torch.Tensor) -> tuple[np.ndarray, np.ndarray]:
        """Returns predictions and confidence scores
        indexed by cvss metric"""

        predictions = np.zeros((X.shape[0], len(self.models)))
        confidence_scores = np.zeros((X.shape[0], len(self.models)))

        for i, (metric, model) in enumerate(self.models.items()):
            prob = nn.functional.softmax(model(X), dim=1)

            pred = torch.argmax(prob, dim=1)

            confidence = prob[range(prob.shape[0]), pred]

            predictions[:, i] = pred.numpy()
            confidence_scores[:, i] = confidence.detach().numpy()

        assert predictions.shape == confidence_scores.shape
        return predictions, confidence_scores

    @staticmethod
    def compute_accuracy(Y_true: np.ndarray, Y_pred: np.ndarray):
        """Computes the accuracy for each metric
        by measuring the proportion of correct predictions"""
        assert Y_true.shape == Y_pred.shape
        return np.mean(Y_true == Y_pred, axis=0)


---

My next task is to do a full run of training
and then print out the results on the test data.

I want to see a table like this:

- av: (45 / 100)
- i: (77 / 100)
- ...

#### Step 1: prepare labels and features

In [None]:
from sklearn.preprocessing import LabelEncoder

# pick any metric to remove NaNs
df_clean = df.dropna(subset=["AV"]).copy()

for metric in CVSS_BASE_METRICS.keys():
    encoder = LabelEncoder()
    # Encode the cleaned column so labels stay aligned with df_clean's rows
    df_clean[metric] = encoder.fit_transform(df_clean[metric])

Y_np = df_clean[list(CVSS_BASE_METRICS.keys())].values
Y = torch.from_numpy(Y_np)


In [None]:
X_np = create_bow(df_clean["processed_desc"])
X = torch.from_numpy(X_np).float()

In [None]:
train_split = 0.8
i = int(train_split * len(X))
X_train, X_test = X[:i], X[i:]
Y_train, Y_test = Y[:i], Y[i:]

In [None]:
X_train.shape, Y_train.shape

In [None]:
cvem = CVEEngineModel(X_train.shape[1])
cvem.display_parameters()

In [None]:
cvem.train_all(X_train, Y_train)

In [None]:
pred, cs = cvem.predict(X_test)
pred, cs

In [None]:
# pct correct
np.mean(Y_test.numpy() == pred, axis=0)

In [None]:
# average confidence scores
np.mean(cs, axis=0)

Some thoughts

- Want to allow for a "human-in-the-loop" feedback mechanism
- Using "active learning", the model asks for feedback when it is less confident
- Probably what I would use is closer to "online learning", where I can simply
  scrape the data after the fact and retrain.
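
A minimal sketch of the "ask when less confident" idea, assuming a confidence array shaped like the `cs` returned by `predict` (one row per example, one column per metric). The helper name `needs_review`, the 0.6 threshold, and the demo array are all made up for illustration.

```python
import numpy as np


def needs_review(confidence_scores: np.ndarray, threshold: float = 0.6) -> np.ndarray:
    """Indices of examples whose least-confident metric falls below the threshold."""
    return np.flatnonzero(confidence_scores.min(axis=1) < threshold)


# Made-up confidence scores: 3 examples, 2 metrics
demo_cs = np.array([[0.9, 0.8], [0.55, 0.95], [0.7, 0.4]])
review_idx = needs_review(demo_cs)
print(review_idx)  # [1 2]
```

Using the minimum across metrics flags an example as soon as any single metric is uncertain; a mean or per-metric threshold would be a looser alternative.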