In [3]:
from sklearn.feature_extraction.text import CountVectorizer
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, roc_auc_score, f1_score
from sklearn.ensemble import RandomForestClassifier
from transformers import (
    Trainer,
    TrainingArguments,
    AutoModel,
    AutoTokenizer,
    AutoModelForMaskedLM,
    AutoModelForSequenceClassification,
)
import json
from sklearn.metrics import confusion_matrix
import spacy

In [4]:
encode = spacy.load("en_core_web_lg")

In [5]:
negative_example_ids = []
negative_example_json = json.load(open("negative_examples_100.json"))
for example in negative_example_json:
    negative_example_ids.append((example["ap"], example["technique"]))

In [6]:
with open("w_dict.json", "r") as f:
    w_dict = json.load(f)
with open("ap_dict.json", "r") as f:
    ap_dict = json.load(f)
with open("technique_dict.json", "r") as f:
    technique_dict = json.load(f)
with open("tactic_dict.json", "r") as f:
    tactic_dict = json.load(f)
with open("cwe_names.json", "r") as f:
    cwe_names = json.load(f)
with open("ap_names.json", "r") as f:
    ap_names = json.load(f)
with open("technique_names.json", "r") as f:
    technique_names = json.load(f)
with open("tactic_names.json", "r") as f:
    tactic_names = json.load(f)
with open("ap_mitigation_descriptions.json", "r") as f:
    ap_mitigation_descriptions = json.load(f)
with open("cwe_mitigation_descriptions.json", "r") as f:
    cwe_mitigation_descriptions = json.load(f)
with open("tech_mitigation_names.json", "r") as f:
    tech_mitigation_names = json.load(f)
with open("tech_detection_names.json", "r") as f:
    tech_detection_names = json.load(f)
with open("ap_detection_descriptions.json", "r") as f:
    ap_detection_descriptions = json.load(f)
with open("cwe_detection_descriptions.json", "r") as f:
    cwe_detection_descriptions = json.load(f)

In [7]:
f = open("cwe_mitigation_ids_temp.json")
w_mitigation = json.load(f)

f = open("capec_mitigation_temp.json")
ap_mitigation = json.load(f)

f = open("technique_mitigation_temp.json")
technique_mitigation = json.load(f)

f = open("technique_detection_temp.json")
technique_detection = json.load(f)

f = open("capec_detection_temp.json")
ap_detection = json.load(f)

f = open("cwe_detection_temp.json")
w_detection = json.load(f)

In [8]:
positive_example_ids = []
for ap in ap_dict:
    for technique in ap_dict[ap]["techniques"]:
        positive_example_ids.append((ap, technique))

In [9]:
example_ids = positive_example_ids + negative_example_ids

In [10]:
ap_name_vectorizer = CountVectorizer()
ap_name_vectorizer.fit(ap_names)

technique_name_vectorizer = CountVectorizer()
technique_name_vectorizer.fit(technique_names)

cwe_name_vectorizer = CountVectorizer()
cwe_name_vectorizer.fit(cwe_names)

tactic_name_vectorizer = CountVectorizer()
tactic_name_vectorizer.fit(tactic_names)

ap_mitigation_vectorizer = CountVectorizer()
cwe_mitigation_vectorizer = CountVectorizer()
tech_mitigation_vectorizer = CountVectorizer()

ap_mitigation_vectorizer.fit(ap_mitigation_descriptions)
cwe_mitigation_vectorizer.fit(cwe_mitigation_descriptions)
tech_mitigation_vectorizer.fit(tech_mitigation_names)

ap_detection_vectorizer = CountVectorizer()
cwe_detection_vectorizer = CountVectorizer()
tech_detection_vectorizer = CountVectorizer()

ap_detection_vectorizer.fit(ap_detection_descriptions)
cwe_detection_vectorizer.fit(cwe_detection_descriptions)
tech_detection_vectorizer.fit(tech_detection_names)

CountVectorizer()

In [11]:
device = "cpu"

tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
pretrained_model = AutoModel.from_pretrained("bert-base-uncased").to(device)

