Import the libraries and modules:

In [2]:
import networkx as nx
import os
import csv
import re
import numpy as np
from BAG_Code_tw520.BayesianAttackGraphForCVSS import parse_dot
from Threat_Inteligence.CWE_tree.parse import parse_xml
from BAG_Code_tw520.BayesianAttackGraphForCVSS import change_prob
from BAG_Code_tw520.createANDtable import create_AND_table
from BAG_Code_tw520.createORtable import create_OR_table

from pgmpy.inference.ExactInference import BeliefPropagation
from pgmpy.models import BayesianNetwork
from pgmpy.factors.discrete import TabularCPD


To prepare the simulation, run the following cell to define the helpers that load the graph into memory:

In [3]:
def parse_dot(dot_string, epss_path='C:/Users/docuser/Documents/ImperialWork/Threat_Inteligence/epss_scores-2024-04-25.csv'):
    """Build a Bayesian attack graph from the text of a DOT file.

    Node lines are expected to look like
    ``<id> [label="..." shape="..." CVE="..."];`` and edge lines like
    ``<src> -> <dst> [ color="..." ];`` (both with leading whitespace).
    A node whose shape is ``ellipse`` is treated as an AND node, any other
    shape as an OR node.

    For every node that has parents, one probability per incoming edge is
    collected:

    * if the node's CVE attribute is ``"null"``, the third ``:``-separated
      field of its label is used (``0`` is replaced by ``0.9999999``);
    * otherwise the ``epss`` column of the EPSS CSV for that CVE is used.

    Parameters
    ----------
    dot_string : str
        Full content of the DOT file.
    epss_path : str, optional
        CSV file with at least the columns ``cve`` and ``epss``. Defaults to
        the location that was previously hard-coded in this function.

    Returns
    -------
    tuple
        ``(model, edges, nodes)`` where ``model`` is the ``BayesianNetwork``
        with one ``TabularCPD`` per parsed node, ``edges`` is the list of
        ``(source, target)`` integer pairs and ``nodes`` maps each node id
        to ``{'label', 'type', 'CVE'}``.

    Raises
    ------
    KeyError
        If a node references a CVE that is absent from the EPSS file.
    """
    nodes = {}
    edges = []

    # Regular expression extracting id, label, shape and CVE of a node line
    node_pattern = re.compile(r'\s+(\d+)\s+\[\s*label="([^"]+)"\s+shape="([^"]+)"\s+CVE="([^"]+)"\s*\];')

    # Regular expression extracting source and target of an edge line
    edge_pattern = re.compile(r'\s+(\d+)\s*->\s*(\d+)\s+\[\s+color="[^"]+"\s*\];')

    # Go through the text line by line; a node match takes precedence
    for line in dot_string.split('\n'):
        node_match = node_pattern.match(line)
        if node_match:
            node_id = int(node_match.group(1))
            shape = node_match.group(3)
            nodes[node_id] = {
                'label': node_match.group(2),
                'type': 'AND' if shape == "ellipse" else 'OR',
                'CVE': node_match.group(4).strip("\'"),
            }
            continue
        edge_match = edge_pattern.match(line)
        if edge_match:
            edges.append((int(edge_match.group(1)), int(edge_match.group(2))))
    model = BayesianNetwork(edges)

    # Read the EPSS probability of each CVE
    epss_scores = {}
    with open(epss_path, mode='r', newline='') as csvfile:
        for row in csv.DictReader(csvfile):
            epss_scores[row['cve']] = row['epss']

    # Index the parents of every node once instead of rescanning the whole
    # edge list for each node; parents keep the order of the edge list.
    parents_of = {}
    for src, dst in edges:
        parents_of.setdefault(dst, []).append(src)

    for node_id, info in nodes.items():
        parents = parents_of.get(node_id, [])
        probs = []
        # One probability is appended per incoming edge
        for _ in parents:
            if info['CVE'] == "null":
                # Use the probability attached to the rule, unless it is 0
                rule_prob = float(info['label'].split(':')[2])
                if rule_prob != 0:
                    print(info['label'])
                    probs.append(rule_prob)
                else:
                    probs.append(0.9999999)
            else:
                try:
                    probs.append(float(epss_scores[info['CVE']]))
                except KeyError:
                    raise KeyError(
                        f"No EPSS score for {info['CVE']} in {epss_path}"
                    ) from None

        if info['type'] == 'OR':
            cpt = create_OR_table(probs)
        else:
            cpt = create_AND_table(probs)

        if parents:
            # Every parent is binary; cardinalities are passed as integers
            cpd = TabularCPD(node_id, 2, cpt.T, parents, evidence_card=[2] * len(parents))
        else:
            cpd = TabularCPD(node_id, 2, cpt.T)
        # Insert the conditional probability table into the Bayesian Network object
        model.add_cpds(cpd)
    return model, edges, nodes


