# Installation of Python libraries

In [2]:
!pip install -r requirements.txt


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.2[0m[39;49m -> [0m[32;49m24.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


## Import of libraries and definition of functions

In [3]:
from stix2 import FileSystemStore, FileSystemSource
import os
import pandas as pd
import numpy as np
from neo4j import GraphDatabase, Result
from yfiles_jupyter_graphs import GraphWidget
from html2text import html2text as h2t
import re
from functools import reduce
from dotenv import load_dotenv

# neo4j setup
# load_dotenv() runs first so that values defined in a .env file are visible to os.getenv below.
load_dotenv()
URI_NEO4J = os.getenv("URI_NEO4J")
USER_NEO4J = os.getenv("USER_NEO4J")
PASS_NEO4J = os.getenv("PASS_NEO4J")

# Directory holding the CAPEC STIX objects; attack patterns are read from its 'attack-pattern' sub-directory.
stix_path = './capec_stix'
attack_pattern_path = f'{stix_path}/attack-pattern'
fs = FileSystemStore(stix_dir=stix_path, bundlify=False)
fs_source = FileSystemSource(stix_dir=stix_path)

# File whose content is sent to Neo4j as a query by load_macm().
macm_file = "Wordpress.macm"

# pandas setup
# Display frames without truncating columns, cell contents or rows.
pd.set_option('display.max_columns', None)
pd.set_option('max_colwidth', None)
pd.set_option('display.max_rows', None)
# Silence the chained-assignment warning for the whole notebook.
pd.options.mode.chained_assignment = None  # default='warn'

### Definition of helper functions for converting and styling DataFrame content

In [4]:
def string_to_list(string: str, sepator=r'[ ,]+'):
    """Split ``string`` into a list using the regular expression ``sepator``.

    Returns ``None`` when ``string`` is ``None``. With the default pattern,
    any run of spaces and/or commas acts as a delimiter.
    """
    return None if string is None else re.split(sepator, string)

def string_to_int_list(string: str, sepator=r'[ ,]+'):
    """Split ``string`` on ``sepator`` and convert every piece to ``int``.

    ``None``, the empty string and the literal text ``'None'`` all yield ``None``.
    """
    if string in [None, '', 'None']:
        return None
    return list(map(int, re.split(sepator, string)))
    
def sub_string(string):
    """Turn an (optionally HTML) value into plain text.

    ``None`` is passed through. Otherwise the value is stringified, run through
    ``html2text``, stripped of ``*`` and ``#`` characters, single line breaks
    between two non-blank characters are replaced by a space, and a space
    directly after a line break is dropped.
    """
    if string is None:
        return None
    subs = {'*': '', '#': ''}
    text = h2t(str(string))   # convert html in certain columns to text
    for marker, replacement in subs.items():
        text = text.replace(marker, replacement)
    text = re.sub(r'(\S)\n(\S)', r'\1 \2', text)
    return text.replace('\n ', '\n')

def list_to_string(list: list, sepator='\n\n'):
    """Join the strings of ``list`` with ``sepator``; ``None`` is passed through."""
    return sepator.join(list) if list is not None else None

def dict_to_string(dict: dict):
    """Render a mapping as ``key: value`` entries separated by blank lines.

    ``None`` is passed through.
    """
    if dict is None:
        return None
    entries = []
    for key, value in dict.items():
        entries.append(f"{key}: {value}")
    return '\n\n'.join(entries)
    
def external_references_to_string(list: list):
    """Render a list of reference mappings as text.

    Each reference becomes one ``key: value`` line per key; references are
    separated by an empty line. ``None`` is passed through.
    """
    if list is None:
        return None
    rendered = ''.join(
        ''.join(f"{field}: {entry[field]}\n" for field in entry) + '\n'
        for entry in list
    )
    # drop the two characters that close the last reference
    return rendered[:-2]

def convert_column_to_text(df: pd.DataFrame):
    """Convert the structured CAPEC columns of ``df`` to text, in place.

    The converters run in a fixed order: list columns are joined first, then the
    HTML/markdown clean-up is applied (some columns go through both steps), then
    dictionaries and finally the external references. Returns the same ``df``.
    """
    conversions = (
        (list_to_string, ['x_capec_can_follow_refs', 'x_capec_domains', 'object_marking_refs', 'x_capec_prerequisites', 'x_capec_alternate_terms', 'x_capec_can_precede_refs', 'x_capec_resources_required', 'x_capec_example_instances']),
        (sub_string, ['description','x_capec_execution_flow', 'x_capec_extended_description', 'x_capec_example_instances', 'x_capec_resources_required']),
        (dict_to_string, ['x_capec_consequences', 'x_capec_skills_required']),
        (external_references_to_string, ['external_references']),
    )
    for converter, columns in conversions:
        for column in columns:
            df[column] = df[column].apply(converter)
    return df

def convert_column_to_text_4_panel(df: pd.DataFrame):
    """Convert only the dictionary and external-reference columns of ``df`` to text.

    The frame is modified in place and returned.
    """
    conversions = (
        (dict_to_string, ['x_capec_consequences', 'x_capec_skills_required']),
        (external_references_to_string, ['external_references']),
    )
    for converter, columns in conversions:
        for column in columns:
            df[column] = df[column].apply(converter)
    return df

def truncate_string_middle(s, n):
    """Shorten ``s`` to at most ``n`` characters by replacing its middle with ``...``.

    Args:
        s: the string to shorten.
        n: maximum length of the result (converted with ``int``).

    Returns:
        ``s`` unchanged when it already fits. Otherwise the head of ``s``, three
        dots and the tail of ``s``, ``n`` characters in total. When ``n`` is too
        small to hold a tail the result is just head plus dots, and for
        ``n <= 3`` it is the first ``n`` characters of ``s``.
    """
    n = int(n)
    if len(s) <= n:
        # string is already short-enough
        return s
    if n <= 3:
        # no room for anything besides (part of) the ellipsis: plain cut
        return s[:max(n, 0)]
    # roughly half of the size, minus the 3 .'s; never negative
    n_2 = max(n // 2 - 3, 0)
    # whatever's left
    n_1 = n - n_2 - 3
    # s[-0:] would be the whole string, so an empty tail is spelled out explicitly
    tail = s[-n_2:] if n_2 else ''
    return '{0}...{1}'.format(s[:n_1], tail)

def convert_ids_to_capec_ids(df: pd.DataFrame):
    """Add numeric CAPEC id columns to a frame indexed by STIX id.

    Three columns are written into ``df`` (which is also returned):

    * ``capec_id``: the number after the dash in the ``external_id`` of the
      first external reference, or ``None`` when that reference's
      ``source_name`` is not ``'capec'``.
    * ``capec_childs_id``: the CAPEC ids of the entries in
      ``x_capec_parent_of_refs`` (``None`` when the cell is ``None``).
    * ``capec_parents_id``: the same for ``x_capec_child_of_refs``.

    Raises:
        KeyError: if a referenced STIX id is not in the index of ``df``.
    """
    df['capec_id'] = df['external_references'].apply(
        lambda refs: int(refs[0]['external_id'].split('-')[1]) if refs[0]['source_name'] == 'capec' else None
    )
    # One lookup table instead of materialising a full row for every reference.
    capec_id_by_stix_id = df['capec_id'].to_dict()

    def to_capec_ids(stix_ids):
        # Only a missing cell maps to None; an empty list stays an empty list.
        if stix_ids is None:
            return None
        return [int(capec_id_by_stix_id[stix_id]) for stix_id in stix_ids]

    df['capec_childs_id'] = df['x_capec_parent_of_refs'].apply(to_capec_ids)
    df['capec_parents_id'] = df['x_capec_child_of_refs'].apply(to_capec_ids)
    return df

def highlight_attack_patterns(s):
    """Row styler: colour a row according to its ``x_capec_abstraction`` value.

    Args:
        s: one row (a ``pandas.Series``), as passed by ``Styler.apply(axis=1)``.

    Returns:
        A list with one CSS string per cell of the row. Rows whose abstraction
        is not 'Meta', 'Standard' or 'Detailed', or rows that do not carry the
        ``x_capec_abstraction`` field at all, get empty styles.
    """
    colours = {'Meta': '#5CC0FF', 'Standard': '#85D0FF', 'Detailed': '#ADE0FF'}
    abstraction = s.get('x_capec_abstraction')
    colour = colours.get(abstraction) if isinstance(abstraction, str) else None
    css = f'background-color: {colour}' if colour else ''
    # The styler needs exactly one entry per cell, including for unstyled rows.
    return [css] * len(s)

def style_df(df_styler):
    """Apply the table CSS and the per-row abstraction colouring to a Styler.

    The Styler is configured in place and returned.
    """
    border = '1px solid black !important'
    header_rule = {'selector': 'th', 'props': [('text-align', 'left'), ('border', border)]}
    body_rule = {'selector': 'td tbody', 'props': [('border', border)]}
    cell_rule = {'selector': 'td', 'props': [('text-align', 'left'), ('border', border), ('max-width', '400px !important'), ('word-wrap', 'break-word'), ('vertical-align', 'top'), ('white-space', 'pre-line')]}
    # Both Styler methods return the Styler itself, so the calls can be chained.
    return df_styler.set_table_styles([header_rule, body_rule, cell_rule]).apply(highlight_attack_patterns, axis=1)
    
def capec_abstraction_sort(df: pd.DataFrame):
    """Return a styled view of ``df`` ordered by abstraction level, then by CAPEC id.

    The abstraction column is turned into an ordered categorical
    (Meta < Standard < Detailed) on a copy, so the caller's frame is left
    untouched.

    Args:
        df: frame with an ``x_capec_abstraction`` column; ``capec_id`` must be
            resolvable by ``sort_values`` (as a column or an index level name).

    Returns:
        A ``Styler`` produced by piping the sorted frame through ``style_df``.
    """
    sorter = ['Meta', 'Standard', 'Detailed']
    ordered = df.assign(
        x_capec_abstraction=pd.Categorical(df['x_capec_abstraction'], categories=sorter, ordered=True)
    )
    ordered = ordered.sort_values(['x_capec_abstraction', 'capec_id'], ascending=[True, True])
    return ordered.style.pipe(style_df)

In [5]:
def load_threat_catalog(filename):
    """Read the "Threat Components" sheet of ``filename`` into a DataFrame.

    The frame is indexed by ``TID`` and every column is cast to ``str``. The
    three Capec* columns are then split into lists with ``string_to_list`` and
    dots in ``Asset`` are replaced by underscores.
    """
    catalog = (
        pd.read_excel(filename, sheet_name="Threat Components", header=0)
        .replace(np.nan, None)  # replace NaN with None
        .set_index('TID')
        .astype('str')
    )
    for capec_column in ('CapecMeta', 'CapecStandard', 'CapecDetailed'):
        catalog[capec_column] = catalog[capec_column].apply(string_to_list)
    catalog['Asset'] = catalog['Asset'].apply(lambda asset: asset.replace('.', '_'))
    return catalog

def load_attack_patterns():
    """Load every attack pattern found under ``attack_pattern_path`` into a DataFrame.

    Each ``*.json`` file name (without suffix) is fetched from the ``fs`` store.
    The frame is first indexed by STIX id so that parent/child references can
    be translated to CAPEC ids, then re-indexed by ``capec_id`` using a
    categorical index whose categories are the ids in ascending numeric order.
    The raw parent/child reference columns are dropped.
    """
    object_ids = (entry.removesuffix(".json") for entry in os.listdir(attack_pattern_path))
    records = [fs.get(object_id) for object_id in object_ids]
    patterns = pd.DataFrame(records).set_index('id')
    patterns = patterns.replace(np.nan, None)  # replace NaN with None
    patterns = convert_ids_to_capec_ids(patterns)
    patterns = patterns.set_index('capec_id')
    patterns = patterns.drop(['x_capec_parent_of_refs', 'x_capec_child_of_refs'], axis=1)
    ordered_ids = sorted(patterns.index.to_list(), key=int)
    patterns.index = pd.CategoricalIndex(patterns.index, ordered_ids)
    return patterns

def dataframe_to_str(df: pd.DataFrame):
    """Return an all-string copy of ``df``.

    The input is copied first, so the text conversion done by
    ``convert_column_to_text`` does not touch the caller's frame; every column
    of the result is then cast to ``str``.
    """
    as_text = convert_column_to_text(df.copy())
    return as_text.astype(str).copy()

def get_child_attack_patterns_by_id(parent_id, attack_pattern_df: pd.DataFrame):
    """Return the direct children of ``parent_id``.

    Returns:
        The content of the ``capec_childs_id`` cell of the row labelled
        ``parent_id`` (an empty list when the cell is empty or ``None``), or
        ``None`` when the row cannot be looked up. A list result is a copy, so
        callers may modify it without altering the DataFrame.
    """
    try:
        children = attack_pattern_df.loc[parent_id].get('capec_childs_id') or []
    except (KeyError, TypeError, ValueError):
        # unknown / unusable label, or an ambiguous (duplicated) row
        return None
    return list(children) if isinstance(children, list) else children

def get_child_attack_patterns_recursive(parent_id, attack_pattern_df: pd.DataFrame) -> list:
    """Return all descendants of ``parent_id`` (children, grandchildren, ...).

    Every descendant is listed once, in the order it is first reached. The walk
    is iterative and remembers visited ids, so shared sub-trees are not repeated
    and a cyclic reference cannot loop forever. The DataFrame is not modified.
    An unknown ``parent_id`` yields an empty list.
    """
    descendants = []
    seen = set()
    pending = [parent_id]
    while pending:
        current = pending.pop()
        for child in get_child_attack_patterns_by_id(current, attack_pattern_df) or []:
            if child not in seen:
                seen.add(child)
                descendants.append(child)
                pending.append(child)
    return descendants

def get_child_attack_patterns(parent_ids, attack_pattern_df: pd.DataFrame, show_tree=False, show_columns=['name', 'capec_parents_id', 'capec_childs_id', 'x_capec_abstraction', 'description', 'x_capec_extended_description']):
    """Select the rows for ``parent_ids`` and, optionally, all their descendants.

    Args:
        parent_ids: a single id or a list of ids (index labels of the frame).
        attack_pattern_df: frame to select from.
        show_tree: when true, the descendants of every parent are included.
        show_columns: columns of the result (the default list is never modified).

    Returns:
        The selected rows, each id once, in first-seen order; ``None`` when an
        id or a column cannot be found in the frame.
    """
    if not isinstance(parent_ids, list):
        parent_ids = [parent_ids]
    wanted = list(parent_ids)
    if show_tree:
        for parent_id in parent_ids:
            wanted.extend(get_child_attack_patterns_recursive(parent_id, attack_pattern_df))
    # de-duplicate while keeping a deterministic order
    wanted = list(dict.fromkeys(wanted))
    try:
        return attack_pattern_df.loc[wanted][show_columns]
    except (KeyError, TypeError):
        # unknown id / column, or a label that cannot be used for the lookup
        return None

def query_attack_patterns(attack_pattern_df: pd.DataFrame, keywords, search_columns:list=['description'], ap_type:list=['Meta', 'Standard', 'Detailed'], show_columns=['name', 'capec_parents_id', 'capec_childs_id', 'x_capec_abstraction', 'description', 'x_capec_extended_description'], query_type='or'):
    """Search text columns of ``attack_pattern_df`` for keywords.

    Args:
        attack_pattern_df: frame whose ``search_columns`` hold strings.
        keywords: iterable of patterns; each one is handed to
            ``Series.str.contains`` as a case-insensitive regular expression.
        search_columns: columns to search. A row matches when at least one of
            these columns satisfies the keyword condition.
        ap_type: abstraction levels (``x_capec_abstraction`` values) to keep.
        show_columns: columns of the result.
        query_type: ``'or'`` - a column must contain any keyword;
            ``'and'`` - a column must contain every keyword (anywhere in the
            text, also on different lines).

    Returns:
        The matching rows sorted by ``x_capec_abstraction``, restricted to
        ``show_columns``. An empty ``keywords`` matches every row.

    Raises:
        Exception: if ``query_type`` is neither ``'or'`` nor ``'and'``.
    """
    if query_type == 'or':
        combine = lambda x, y: x | y
    elif query_type == 'and':
        combine = lambda x, y: x & y
    else:
        raise Exception('query_type must be "or" or "and"')

    keywords = list(keywords)
    every_row = pd.Series(True, index=attack_pattern_df.index)
    no_row = pd.Series(False, index=attack_pattern_df.index)

    def column_hits(column):
        # Each keyword is tested on its own, so 'and' does not depend on the
        # keywords sharing a line (a single look-ahead regex cannot cross '\n').
        if not keywords:
            return every_row
        text = attack_pattern_df[column]
        return reduce(combine, [text.str.contains(keyword, case=False, na=False) for keyword in keywords])

    keyword_mask = reduce(lambda x, y: x | y, [column_hits(column) for column in search_columns], no_row)
    type_mask = attack_pattern_df['x_capec_abstraction'].isin(list(ap_type))
    # Sort before projecting so the sort key need not be one of the shown columns.
    matches = attack_pattern_df[keyword_mask & type_mask].sort_values(by=['x_capec_abstraction'])
    return matches[list(show_columns)]

def read_macm(driver):
    """Fetch all nodes of the 'macm' database as a DataFrame.

    The five returned node properties are exposed under the column names
    'Component ID', 'Application', 'Name', 'Type' and 'App ID'.
    """
    query = "MATCH (asset) RETURN asset.component_id, asset.application, asset.name, asset.type, asset.app_id"
    assets = driver.execute_query(query, database_='macm', result_transformer_=Result.to_df)
    assets.columns = ['Component ID', 'Application', 'Name', 'Type', 'App ID']
    return assets

### Definition of functions for loading data into Neo4j

In [6]:
def clear_database(driver, database):
    """Delete every node of ``database`` together with its relationships."""
    wipe_query = "MATCH (n) DETACH DELETE n"
    driver.execute_query(wipe_query, database_=database)

def create_capec_db(driver, attack_pattern_df: pd.DataFrame, database="capec", show_parent_relationship=True):
    """Write the attack patterns into Neo4j, one node per row.

    Args:
        driver: Neo4j driver used to run the queries.
        attack_pattern_df: attack patterns indexed by CAPEC id.
        database: name of the target database.
        show_parent_relationship: when true, a ``parent_of`` relationship is
            merged from every 'Meta' and 'Standard' pattern to each of its
            children that is present in ``attack_pattern_df``.

    Each node is labelled with the row's ``x_capec_abstraction`` and carries the
    stringified columns as properties (``Capec_Id`` is the index value).
    """
    node_properties = '''{
                        Capec_Id:$id,
                        Name:$name,
                        Created: $created,
                        Created_By_Ref:$created_by_ref,
                        Description:$description,
                        External_References:$external_references,
                        Modified:$modified,
                        Object_Marking_Refs:$object_marking_refs,
                        Revoked:$revoked,
                        Spec_Version:$spec_version,
                        Type:$type,
                        Abstraction:$x_capec_abstraction,
                        Alternate_Terms:$x_capec_alternate_terms,
                        Can_Follow_Refs:$x_capec_can_follow_refs,
                        Can_Precede_Refs:$x_capec_can_precede_refs,
                        Child_Of_Refs:$capec_parents_id,
                        Consequences:$x_capec_consequences,
                        Domains:$x_capec_domains,
                        Example_Instances:$x_capec_example_instances,
                        Execution_Flow:$x_capec_execution_flow,
                        Extended_Description:$x_capec_extended_description,
                        Likelihood_Of_Attack:$x_capec_likelihood_of_attack,
                        Parent_Of_Refs:$capec_childs_id,
                        Peer_Of_Refs:$x_capec_peer_of_refs,
                        Prerequisites:$x_capec_prerequisites,
                        Resources_Required:$x_capec_resources_required,
                        Skills_Required:$x_capec_skills_required,
                        Status:$x_capec_status,
                        Typical_Severity:$x_capec_typical_severity,
                        Version:$x_capec_version
                    }'''
    attack_pattern_df_str = dataframe_to_str(attack_pattern_df)
    for capec_id, row in attack_pattern_df_str.iterrows():
        # A label cannot be supplied as a query parameter, so it is spliced into
        # the text; back-tick quoting keeps it a single identifier.
        label = '`' + row['x_capec_abstraction'].replace('`', '``') + '`'
        driver.execute_query(
            'MERGE (a:' + label + ' ' + node_properties + ')',
            parameters_={'id': capec_id} | row.to_dict(),
            database_=database,
        )
    if show_parent_relationship:
        # Ids travel as parameters instead of being formatted into the query text.
        link_query = """
            MATCH (parent {Capec_Id: $parent_id}),
                    (child {Capec_Id: $child_id})
            MERGE (parent)-[:parent_of]->(child)
        """
        for parent_type in ["Meta", "Standard"]:
            for parent in attack_pattern_df.query(f"x_capec_abstraction == '{parent_type}'").index:
                # `or []` guards against a failed child lookup (None)
                for child in get_child_attack_patterns_by_id(parent, attack_pattern_df) or []:
                    if child in attack_pattern_df.index:
                        driver.execute_query(
                            link_query,
                            parameters_={'parent_id': int(parent), 'child_id': int(child)},
                            database_=database,
                        )

def create_threat_catalog_db(driver, threat_catalog_df: pd.DataFrame, database="threats"):
    """Write the threat catalog into Neo4j, one node per row.

    Every node is labelled with the row's ``Asset`` value; the row's columns
    (plus the index as ``TID``) are passed as query parameters.
    """
    query_head = '''
                MERGE (a:'''
    query_tail = ''' {
                        TID:$TID,
                        Asset:$Asset,
                        Threat:$Threat,
                        Description:$Description,
                        STRIDE:$STRIDE,
                        Compromised:$Compromised,
                        PreConfidentiality:$PreC,
                        PreIntegrity:$PreI,
                        PreAvailability:$PreA,
                        PreCondition:$Precondition,
                        PostConfidentiality:$PostC,
                        PostIntegrity:$PostI,
                        PostAvailability:$PostA,
                        PostCondition:$PostCondition,
                        CapecMeta:$CapecMeta,
                        CapecStandard:$CapecStandard,
                        CapecDetailed:$CapecDetailed
                    })
                '''
    for tid, threat in threat_catalog_df.iterrows():
        merge_query = query_head + threat['Asset'] + query_tail
        driver.execute_query(merge_query, parameters_={'TID': tid} | threat.to_dict(), database_=database)
        
def create_unified_db(driver, attack_pattern_df: pd.DataFrame, threat_catalog_df: pd.DataFrame, database="capecthreats"):
    """Load attack patterns and threats into one database and link them.

    After both node sets are created, every threat is connected to each CAPEC
    id listed in its ``CapecMeta``, ``CapecStandard`` and ``CapecDetailed``
    cells through a ``has_capec_<Abstraction>`` relationship (created with
    ``apoc.create.relationship``). ``'None'`` placeholders and empty tokens are
    skipped.
    """
    create_capec_db(driver, attack_pattern_df, database)
    create_threat_catalog_db(driver, threat_catalog_df, database)
    # The TID and the CAPEC id are sent as parameters rather than formatted
    # into the query text.
    link_query = """
        MATCH (threat {TID: $tid}),
                (capec {Capec_Id: $capec_id})
        CALL apoc.create.relationship(threat, "has_capec_" + capec.Abstraction, NULL, capec) YIELD rel
        RETURN rel
    """
    for tid, row in threat_catalog_df.iterrows():
        for capec_id in row['CapecMeta'] + row['CapecStandard'] + row['CapecDetailed']:
            if capec_id in ('None', ''):
                continue
            driver.execute_query(
                link_query,
                # the threat is matched by the text form of its TID; ids may be
                # written as '125' or '125.0' in the catalog
                parameters_={'tid': str(tid), 'capec_id': int(float(capec_id))},
                database_=database,
            )

def create_enhanched_macm_db(driver, attack_pattern_df: pd.DataFrame, threat_catalog_df: pd.DataFrame, macm_df:pd.DataFrame, macm_file, database="emacm"):
    """Load the MACM into ``database`` and attach the relevant attack patterns.

    For every MACM component, the threats whose ``Asset`` equals the component
    ``Type`` (dots replaced by underscores) are looked up; the CAPEC ids they
    list are created as nodes (without parent relationships) and linked to the
    component with a ``has_capec_<Abstraction>`` relationship.

    Raises:
        KeyError: if a listed CAPEC id is not in ``attack_pattern_df``.
    """
    load_macm(macm_file, driver, database)
    # Component id and CAPEC id are sent as parameters rather than formatted
    # into the query text.
    link_query = """
        MATCH (macm {component_id: $component_id}),
                (capec {Capec_Id: $capec_id})
        CALL apoc.create.relationship(macm, "has_capec_" + capec.Abstraction, NULL, capec) YIELD rel
        RETURN rel
    """
    for index, row in macm_df.iterrows():
        related_threat_catalog_df = threat_catalog_df[threat_catalog_df['Asset'] == row['Type'].replace('.', '_')]
        capec_cells = related_threat_catalog_df['CapecMeta'].to_list() + related_threat_catalog_df['CapecStandard'].to_list() + related_threat_catalog_df['CapecDetailed'].to_list()
        related_attack_pattern = list({int(id) for ids in capec_cells for id in ids if id != 'None'})
        related_attack_pattern_df = attack_pattern_df.loc[related_attack_pattern]
        create_capec_db(driver, related_attack_pattern_df, database, show_parent_relationship=False)
        for capec_id in related_attack_pattern:
            driver.execute_query(
                link_query,
                # the component is matched by the text form of its id
                parameters_={'component_id': str(row['Component ID']), 'capec_id': capec_id},
                database_=database,
            )

def load_macm(filename, driver, database='macm'):
    """Read the whole of ``filename`` and execute its content as one query on ``database``."""
    with open(filename, 'r') as macm_source:
        driver.execute_query(macm_source.read(), database_=database)

## Loading Attack Patterns and Threat Catalog into DataFrames

In [7]:
# Attack patterns come from the STIX store, the threat catalog from the Excel workbook.
attack_pattern_df = load_attack_patterns()
threat_catalog_df = load_threat_catalog("ThreatCatalogComplete.xlsx")

Convert the DataFrame columns to strings for compatibility with Neo4j

In [8]:
# All-string copy of the attack patterns; the keyword searches below run on this frame.
attack_pattern_df_str = dataframe_to_str(attack_pattern_df)

## Loading databases into Neo4j

Connecting to the Neo4j database

In [9]:
# Create the driver with the credentials read from the environment and check the connection right away.
driver = GraphDatabase.driver(URI_NEO4J, auth=(USER_NEO4J, PASS_NEO4J))
driver.verify_connectivity()

Creation of the Capec graph

In [10]:
# Empty the 'capec' database first so the cell can be re-run, then load nodes and parent_of relationships.
clear_database(driver, 'capec')
create_capec_db(driver, attack_pattern_df=attack_pattern_df)

Creation of the Threat Catalog graph

In [11]:
# Empty the 'threats' database first so the cell can be re-run, then load the threat catalog.
clear_database(driver, "threats")
create_threat_catalog_db(driver, threat_catalog_df)

Creation of the Capec-Threat Catalog graph

In [12]:
# Empty the 'capecthreats' database first, then load attack patterns, threats and the links between them.
clear_database(driver, "capecthreats")
create_unified_db(driver, threat_catalog_df=threat_catalog_df, attack_pattern_df=attack_pattern_df, database="capecthreats")

## Load the MACM data into Neo4j

Loading the MACM of the system under examination into Neo4j

In [13]:
# Empty the 'macm' database first, then run the MACM file against it (load_macm defaults to database 'macm').
clear_database(driver, "macm")
load_macm(macm_file, driver)

In [24]:
# Fetch every relationship path of the 'macm' database as a graph and show it in a yFiles widget.
with driver.session(database="macm") as session:
    query_out = session.run("MATCH a=((n)-[r1]->(p)) RETURN a").graph()
macm_graph = GraphWidget(graph=query_out)
macm_graph.directed = True
macm_graph.set_sidebar(enabled=True, start_with="Neighborhood")
macm_graph.show()

GraphWidget(layout=Layout(height='500px', width='100%'))

## Representation of the Capec graph with an external library

In [14]:
# Per-abstraction node style; "label" names the node property shown as the node label.
node_styles = {
    "Meta": {"color": "red", "label": "Name"},
    "Standard": {"color": "blue", "label": "Name", "shape": "box"},
    "Detailed": {"color": "green", "label": "Name", "shape": "hexagon"},
}

# Fetch every relationship path of the 'capec' database as a graph.
with driver.session(database="capec") as session:
    query_out = session.run("MATCH a=((n)-[r1]->(p)) RETURN a").graph()
graph = GraphWidget(graph=query_out)
graph.directed = True
graph.set_sidebar(enabled=True, start_with="Neighborhood")
# Style is chosen by the node's Abstraction property; unknown values get an empty style.
graph.set_node_styles_mapping(lambda index, node: node_styles.get(node["properties"]["Abstraction"], {}))
# The label is the property named by the style entry, shortened to 15 characters.
# NOTE(review): relies on the widget exposing the node label under properties["label"] - confirm.
graph.set_node_label_mapping(lambda index, node : truncate_string_middle(node["properties"][node_styles.get(node["properties"]["label"], {"label":"label"})["label"]], 15))
graph.show()

GraphWidget(layout=Layout(height='800px', width='100%'))

## Query in the Capec catalog

In [15]:
# 'Meta' patterns whose description or name contains any of the three keywords (query_type defaults to 'or').
query_attack_patterns(attack_pattern_df_str, search_columns=['description', 'name'], keywords=['communication', 'network', 'interaction'], ap_type=['Meta'])

Unnamed: 0_level_0,name,capec_parents_id,capec_childs_id,x_capec_abstraction,description,x_capec_extended_description
capec_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
125,Flooding,,"[482, 486, 487, 488, 489, 490, 528, 666]",Meta,"An adversary consumes the resources of a target by rapidly engaging in a large number of interactions with the target. This type of attack generally exposes a weakness in rate limiting or flow. When successful this attack prevents legitimate users from accessing the service and can cause the target to crash. This attack differs from resource depletion through leaks or allocations in that the latter attacks do not rely on the volume of requests made to the target but instead focus on manipulation of the target's operations. The key factor in a flooding attack is the number of requests the adversary can make in a given period of time. The greater this number, the more likely an attack is to succeed against a given target.\n\n",
594,Traffic Injection,,[595],Meta,"An adversary injects traffic into the target's network connection. The adversary is therefore able to degrade or disrupt the connection, and potentially modify the content. This is not a flooding attack, as the adversary is not focusing on exhausting resources. Instead, the adversary is crafting a specific input to affect the system in a particular way.\n\n",
272,Protocol Manipulation,,"[220, 276, 277, 278, 90]",Meta,"An adversary subverts a communications protocol to perform an attack. This type of attack can allow an adversary to impersonate others, discover sensitive information, control the outcome of a session, or perform other attacks. This type of attack targets invalid assumptions that may be inherent in implementers of the protocol, incorrect implementations of the protocol, or vulnerabilities in the protocol itself.\n\n",
607,Obstruction,,"[547, 582, 601, 603]",Meta,"An attacker obstructs the interactions between system components. By interrupting or disabling these interactions, an adversary can often force the system into a degraded state or cause the system to stop working as intended. This can cause the system components to be unavailable until the obstruction mitigated.\n\n",
586,Object Injection,,,Meta,"An adversary attempts to exploit an application by injecting additional, malicious content during its processing of serialized objects. Developers leverage serialization in order to convert data or state into a static, binary format for saving to disk or transferring over a network. These objects are then deserialized when needed to recover the data/state. By injecting a malformed object into a vulnerable application, an adversary can potentially compromise the application by manipulating the deserialization process. This can result in a number of unwanted outcomes, including remote code execution.\n\n",
123,Buffer Manipulation,,"[100, 540]",Meta,"An adversary manipulates an application's interaction with a buffer in an attempt to read or modify data they shouldn't have access to. Buffer attacks are distinguished in that it is the buffer space itself that is the target of the attack rather than any code responsible for interpreting the content of the buffer. In virtually all buffer attacks the content that is placed in the buffer is immaterial. Instead, most buffer attacks involve retrieving or providing more input than can be stored in the allocated buffer, resulting in the reading or overwriting of other unintended program memory.\n\n",
216,Communication Channel Manipulation,,"[12, 217]",Meta,"An adversary manipulates a setting or parameter on communications channel in order to compromise its security. This can result in information exposure, insertion/removal of information from the communications stream, and/or potentially system compromise.\n\n",
117,Interception,,"[157, 499, 651]",Meta,"An adversary monitors data streams to or from the target for information gathering purposes. This attack may be undertaken to solely gather sensitive information or to support a further attack against the target. This attack pattern can involve sniffing network traffic as well as other types of data streams (e.g. radio). The adversary can attempt to initiate the establishment of a data stream or passively observe the communications as they unfold. In all variants of this attack, the adversary is not the intended recipient of the data stream. In contrast to other means of gathering information (e.g., targeting data leaks), the adversary must actively position themself so as to observe explicit data channels (e.g. network traffic) and read the content. However, this attack differs from a Adversary-In-the-Middle (CAPEC-94) attack, as the adversary does not alter the content of the communications nor forward data to the intended recipient.\n\n",
548,Contaminate Resource,,,Meta,"An adversary contaminates organizational information systems (including devices and networks) by causing them to handle information of a classification/sensitivity for which they have not been authorized. When this happens, the contaminated information system, device, or network must be brought offline to investigate and mitigate the data spill, which denies availability of the system until the investigation is complete.\n\n",Contamination through email is a very common attack vector. Systems with email servers or personal work systems using email are susceptible to this attack simply by receiving an email that contains a classified document or information. A fake classified document could even be used that is mistaken as true classified material. This would still cause the system to be taken offline until the validity of the classified material is confirmed.\n\n
192,Protocol Analysis,,[97],Meta,"An adversary engages in activities to decipher and/or decode protocol information for a network or application communication protocol used for transmitting information between interconnected nodes or systems on a packet- switched data network. While this type of analysis involves the analysis of a networking protocol inherently, it does not require the presence of an actual or physical network.\n\n","Although certain techniques for protocol analysis benefit from manipulating live 'on-the-wire' interactions between communicating components, static or dynamic analysis techniques applied to executables as well as to device drivers, such as network interface drivers, can also be used to reveal the function and characteristics of a communication protocol implementation. Depending upon the methods used the process may involve observing, interacting, and modifying actual communications occurring between hosts. The goal of protocol analysis is to derive the data transmission syntax, as well as to extract the meaningful content, including packet or content delimiters used by the protocol. This type of analysis is often performed on closed- specification protocols, or proprietary protocols, but is also useful for analyzing publicly available specifications to determine how particular implementations deviate from published specifications.\n\n"


In [16]:
# Patterns of any abstraction whose name or description contains both keywords.
query_attack_patterns(attack_pattern_df_str, search_columns=['name', 'description'], keywords=['node', 'forward'], query_type='and')

Unnamed: 0_level_0,name,capec_parents_id,capec_childs_id,x_capec_abstraction,description,x_capec_extended_description
capec_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
219,XML Routing Detour Attacks,[94],,Standard,"An attacker subverts an intermediate system used to process XML content and forces the intermediate to modify and/or re-route the processing of the content. XML Routing Detour Attacks are Adversary in the Middle type attacks (CAPEC-94). The attacker compromises or inserts an intermediate system in the processing of the XML message. For example, WS-Routing can be used to specify a series of nodes or intermediaries through which content is passed. If any of the intermediate nodes in this route are compromised by an attacker they could be used for a routing detour attack. From the compromised system the attacker is able to route the XML process to other nodes of their choice and modify the responses so that the normal chain of processing is unaware of the interception. This system can forward the message to an outside entity and hide the forwarding and processing from the legitimate processing systems by altering the header information.\n\n",


In [17]:
# Look up a single pattern by CAPEC id (show_tree defaults to False, so no descendants are added).
get_child_attack_patterns(169, attack_pattern_df)

Unnamed: 0_level_0,name,capec_parents_id,capec_childs_id,x_capec_abstraction,description,x_capec_extended_description
capec_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
169,Footprinting,,"[292, 300, 309, 497, 529, 573, 574, 575, 576, 577, 580, 646, 694]",Meta,An adversary engages in probing and exploration activities to identify constituents and properties of the target.,"\n <xhtml:p>Footprinting is a general term to describe a variety of information gathering techniques, often used by attackers in preparation for some attack. It consists of using tools to learn as much as possible about the composition, configuration, and security mechanisms of the targeted application, system or network. Information that might be collected during a footprinting effort could include open ports, applications and their versions, network topology, and similar information. Although similar to fingerprinting, footprinting aims to get a more holistic view of a system or network, whereas fingerprinting is more targeted to a specific application or operating system. While footprinting is not intended to be damaging (although certain activities, such as network scans, can sometimes cause disruptions to vulnerable applications inadvertently) it may often pave the way for more damaging attacks.</xhtml:p>\n"


# GUI

Interactive interface for searching information in the Capec catalog.

In [18]:
import ipywidgets as widgets
from IPython.display import display, HTML
import yake

# YAKE setup
# Keyword extractor used by the "Search" button: English text, n-grams of up to
# three words, at most 20 keywords per description.
language = "en"
max_ngram_size = 3
deduplication_threshold = 0.9
numOfKeywords = 20
custom_kw_extractor = yake.KeywordExtractor(lan=language, n=max_ngram_size, dedupLim=deduplication_threshold, top=numOfKeywords, features=None)

# CSS class attached to several widgets further down via add_class('myStyle').
display(HTML('''
    <style>
        .myStyle { font-weight: bold; }
    </style>
'''))

# Every column of the attack-pattern frame can be searched and shown.
column_to_search = attack_pattern_df.columns.to_list()

# --- "Search by ID" controls -------------------------------------------------
# NOTE: clear_button is placed in two containers below (the ID box and the keyword box).
clear_button = widgets.Button(
    description='Clear',
    disabled=False,
    button_style='danger',
    tooltip='Clear',
    icon='trash',
    layout=widgets.Layout(width='auto', height='auto')
)

# Free-text tags holding the CAPEC ids to look up.
search_id_tags = widgets.TagsInput(
    value=[],
    placeholder='Search ID',
    description='ID',
    disabled=False,
    layout=widgets.Layout(width='200px', height='auto')
)

# Whether the descendants of the requested ids are listed as well.
show_tree_toggle = widgets.ToggleButtons(
    options=[True, False],
    value=False,
    description='Show Childs',
    disabled=False,
    button_style='info',
    tooltip='Show Tree',
    icon='check',
    layout=widgets.Layout(width='auto', height='auto', justify_content='center', align_items='center', flex_flow='column', display='flex'),
    style=widgets.ToggleButtonsStyle(button_width='auto', font_weight='bold')
)

# Whether the whole catalog is listed when no id is entered.
show_all_toggle = widgets.ToggleButtons(
    options=[True, False],
    value=False,
    description='Show All',
    disabled=False,
    button_style='info',
    tooltip='Show All',
    icon='check',
    layout=widgets.Layout(width='auto', height='auto', justify_content='center', align_items='center', flex_flow='column', display='flex'),
    style=widgets.ToggleButtonsStyle(button_width='auto', font_weight='bold')
)

id_label = widgets.Label(value='ID')

search_id_box_clr = widgets.HBox([clear_button, search_id_tags], layout=widgets.Layout(align_items='center', width='auto', justify_content='center'))
search_id_box = widgets.VBox([id_label, search_id_box_clr], layout=widgets.Layout(align_items='center', width='auto', justify_content='center'))

# --- "Search by Keywords" controls -------------------------------------------
# 'or' / 'and' is forwarded as query_type to query_attack_patterns.
search_type_toggle = widgets.ToggleButtons(
    options=['or', 'and'],
    value='or',
    description='Search Type',
    disabled=False,
    button_style='info',
    tooltips=['Search using OR', 'Search using AND'],
    layout=widgets.Layout(width='auto', height='auto', justify_content='center', align_items='center', flex_flow='column', display='flex'),
    style=widgets.ToggleButtonsStyle(button_width='auto', font_weight='bold')
)

# Abstraction levels to include in keyword results (all selected initially).
search_abstraction_sel = widgets.SelectMultiple(
    options=['Meta', 'Standard', 'Detailed'],
    value=['Meta', 'Standard', 'Detailed'],
    rows=3,
    description='Abstraction',
    disabled=False,
    layout=widgets.Layout(width='auto', height='fit-content', justify_content='center', align_items='center', flex_flow='column', display='flex')
)

# Columns searched for the keywords (all selected initially).
search_columns_sel = widgets.SelectMultiple(
    options=column_to_search,
    value=column_to_search,
    rows=5,
    description='Columns',
    disabled=False,
    layout=widgets.Layout(width='auto', height='auto', justify_content='center', align_items='center', flex_flow='column', display='flex')
)

# Columns displayed in the result table (used by both tabs).
show_columns = widgets.SelectMultiple(
    options=column_to_search,
    value=['name', 'capec_parents_id', 'capec_childs_id', 'x_capec_abstraction', 'description', 'x_capec_extended_description'],
    rows=5,
    description='Columns to show',
    disabled=False,
    layout=widgets.Layout(width='fit-content', height='auto', justify_content='center', align_items='center', flex_flow='column', display='flex')
)

keywords_label = widgets.Label(value='Keywords')

# Keywords to search for; filled by hand or from the description via YAKE.
search_keyword_tag = widgets.TagsInput(
    placeholder='Enter keyword',
    value=[],
    allow_duplicates=False,
    layout=widgets.Layout(width='fit-parent', height='auto', justify_self='center', align_self='center')
)

# Free text from which keywords are extracted when "Search" is clicked.
keyword_description = widgets.Textarea(
    placeholder='Enter description',
    value='',
    disabled=False,
    layout=widgets.Layout(width='fit-parent', height='auto', justify_self='center', align_self='center', min_width='300px'),
    rows=5
)

search_button = widgets.Button(
    description='Search',
    disabled=False,
    button_style='info',
    tooltip='Search',
    icon='search',
    layout=widgets.Layout(width='auto', height='auto')
)

search_keyword_tag_clr = widgets.HBox([clear_button, search_button], layout=widgets.Layout(align_items='center', width='auto', justify_content='center'))
search_keyword_tag_box = widgets.VBox([keywords_label, search_keyword_tag_clr, keyword_description, search_keyword_tag], layout=widgets.Layout(align_items='center', width='auto', justify_content='center'))

# --- Layout -------------------------------------------------------------------
# NOTE(review): AppLayout is given its content through `children` here - confirm this renders as intended.
search_id_box_stack = widgets.AppLayout(
    children=[search_id_box, show_all_toggle, show_tree_toggle, show_columns],
    grid_gap='10px',
    justify_items='center',
    align_items='center',
    layout=widgets.Layout(width='fit-parent', height='auto', justify_content='center', align_items='center', display='flex', align_content='center', align_self='center', justify_self='center')
)

# Result counter updated by on_update.
showing_number = widgets.Label(
    value='Showing 0 items',
    layout=widgets.Layout(width='auto', height='auto', justify_content='center', align_items='center', display='flex')
)

# 2x3 grid of the keyword tab: keyword box on the left (both rows), options on the right.
grid = widgets.GridspecLayout(2, 3, align_items='center', height='auto', width='auto', justify_content='center', grid_gap='10px')
grid[:, 0] = search_keyword_tag_box
grid[0, 1] = search_type_toggle
grid[0, 2] = search_abstraction_sel
grid[1, 1] = search_columns_sel
grid[1, 2] = show_columns

# Tab 0 = search by id, tab 1 = search by keywords; the callbacks branch on tab.selected_index.
tab = widgets.Tab()
tab.children = [search_id_box_stack, grid]
tab.titles = ['Search by ID', 'Search by Keywords']

# Output area into which on_update renders the result table.
search_results = widgets.Output(layout=widgets.Layout(width='fit-content', height='auto', padding='0 20px 0 0'))

# Attach the CSS class defined above to the labelled controls.
search_type_toggle.add_class('myStyle')
search_abstraction_sel.add_class('myStyle')
search_columns_sel.add_class('myStyle')
keywords_label.add_class('myStyle')
show_all_toggle.add_class('myStyle')
showing_number.add_class('myStyle')
show_tree_toggle.add_class('myStyle')
id_label.add_class('myStyle')
show_columns.add_class('myStyle')

def on_clear_button_clicked(b):
    """Empty the search inputs that belong to the currently selected tab.

    Tab 0 clears the ID tags; tab 1 clears the keyword tags and the
    free-text description. ``b`` is the clicked button and is not used.
    """
    active_tab = tab.selected_index
    if active_tab == 0:
        search_id_tags.value = []
        return
    if active_tab == 1:
        search_keyword_tag.value = []
        keyword_description.value = ''
    
def on_search_button_clicked(b):
    """Fill the keyword tags with keywords extracted from the description text.

    Only acts on the keyword tab (index 1) and only when the description is
    not empty. ``b`` is the clicked button and is not used.
    """
    if tab.selected_index != 1 or keyword_description.value == '':
        return
    extracted = custom_kw_extractor.extract_keywords(keyword_description.value)
    # Each extracted entry is indexable; its first element is used as the tag text.
    search_keyword_tag.value = [entry[0] for entry in extracted]
    
def on_update(change):
    """Re-run the active search and refresh both the result area and the counter.

    Used as the observer callback for the search widgets and the tab;
    ``change`` is accepted for that protocol but is not read.

    - Tab 0 with ID tags: shows the result of ``get_child_attack_patterns``.
    - Tab 0 without tags but with "show all" enabled: shows the selected
      columns of ``attack_pattern_df``.
    - Tab 1 with keyword tags: shows the result of ``query_attack_patterns``.
    - Anything else: nothing is displayed and the counter reads 0.
    """
    search_results.clear_output()
    visible_columns = list(show_columns.value)
    with search_results:
        active_tab = tab.selected_index
        result = None
        status_text = "Showing 0 items"

        if active_tab == 0:
            if search_id_tags.value != []:
                capec_ids = [int(tag) for tag in search_id_tags.value]
                result = get_child_attack_patterns(capec_ids, attack_pattern_df, show_tree=show_tree_toggle.value, show_columns=visible_columns)
                status_text = f"Showing {len(result.values)} items"
            elif show_all_toggle.value:
                result = attack_pattern_df[visible_columns]
                status_text = f"Showing {len(attack_pattern_df.values)} items"
        elif active_tab == 1 and search_keyword_tag.value != []:
            result = query_attack_patterns(attack_pattern_df_str, search_columns=search_columns_sel.value, ap_type=search_abstraction_sel.value, keywords=search_keyword_tag.value, query_type=search_type_toggle.value, show_columns=visible_columns)
            status_text = f"Showing {len(result.values)} items"

        # Update the counter first, then render the result (if any) inside the output area.
        showing_number.value = status_text
        if result is not None:
            display(capec_abstraction_sort(result))

# Any change to the value of these inputs re-runs the search.
for _observed_widget in (
    search_id_tags,
    search_type_toggle,
    search_abstraction_sel,
    search_columns_sel,
    search_keyword_tag,
    show_all_toggle,
    show_tree_toggle,
    show_columns,
):
    _observed_widget.observe(on_update, names='value')

# Switching tabs refreshes the results as well.
tab.observe(on_update, names='selected_index')

search_button.on_click(on_search_button_clicked)
clear_button.on_click(on_clear_button_clicked)

# Render the search UI: the tabs, the item counter and the result area, in that order.
display(tab, showing_number, search_results)

Tab(children=(AppLayout(children=(VBox(children=(Label(value='ID', _dom_classes=('myStyle',)), HBox(children=(…

Label(value='Showing 0 items', layout=Layout(align_items='center', display='flex', height='auto', justify_cont…

Output(layout=Layout(height='auto', padding='0 20px 0 0', width='fit-content'))

# Building Threat Model

In [19]:
# Load the MACM components into a frame via read_macm (a helper defined outside this section).
# NOTE(review): `driver` is presumably the Neo4j driver created earlier in the notebook — confirm.
macm_df = read_macm(driver)
# Last expression of the cell, so the notebook renders the frame.
macm_df

Unnamed: 0,Component ID,Application,Name,Type,App ID
0,1,WordPress,CSC,User,103
1,3,WordPress,LAN Network,Network,103
2,4,WordPress,PC,HW.PC,103
3,5,WordPress,OS,Service.OS,103
4,6,WordPress,Server,HW.Server,103
5,7,WordPress,OS,Service.OS,103
6,8,WordPress,Browser,Service.Browser,103
7,9,WordPress,WordPress,Service.Web,103
8,10,WordPress,MySQL,Service.DB,103


In [20]:
# Rebuild the "emacm" database from the attack patterns, the threat catalog, the MACM components and the MACM file.
# Both helpers are defined outside this section.
# NOTE(review): clear_database presumably removes the existing contents of "emacm", which would make
# this cell destructive on re-run — confirm against its definition.
clear_database(driver, "emacm")
create_enhanched_macm_db(driver, attack_pattern_df, threat_catalog_df, macm_df, macm_file, database="emacm")

In [22]:
# Fetch every relationship path (start node, relationship, end node) from the "emacm" database as a graph result.
with driver.session(database="emacm") as session:
    query_out = session.run("MATCH a=((n)-[r1]->(p)) RETURN a").graph()


def _emacm_node_label(index, node):
    """Return the caption for one graph node, passed through truncate_string_middle with a limit of 15."""
    node_properties = node["properties"]
    # node_styles is looked up by the node's "label" property; the "label" entry of the matching
    # style names the property to show. Unknown labels fall back to the "label" property itself.
    caption_key = node_styles.get(node_properties["label"], {"label": "label"})["label"]
    return truncate_string_middle(node_properties[caption_key], 15)


emacm_graph = GraphWidget(graph=query_out)
emacm_graph.directed = True
emacm_graph.set_sidebar(enabled=True, start_with="Neighborhood")
emacm_graph.set_node_label_mapping(_emacm_node_label)
emacm_graph.show()

GraphWidget(layout=Layout(height='800px', width='100%'))