model_path = "bert_base"
finetuned_model = AutoModelForMaskedLM.from_pretrained(model_path).to(device)

Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertModel: ['cls.seq_relationship.weight', 'cls.predictions.transform.LayerNorm.bias', 'cls.seq_relationship.bias', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.bias', 'cls.predictions.transform.dense.weight', 'cls.predictions.transform.dense.bias', 'cls.predictions.decoder.weight']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [12]:
def vector_encoding(
    encoding_type, text, vectorizer=None, bert_output_type=None, bert_finetuned=False
):
    if encoding_type == "None":
        return text
    elif encoding_type == "BoW":
        return vectorizer_transform(text, vectorizer)
    elif encoding_type == "spaCy":
        return spaCy_vector(text)
    elif encoding_type == "BERT":
        if bert_finetuned:
            model = finetuned_model
        else:
            model = pretrained_model

        if bert_output_type == "pooler_output":
            return get_pooler_output(model, text)
        elif bert_output_type == "hidden_state":
            return get_hidden_state(model, text)


def vectorizer_transform(input_to_BoW, vectorizer):
    return vectorizer.transform([input_to_BoW])[0].toarray().flatten()


def spaCy_vector(text):
    return encode(text).vector


def get_pooler_output(model, text):
    inputs = tokenizer(text.lower(), truncation=True, return_tensors="pt").to(device)
    outputs = model(**inputs)
    pooled_output = outputs.pooler_output
    return pooled_output.detach().cpu().numpy().flatten()


def get_hidden_state(model, text):
    inputs = tokenizer(text.lower(), truncation=True, return_tensors="pt").to(device)
    outputs = model(**inputs, output_hidden_states=True)
    hidden_states = outputs.hidden_states
    return hidden_states[-1][:, 0, :].detach().cpu().numpy().flatten()


def append_data(
    encoding_type,
    data_combo,
    ap,
    technique,
    bert_output_type=None,
    bert_finetuned=False,
):
    output = []
    vectorizer = CountVectorizer()
    if data_combo == "A0":
        vectorizer.fit(ap_names + technique_names)

    elif data_combo == "A1":
        vectorizer.fit(ap_names + technique_names + cwe_names + tactic_names)

    elif data_combo == "A1 + MI":
        vectorizer.fit(
            ap_names
            + technique_names
            + cwe_names
            + tactic_names
            + cwe_mitigation_descriptions
            + ap_mitigation_descriptions
            + tech_mitigation_names
        )

    elif data_combo == "A1 + D":
        vectorizer.fit(
            ap_names
            + technique_names
            + cwe_names
            + tactic_names
            + cwe_detection_descriptions
            + ap_detection_descriptions
            + tech_detection_names
        )

    elif data_combo == "A1 + MI + D":
        vectorizer.fit(
            ap_names
            + technique_names
            + cwe_names
            + tactic_names
            + cwe_mitigation_descriptions
            + ap_mitigation_descriptions
            + tech_mitigation_names
            + cwe_detection_descriptions
            + ap_detection_descriptions
            + tech_detection_names
        )

    output.append(ap_dict[ap]["name"])
    output.append(technique_dict[technique]["name"])

    if "A1" in data_combo:
        for cwe in ap_dict[ap]["cwes"]:
            output.append(w_dict[cwe]["name"])

        for tac in technique_dict[technique]["tactics"]:
            output.append(tactic_dict[tac]["name"])

    if data_combo in ["A1 + MI", "A1 + MI + D"]:
        for cwe in ap_dict[ap]["cwes"]:
            for mitigation in w_dict[cwe]["mitigations"]:
                for cwe_mit in w_mitigation:
                    if mitigation == cwe_mit["_id"]:
                        output.append(cwe_mit["metadata"]["Description"])

        for mitigation in ap_dict[ap]["mitigations"]:
            for ap_mit in ap_mitigation:
                if mitigation == ap_mit["_id"]:
                    output.append(ap_mit["metadata"])

        for mitigation in technique_dict[technique]["mitigations"]:
            for tech_mit in technique_mitigation:
                if mitigation == tech_mit["_id"]:
                    output.append(tech_mit["name"])

    if data_combo in ["A1 + D", "A1 + MI + D"]:
        for cwe in ap_dict[ap]["cwes"]:
            for detection in w_dict[cwe]["detections"]:
                for cwe_det in w_detection:
                    if detection == cwe_det["_id"]:
                        output.append(cwe_det["metadata"]["Description"])

        for detection in ap_dict[ap]["detections"]:
            for ap_det in ap_detection:
                if detection == ap_det["_id"]:
                    output.append(ap_det["metadata"])

        for detection in technique_dict[technique]["detections"]:
            for tech_det in technique_mitigation:
                if detection == tech_det["_id"]:
                    output.append(tech_det["metadata"])

    output = " ".join(output)
    return vector_encoding(
        encoding_type, output, vectorizer, bert_output_type, bert_finetuned
    )


def handle_data(
    encoding_type,
    data_combo,
    ap,
    technique,
    bert_output_type=None,
    bert_finetuned=False,
):
    example = []
    try:
        example.append(
            vector_encoding(
                encoding_type,
                ap_dict[ap]["name"],
                ap_name_vectorizer,
                bert_output_type,
                bert_finetuned,
            )
        )
        example.append(
            vector_encoding(
                encoding_type,
                technique_dict[technique]["name"],
                technique_name_vectorizer,
                bert_output_type,
                bert_finetuned,
            )
        )
    except KeyError as e:
        print(f"Error {e}")
        return []

    if "A1" in data_combo:
        tactics = []
        for tac in technique_dict[technique]["tactics"]:
            tactics.append(tactic_dict[tac]["name"])
        tactics = " ".join(tactics)
        example.append(
            vector_encoding(
                encoding_type,
                tactics,
                tactic_name_vectorizer,
                bert_output_type,
                bert_finetuned,
            )
        )

        cwes = []
        for cwe in ap_dict[ap]["cwes"]:
            cwes.append(w_dict[cwe]["name"])
        cwes = " ".join(cwes)
        example.append(
            vector_encoding(
                encoding_type,
                cwes,
                cwe_name_vectorizer,
                bert_output_type,
                bert_finetuned,
            )
        )

    if data_combo in ["A1 + MI", "A1 + MI + D"]:
        cwe_mitigations = []
        for cwe in ap_dict[ap]["cwes"]:
            for mitigation in w_dict[cwe]["mitigations"]:
                for cwe_mit in w_mitigation:
                    if mitigation == cwe_mit["_id"]:
                        cwe_mitigations.append(cwe_mit["metadata"]["Description"])
        cwe_mitigations = " ".join(cwe_mitigations)
        example.append(
            vector_encoding(
                encoding_type,
                cwe_mitigations,
                cwe_mitigation_vectorizer,
                bert_output_type,
                bert_finetuned,
            )
        )

        capec_mitigations = []
        for mitigation in ap_dict[ap]["mitigations"]:
            for ap_mit in ap_mitigation:
                if mitigation == ap_mit["_id"]:
                    capec_mitigations.append(ap_mit["metadata"])
        capec_mitigations = " ".join(capec_mitigations)
        example.append(
            vector_encoding(
                encoding_type,
                capec_mitigations,
                ap_mitigation_vectorizer,
                bert_output_type,
                bert_finetuned,
            )
        )

        tech_mitigations = []
        for mitigation in technique_dict[technique]["mitigations"]:
            for tech_mit in technique_mitigation:
                if mitigation == tech_mit["_id"]:
                    tech_mitigations.append(tech_mit["name"])
        tech_mitigations = " ".join(tech_mitigations)
        example.append(
            vector_encoding(
                encoding_type,
                tech_mitigations,
                tech_mitigation_vectorizer,
                bert_output_type,
                bert_finetuned,
            )
        )

    if data_combo in ["A1 + D", "A1 + MI + D"]:
        cwe_detections = []
        for cwe in ap_dict[ap]["cwes"]:
            for detection in w_dict[cwe]["detections"]:
                for cwe_det in w_detection:
                    if detection == cwe_det["_id"]:
                        cwe_detections.append(cwe_det["metadata"]["Description"])

        cwe_detections = " ".join(cwe_detections)
        example.append(
            vector_encoding(
                encoding_type,
                cwe_detections,
                cwe_detection_vectorizer,
                bert_output_type,
                bert_finetuned,
            )
        )

        capec_detections = []
        for detection in ap_dict[ap]["detections"]:
            for ap_det in ap_detection:
                if detection == ap_det["_id"]:
                    capec_detections.append(ap_det["metadata"])
        capec_detections = " ".join(capec_detections)
        example.append(
            vector_encoding(
                encoding_type,
                capec_detections,
                ap_detection_vectorizer,
                bert_output_type,
                bert_finetuned,
            )
        )

        tech_detections = []
        for detection in technique_dict[technique]["detections"]:
            for tech_det in technique_detection:
                if detection == tech_det["_id"]:
                    tech_detections.append(tech_det["name"])
        tech_detections = " ".join(tech_detections)
        example.append(
            vector_encoding(
                encoding_type,
                tech_detections,
                tech_detection_vectorizer,
                bert_output_type,
                bert_finetuned,
            )
        )

    return np.hstack(example)


def encode_data(
    encoding_type,
    data_combo,
    ap,
    technique,
    bert_output_type=None,
    bert_finetuned=False,
):
    example = []

    try:
        example.append(
            vector_encoding(
                encoding_type,
                ap_dict[ap]["name"],
                ap_name_vectorizer,
                bert_output_type,
                bert_finetuned,
            )
        )
        example.append(
            vector_encoding(
                encoding_type,
                technique_dict[technique]["name"],
                technique_name_vectorizer,
                bert_output_type,
                bert_finetuned,
            )
        )
    except KeyError as e:
        print(f"Error {e}")
        return []

    if "A1" in data_combo:
        for cwe in ap_dict[ap]["cwes"]:
            example.append(
                vector_encoding(
                    encoding_type,
                    w_dict[cwe]["name"],
                    cwe_name_vectorizer,
                    bert_output_type,
                    bert_finetuned,
                )
            )

        for tac in technique_dict[technique]["tactics"]:
            example.append(
                vector_encoding(
                    encoding_type,
                    tactic_dict[tac]["name"],
                    tactic_name_vectorizer,
                    bert_output_type,
                    bert_finetuned,
                )
            )

    if data_combo in ["A1 + MI", "A1 + MI + D"]:
        for cwe in ap_dict[ap]["cwes"]:
            for mitigation in w_dict[cwe]["mitigations"]:
                for cwe_mit in w_mitigation:
                    if mitigation == cwe_mit["_id"]:
                        example.append(
                            vector_encoding(
                                encoding_type,
                                cwe_mit["metadata"]["Description"],
                                cwe_mitigation_vectorizer,
                                bert_output_type,
                                bert_finetuned,
                            )
                        )

        for mitigation in ap_dict[ap]["mitigations"]:
            for ap_mit in ap_mitigation:
                if mitigation == ap_mit["_id"]:
                    example.append(
                        vector_encoding(
                            encoding_type,
                            ap_mit["metadata"],
                            ap_mitigation_vectorizer,
                            bert_output_type,
                            bert_finetuned,
                        )
                    )

        for mitigation in technique_dict[technique]["mitigations"]:
            for tech_mit in technique_mitigation:
                if mitigation == tech_mit["_id"]:
                    example.append(
                        vector_encoding(
                            encoding_type,
                            tech_mit["name"],
                            tech_mitigation_vectorizer,
                            bert_output_type,
                            bert_finetuned,
                        )
                    )

    if data_combo in ["A1 + D", "A1 + MI + D"]:
        for cwe in ap_dict[ap]["cwes"]:
            for detection in w_dict[cwe]["detections"]:
                for cwe_det in w_detection:
                    if detection == cwe_det["_id"]:
                        example.append(
                            vector_encoding(
                                encoding_type,
                                cwe_det["metadata"]["Description"],
                                cwe_detection_vectorizer,
                                bert_output_type,
                                bert_finetuned,
                            )
                        )

        for detection in ap_dict[ap]["detections"]:
            for ap_det in ap_detection:
                if detection == ap_det["_id"]:
                    example.append(
                        vector_encoding(
                            encoding_type,
                            ap_det["metadata"],
                            ap_detection_vectorizer,
                            bert_output_type,
                            bert_finetuned,
                        )
                    )

        for detection in technique_dict[technique]["detections"]:
            for tech_det in technique_detection:
                if detection == tech_det["_id"]:
                    example.append(
                        vector_encoding(
                            encoding_type,
                            tech_det["name"],
                            tech_detection_vectorizer,
                            bert_output_type,
                            bert_finetuned,
                        )
                    )

    return np.hstack(example)

In [13]:
def get_classifications(name: str):
    print(f"Classify {name}")
    examples = []
    labels = []
    for i, (ap, technique) in enumerate(example_ids):
        if name == "BERT":
            _example = handle_data(
                "BERT",
                "A1 + MI",
                ap,
                technique,
                bert_output_type="hidden_state",
                bert_finetuned=True,
            )
        elif name == "BoW":
            _example = handle_data("BoW", "A1 + MI", ap, technique)
        elif name == "spaCy":
            _example = handle_data("spaCy", "A1 + MI", ap, technique)

        if len(_example) == 0:
            continue

        examples.append(_example)
        if i < len(example_ids) / 2:
            labels.append(1)
        else:
            labels.append(0)

        if i > 1:
            assert len(_example) == len(examples[-2])

    X_train, X_test, y_train, y_test = train_test_split(
        examples, labels, test_size=0.3, random_state=0
    )

    clf = RandomForestClassifier(random_state=0)
    clf.fit(X_train, y_train)
    y_pred = clf.predict(X_test)

    FPs = []
    FNs = []

    test_example_ids = {}

    for i in range(len(X_test)):
        for j in range(len(examples)):
            if (X_test[i] == examples[j]).all():
                test_example_ids[i] = example_ids[j]
                break

    for i in range(len(y_pred)):
        if y_pred[i] == 1 and y_test[i] != y_pred[i]:
            FPs.append(test_example_ids[i])
        if y_pred[i] == 0 and y_test[i] != y_pred[i]:
            FNs.append(test_example_ids[i])

    with open(f"Best_{name}_FP_ids.json", "w") as f:
        json.dump(FPs, f)

    with open(f"Best_{name}_FN_ids.json", "w") as f:
        json.dump(FNs, f)

    confusion_matrix(y_test, y_pred)

In [14]:
for name in ("BoW", "spaCy", "BERT"):
    get_classifications(name)

Classify BoW
Error 'capec/capec_01246'
Error 'capec/capec_01137'
Error 'capec/capec_01211'
Error 'capec/capec_01320'
Error 'capec/capec_01239'
Error 'technique/technique_00648'
Error 'capec/capec_01296'
Error 'technique/technique_00640'
Error 'capec/capec_01211'
Error 'capec/capec_01256'
Error 'capec/capec_01131'
Error 'technique/technique_00641'
Error 'capec/capec_01228'
Error 'capec/capec_01304'
Error 'capec/capec_01279'
Error 'capec/capec_01224'
Error 'capec/capec_01283'
Error 'capec/capec_01276'
Error 'capec/capec_01283'
Error 'technique/technique_00625'
Error 'capec/capec_01312'
Error 'technique/technique_00642'
Error 'capec/capec_01276'
Error 'capec/capec_01152'
Error 'capec/capec_01227'
Error 'technique/technique_00642'
Error 'capec/capec_01159'
Error 'capec/capec_01310'
Error 'capec/capec_01270'
Error 'technique/technique_00665'
Error 'technique/technique_00635'
Error 'capec/capec_01312'
Error 'technique/technique_00635'
Error 'capec/capec_01213'
Error 'technique/technique_0071

In [15]:
def gather_text_data(ap, technique, data_combo):
    output = {}
    output["AP"] = ap_dict[ap]["name"]
    output["Technique"] = technique_dict[technique]["name"]

    if "A1" in data_combo:
        output["CWEs"] = []
        for cwe in ap_dict[ap]["cwes"]:
            output["CWEs"].append(w_dict[cwe]["name"])
        output["Tactics"] = []
        for tac in technique_dict[technique]["tactics"]:
            output["Tactics"].append(tactic_dict[tac]["name"])

    if "A1 + MI" in data_combo:
        output["CWE Mitigations"] = []
        for cwe in ap_dict[ap]["cwes"]:
            for mitigation in w_dict[cwe]["mitigations"]:
                for cwe_mit in w_mitigation:
                    if mitigation == cwe_mit["_id"]:
                        output["CWE Mitigations"].append(
                            cwe_mit["metadata"]["Description"]
                        )

        output["AP Mitigations"] = []
        for mitigation in ap_dict[ap]["mitigations"]:
            for ap_mit in ap_mitigation:
                if mitigation == ap_mit["_id"]:
                    output["AP Mitigations"].append(ap_mit["metadata"])

        output["Technique Mitigations"] = []
        for mitigation in technique_dict[technique]["mitigations"]:
            for tech_mit in technique_mitigation:
                if mitigation == tech_mit["_id"]:
                    output["Technique Mitigations"].append(tech_mit["name"])

    return output

In [16]:
f = open("Best_BoW_FN_ids.json")
BoW_FN = json.load(f)
f = open("Best_spaCy_FN_ids.json")
spaCy_FN = json.load(f)
f = open("Best_BERT_FN_ids.json")
bert_FN = json.load(f)

In [17]:
BoW_FN = set(tuple(x) for x in BoW_FN)
spaCy_FN = set(tuple(x) for x in spaCy_FN)
bert_FN = set(tuple(x) for x in bert_FN)

print("BoW FNs ", len(BoW_FN))
print("spaCy FNs ", len(spaCy_FN))
print("bert FNs ", len(bert_FN))

BoW FNs  7
spaCy FNs  7
bert FNs  7


In [18]:
all_three_encodings = BoW_FN & spaCy_FN & bert_FN
len(all_three_encodings)

5

In [19]:
BoW_and_spaCy = (BoW_FN & spaCy_FN) - bert_FN
len(BoW_and_spaCy)

1

In [20]:
BoW_and_bert = (BoW_FN & bert_FN) - spaCy_FN
len(BoW_and_bert)

1

In [21]:
spaCy_and_bert = (spaCy_FN & bert_FN) - BoW_FN
len(spaCy_and_bert)

1

In [22]:
only_BoW = BoW_FN - spaCy_FN - bert_FN
len(only_BoW)

0

In [23]:
only_spaCy = spaCy_FN - BoW_FN - bert_FN
len(only_spaCy)

0

In [24]:
only_bert = bert_FN - BoW_FN - spaCy_FN
len(only_bert)

0

In [25]:
f = open("Best_BoW_FP_ids.json")
BoW_FP = json.load(f)
f = open("Best_spaCy_FP_ids.json")
spaCy_FP = json.load(f)
f = open("Best_BERT_FP_ids.json")
bert_FP = json.load(f)

BoW_FP = set(tuple(x) for x in BoW_FP)
spaCy_FP = set(tuple(x) for x in spaCy_FP)
bert_FP = set(tuple(x) for x in bert_FP)

print("BoW FPs ", len(BoW_FP))
print("spaCy FPs ", len(spaCy_FP))
print("bert FPs ", len(bert_FP))

BoW FPs  11
spaCy FPs  11
bert FPs  12


In [27]:
all_three_encodings = BoW_FP & spaCy_FP & bert_FP
len(all_three_encodings)

7

In [28]:
BoW_and_spaCy = (BoW_FN & spaCy_FN) - bert_FN
len(BoW_and_spaCy)

1

In [29]:
BoW_and_bert = (BoW_FN & bert_FN) - spaCy_FN
len(BoW_and_bert)

1

In [30]:
spaCy_and_bert = (spaCy_FN & bert_FN) - BoW_FN
len(spaCy_and_bert)

1

In [31]:
only_BoW = BoW_FN - spaCy_FN - bert_FN
len(only_BoW)

0

In [32]:
only_spaCy = spaCy_FN - BoW_FN - bert_FN
len(only_spaCy)

0

In [33]:
only_bert = bert_FN - BoW_FN - spaCy_FN
len(only_bert)

0

In [36]:
# get all negative examples
all_negative_examples = []
for ap in ap_dict:
    for technique in technique_dict:
        if technique not in ap_dict[ap]["techniques"]:
            all_negative_examples.append((ap, technique))

all_negative_examples = list(set(all_negative_examples) - set(example_ids))