def change_prob(BAG, edges, nodes, cwe_dict, Gcwe, prop0, factor, evidence_loc):
    """Update, in place, the CPDs of CVE nodes that are not ancestors of the evidence.

    The ancestors (in ``BAG``) of every node in ``evidence_loc`` form the
    source set; every other node of ``nodes`` is a destination. For each
    destination that carries a CVE with a usable CWE, and for each of its
    incoming edges, the current probability read from
    ``BAG.get_cpds(dst).values[1][1]`` is combined (``a + b - a*b``) with
    ``factor**(dist+1) * P(src = 1 | evidence)`` for every source node with
    a usable CWE, where ``dist`` is the shortest-path length between the two
    CWE ids in ``Gcwe``. The destination CPD is then replaced by a new one
    built with ``create_OR_table``.

    Parameters
    ----------
    BAG : BayesianNetwork
        Network whose CPDs are modified in place.
    edges : list of (source, target)
        Edge list of the attack graph.
    nodes : dict
        Node id -> ``{'label', 'type', 'CVE'}``.
    cwe_dict : dict
        CVE id -> CWE id string (e.g. ``'CWE-287'``); the values
        ``'NVD-CWE-Other'`` and ``'NVD-CWE-noinfo'`` are skipped.
    Gcwe : networkx graph
        Graph whose node names are the part of the CWE id after ``'CWE-'``.
    prop0 : inference engine
        Object exposing ``query(variables, evidence=...)``, used for the
        posterior of the source nodes.
    factor : float
        Base raised to ``dist + 1`` for each source/destination pair.
    evidence_loc : dict
        Evidence passed to ``prop0.query``; its keys are the nodes whose
        ancestors form the source set.

    Returns
    -------
    None
    """
    unusable_cwes = ('NVD-CWE-Other', 'NVD-CWE-noinfo')

    # Pass an explicit accumulator: relying on the default argument of
    # get_all_ancestors would share one set between unrelated calls.
    src_node = set()
    for c_node in evidence_loc:
        get_all_ancestors(BAG, c_node, src_node)
    dst_node = set(nodes) - src_node

    # P(src = 1 | evidence) depends neither on the destination nor on the
    # edge, so each posterior is computed at most once.
    posterior_cache = {}

    for dst_k in dst_node:
        dst_cve = nodes[dst_k]['CVE']
        if dst_cve == "null":
            continue
        print(dst_cve)
        dst_cwe = cwe_dict[dst_cve]
        if dst_cwe in unusable_cwes:
            continue
        for e in edges:
            if e[1] != dst_k:
                continue
            # NOTE(review): the CPD is rebuilt with this single parent for
            # every incoming edge, so for a node with several parents the
            # last edge wins and values[1][1] is read from the CPD written
            # on the previous iteration -- confirm this is intended.
            source = [e[0]]
            new_prob = BAG.get_cpds(dst_k).values[1][1]
            for src_k in src_node:
                src_cve = nodes[src_k]['CVE']
                if src_cve == "null":
                    continue
                src_cwe = cwe_dict[src_cve]
                if src_cwe in unusable_cwes:
                    continue
                dist = nx.shortest_path_length(Gcwe, source=src_cwe.split('-')[1], target=dst_cwe.split('-')[1])
                print("CVE source : " + nodes[src_k]['label'])
                if src_k not in posterior_cache:
                    posterior_cache[src_k] = prop0.query([src_k], evidence=evidence_loc).values[1]
                tmp_prob = factor**(dist+1) * posterior_cache[src_k]
                # Probabilistic OR of the current value and the new contribution
                new_prob = tmp_prob + new_prob - new_prob*tmp_prob
            props = create_OR_table([new_prob])
            BAG.remove_cpds(dst_k)
            BAG.add_cpds(TabularCPD(dst_k, 2, props.T, source, evidence_card=[2]))

def get_all_ancestors(BAG, node, ancestors=None):
    """Collect every ancestor of ``node`` in ``BAG``.

    Parameters
    ----------
    BAG : object
        Graph exposing ``get_parents(node)`` returning an iterable of nodes.
    node : hashable
        Node whose ancestors are wanted.
    ancestors : set, optional
        Accumulator updated in place and returned. A fresh set is created
        when omitted, so separate calls no longer share their results.

    Returns
    -------
    set
        ``ancestors`` extended with the parents, grand-parents, ... of
        ``node`` (``node`` itself is only included if it is its own ancestor).
    """
    if ancestors is None:
        ancestors = set()
    # Nodes already expanded during THIS call; kept separate from
    # `ancestors` so a pre-filled accumulator never hides part of the graph.
    expanded = set()
    pending = [node]
    while pending:
        current = pending.pop()
        if current in expanded:
            continue
        expanded.add(current)
        parents = BAG.get_parents(current)
        ancestors.update(parents)
        pending.extend(p for p in parents if p not in expanded)
    return ancestors


In [4]:
# Name of the simulation
simulation = "OCD_kcwe"

# Path to the folder containing the tree
path = "C:/Users/docuser/Documents/ImperialWork/Personnal_simulations/output_" + simulation + "/strongly_connected_components/"
file_name = "ag-nocycles.dot"
path_to_dot = path + file_name
# Same base name as the DOT file, with the ".dot" suffix replaced
output_file = path + file_name[:-4] + "_inference.txt"

# Read the whole DOT file and build the network, attaching the probabilities
# at the same time; the context manager closes the file once it is read.
with open(path_to_dot, 'r') as dot_file:
    BAG, edges, nodes = parse_dot(dot_file.read())

# This is the reference BAG, before the attacker has compromised any node
BAG_ref = BAG.copy()
prop0 = BeliefPropagation(BAG_ref)

# Map the integer prefix of each label (text before the first ':') to the node id
inverted_nodes = {int(v['label'].split(':')[0]): k for k, v in nodes.items()}

2:RULE 10 (permissions move):0.8
2:RULE 10 (permissions move):0.8
4:RULE 6 (valid credentials may be found):0.8
22:RULE 3 (host may be vulnerable):0.2
33:RULE 5 (low access is possible):0.8


In [5]:
# Display the conditional probability table attached to node 3 of the graph
print(BAG.get_cpds(3))

+------+------+-----------------------+
| 4    | 4(0) | 4(1)                  |
+------+------+-----------------------+
| 3(0) | 1.0  | 9.999999994736442e-08 |
+------+------+-----------------------+
| 3(1) | 0.0  | 0.9999999             |
+------+------+-----------------------+


In [6]:
# Build the graph linking the CWEs to each other
Gcwe = nx.Graph()
parse_xml(os.getcwd()+'/Threat_Inteligence/CWE_tree/1000.xml', Gcwe)

# Build the CVE -> CWE mapping; the context manager closes the CSV file
# as soon as the dictionary has been built.
with open(os.getcwd()+'/Threat_Inteligence/CVE_to_CWE.csv', 'r', newline='') as cwe_csv:
    cwe_file = csv.DictReader(cwe_csv)
    cwe_dict = {row['Vulnerability']: row['weakness'] for row in cwe_file}

## Case where the attacker becomes dcsync and we want to recalculate for domain admin
# NOTE(review): these two lists are not read by any cell visible here -- confirm they are still needed
src_node = [7, 10]
dst_node = [ 14, 16, 12, 2]

In [7]:
# Evidence: the nodes already reached by the attacker, given by label number
# and translated to node ids; each one is observed in state 1
evidence_loc = {inverted_nodes[e]: 1 for e in [5, 18]} # Root and admin(host)
# Base of the attenuation applied per step of distance in the CWE graph
# (change_prob raises it to the power dist + 1)
factor = 0.9
# Always start again from the original probabilities of the CVEs
BAG = BAG_ref.copy()

# Modify the conditional probabilities according to the attacks that have been done
change_prob(BAG, edges, nodes, cwe_dict, Gcwe, prop0, factor, evidence_loc)

CVE-2020-1472
CVE source : 19:RULE 4 (classic quick compromission attack is possible against host or domain):0.2
222222222222222222222222222222
CVE source : 23:RULE 4 (classic quick compromission attack is possible against host or domain):0.2
222222222222222222222222222222
CVE source : 25:RULE 4 (classic quick compromission attack is possible against host or domain):0.2


  phi.values = phi.values / phi1.values


222222222222222222222222222222
CVE source : 27:RULE 8 (admin access is possible):0.2
222222222222222222222222222222
CVE source : 30:RULE 9 (privilege escalation):0.2
222222222222222222222222222222
CVE-2022-26925
CVE source : 19:RULE 4 (classic quick compromission attack is possible against host or domain):0.2
222222222222222222222222222222
CVE source : 23:RULE 4 (classic quick compromission attack is possible against host or domain):0.2
222222222222222222222222222222
CVE source : 25:RULE 4 (classic quick compromission attack is possible against host or domain):0.2
222222222222222222222222222222
CVE source : 27:RULE 8 (admin access is possible):0.2
222222222222222222222222222222
CVE source : 30:RULE 9 (privilege escalation):0.2
222222222222222222222222222222
CVE-2021-42278
CVE source : 19:RULE 4 (classic quick compromission attack is possible against host or domain):0.2
222222222222222222222222222222
CVE source : 23:RULE 4 (classic quick compromission attack is possible against host or 

In [8]:
# Candidate nodes: ancestors of node 1 that are not ancestors of the evidence.
# NOTE(review): node id 1 is used directly here while other cells go through
# inverted_nodes -- confirm both refer to the same node.
honey_pot = set()
get_all_ancestors(BAG_ref, 1, honey_pot)
src_node = set()
for c_node in evidence_loc:
    get_all_ancestors(BAG_ref, c_node, src_node)
honey_pot = honey_pot - src_node

# For every candidate carrying a CVE: (relative change of the last CPD entry
# between the reference and the updated network, CVE id, label)
honney_pot = []
for h in honey_pot:
    if nodes[h]['CVE'] == "null":
        continue
    ref_prob = BAG_ref.get_cpds(h).values[-1][-1]
    new_prob = BAG.get_cpds(h).values[-1][-1]
    honney_pot.append(((new_prob - ref_prob) / ref_prob, nodes[h]['CVE'], nodes[h]['label']))

# min() raises ValueError on an empty list, so report that case explicitly
if honney_pot:
    print(min(honney_pot, key=lambda x: x[0])[1:3])
else:
    print("No candidate node with a CVE was found")

('CVE-2022-26925', '10:RULE 0 (mitm attack is possible):0.2')


In [9]:
# Inference engine (pgmpy belief propagation) over the updated network
prop = BeliefPropagation(BAG)

# Targets ("cibles"), given by label number; for each one print, in order:
#   1. the marginal in the reference network without evidence,
#   2. the marginal in the reference network given the evidence,
#   3. the marginal in the updated network given the evidence.
cibles = [1]
for cible in cibles:
    print(prop0.query([inverted_nodes[cible]]))
    print(prop0.query([inverted_nodes[cible]], evidence=evidence_loc))
    print(prop.query([inverted_nodes[cible]], evidence=evidence_loc))

+------+----------+
| 1    |   phi(1) |
| 1(0) |   0.2185 |
+------+----------+
| 1(1) |   0.7815 |
+------+----------+
+------+----------+
| 1    |   phi(1) |
| 1(0) |   0.0708 |
+------+----------+
| 1(1) |   0.9292 |
+------+----------+
+------+----------+
| 1    |   phi(1) |
| 1(0) |   0.0488 |
+------+----------+
| 1(1) |   0.9512 |
+------+----------+


In [10]:
# distance = nx.shortest_path_length(Gcwe, source='119', target='255')
# print("Distance between nodes '119' and '255':", distance)

# print(BAG.get_cpds(inverted_nodes[7]))
# table = create_OR_table([0.1])
# source = []
# for edge in edges:
#     if edge[1] == inverted_nodes[7]:
#         source.append(edge[0])
# npa = len(source)
# BAG.add_cpds(TabularCPD(inverted_nodes[7], 2, table.T, source, evidence_card=2*np.ones(npa)))
# print(BAG.get_cpds(inverted_nodes[7]))


# output = open(output_file, 'w')
# for node in nodes.items():
#     output.write(f'{nodes[node[0]]["label"]} : {prop.query([node[0]])}\n')

In [51]:
from pgmpy.factors.discrete import TabularCPD
from pgmpy.models import BayesianNetwork
from pgmpy.inference import BeliefPropagation
# Toy network used to check the AND/OR tables: A, Imp and PE are the parents
# of B; A, Imp and MITM are the parents of C; B and C are the parents of D.
bayesian_model = BayesianNetwork([('A', 'B'), ('A', 'C'), ('B', 'D'), ('C', 'D'), ('Imp', 'B'), ('Imp', 'C'), ('PE', 'B'), ('MITM', 'C')])

# NOTE: despite its name, cpt_a is the prior used for 'Imp' below, not for 'A'
cpt_a =[[0.1], [0.9]]
# Root nodes A, PE and MITM are fixed to state 1 (probability 0 / 1)
cpd_a = TabularCPD('A', 2, [[0], [1]])
cpd_imp = TabularCPD('Imp', 2, cpt_a)
cpd_pe = TabularCPD('PE', 2, [[0], [1]])
cpd_mitm = TabularCPD('MITM', 2, [[0], [1]])
# B and C use AND tables built from the same probability list, one entry per parent
cpt_b = create_AND_table([0.5, 1, 1])
cpd_b = TabularCPD('B', 2, cpt_b.T, ['A', 'PE', 'Imp'], [2, 2, 2])
cpt_c = create_AND_table([0.5, 1, 1])
cpd_c = TabularCPD('C', 2, cpt_c.T, ['A', 'MITM', 'Imp'], [2, 2, 2])
# D uses an OR table over its two parents
cpt_d = create_OR_table([1, 1])
cpd_d = TabularCPD('D', 2, cpt_d.T, ['B', 'C'], [2, 2])

bayesian_model.add_cpds(cpd_a, cpd_b, cpd_c, cpd_d, cpd_imp, cpd_pe, cpd_mitm)
belief_propagation = BeliefPropagation(bayesian_model)
# Show the prior of 'Imp', then the marginal of 'D' without evidence
print(bayesian_model.get_cpds('Imp'))
print(belief_propagation.query(variables=['D'], show_progress=True))
# print(belief_propagation.query(variables=['B'], evidence={'Imp' : 1}, show_progress=False))
# bayesian_model = BayesianNetwork([('A', 'B'), ('B', 'C'), ('C', 'D')])
# cpt_a = create_OR_table([])
# cpd_a = TabularCPD('A', 2, cpt_a.T)
# cpt_b = create_AND_table([0.1])
# cpd_b = TabularCPD('B', 2, cpt_b.T, ['A'], [2])
# cpt_c = create_AND_table([0.3])
# cpd_c = TabularCPD('C', 2, cpt_c.T, ['B'], [2])
# cpt_d = create_OR_table([0.2])
# cpd_d = TabularCPD('D', 2, cpt_d.T, ['C'], [2])

# bayesian_model.add_cpds(cpd_a, cpd_b, cpd_c, cpd_d)
# belief_propagation = BeliefPropagation(bayesian_model)
# print(bayesian_model.get_cpds('D'))
# print(belief_propagation.query(variables=['B'], evidence={'D': 1}, show_progress=False))


+--------+-----+
| Imp(0) | 0.1 |
+--------+-----+
| Imp(1) | 0.9 |
+--------+-----+
+------+----------+
| D    |   phi(D) |
| D(0) |   0.3250 |
+------+----------+
| D(1) |   0.6750 |
+------+----------+


In [None]:
# Convert the attack graph to a Graphviz object for display.
# NOTE: this call needs the optional pygraphviz package and raises
# ImportError when it is not installed.
BAG.to_graphviz()

ImportError: requires pygraphviz http://pygraphviz.github.io/