In [None]:
from stix2 import parse, MemoryStore, Filter

# Create an empty stix2 MemoryStore, then populate it from the STIX JSON file at
# data/stix-capec.json (a relative path, so it resolves against the working directory).
store = MemoryStore()
store.load_from_file("data/stix-capec.json")

In [None]:
def remove_revoked_deprecated(stix_objects: list) -> list:
    """Return the objects that are neither deprecated nor revoked.

    An object is dropped when its ``x_capec_status`` property equals
    ``"Deprecated"`` or when its ``revoked`` property is truthy. Previously only
    the deprecation status was checked, so revoked objects were kept despite the
    function's name.

    Args:
        stix_objects: Mapping-like STIX objects (anything exposing ``.get()``).

    Returns:
        A new list with the surviving objects in their original order.
    """
    # .get() is used because either property may be absent from the JSON data;
    # a missing status is treated as "" and a missing revoked flag as False.
    return [
        obj
        for obj in stix_objects
        if obj.get("x_capec_status", "") != "Deprecated"
        and not obj.get("revoked", False)
    ]

In [None]:
def get_objects_by_type(store: MemoryStore, stix_type: str, remove_deprecated=False) -> list:
    """Query ``store`` for every object whose ``type`` equals ``stix_type``.

    Args:
        store: The store to query.
        stix_type: Value to match against each object's ``type`` property.
        remove_deprecated: When true, the query result is passed through
            ``remove_revoked_deprecated`` before being returned.

    Returns:
        The matching objects, or an empty list when the result is empty or falsy.
    """
    matches = store.query([Filter("type", "=", stix_type)])
    if remove_deprecated:
        matches = remove_revoked_deprecated(matches)
    # Normalise any falsy result to a fresh empty list.
    return matches if matches else []

In [None]:
def get_attack_related(store: MemoryStore, remove_deprecated=False):
    """Query ``store`` for attack-patterns that carry an "ATTACK" external reference.

    Args:
        store: The store to query.
        remove_deprecated: When true, the query result is passed through
            ``remove_revoked_deprecated`` before being returned.

    Returns:
        The matching objects, or an empty list when the result is empty or falsy.
    """
    attack_filters = [
        Filter("type", "=", "attack-pattern"),
        Filter("external_references.source_name", "=", "ATTACK"),
    ]
    found = store.query(attack_filters)
    if remove_deprecated:
        found = remove_revoked_deprecated(found)
    return found or []


In [None]:
# Attack-patterns that reference ATT&CK through their external references.
attack_patterns = get_attack_related(store)
print(len(attack_patterns))

# Query the store once for every attack-pattern and reuse the result for both
# indexes below (the same query used to be issued three times).
patterns = get_objects_by_type(store, "attack-pattern")

# STIX ID -> attack-pattern object.
all_patterns = {pattern.id: pattern for pattern in patterns}
print(len(all_patterns))

# CAPEC external ID -> STIX ID.
all_patterns_ids = {}
for pattern in patterns:
    # The first external reference whose source is "capec" carries the CAPEC ID;
    # a pattern without one raises IndexError here.
    capec_id = [ref.external_id for ref in pattern.external_references if ref.source_name == "capec"][0]
    all_patterns_ids[capec_id] = pattern.id

In [None]:
# Show how many entries the all_patterns index currently holds.
print(len(all_patterns))

In [None]:
def get_id(cid: str):
    """Look up ``cid`` in the module-level ``all_patterns_ids`` dict.

    Args:
        cid: Key to look up (the dict is keyed by CAPEC external IDs such as
            ``"CAPEC-17"``).

    Returns:
        The value stored for ``cid``.

    Raises:
        KeyError: If ``cid`` is not a key of ``all_patterns_ids``.
    """
    stix_id = all_patterns_ids[cid]
    return stix_id

In [None]:
import json

# Bound up front so the name exists even if opening or parsing the file fails.
attack_data = {}
# JSON text is UTF-8 by specification, so decode it explicitly instead of
# relying on the platform's default encoding.
with open("build/attack-to-capec.json", "r", encoding="utf-8") as f:
    attack_data = json.load(f)

In [None]:
# Step 1: one record per CAPEC ID, built from the ATT&CK-related patterns.
# Each record holds the pattern's ATT&CK IDs, CWE IDs and its STIX ID.
mappings = {}
for pattern in attack_patterns:
    refs = pattern.external_references
    attack_ids = [ref.external_id for ref in refs if ref.source_name == "ATTACK"]
    # The first "capec" reference carries the CAPEC ID; IndexError if there is none.
    capec_id = [ref.external_id for ref in refs if ref.source_name == "capec"][0]
    cwe_ids = [ref.external_id for ref in refs if ref.source_name == "cwe"]

    mappings[capec_id] = {'ATTACK': attack_ids, "CWE": cwe_ids, 'id': pattern.id}

# Step 2: merge the ATT&CK IDs loaded into attack_data (CAPEC ID -> list of IDs).
for capec_id, extra_attack_ids in attack_data.items():
    if capec_id not in mappings:
        # Give new records the same keys as the STIX-derived ones (including
        # 'id') and a fresh list, so attack_data's own list is never aliased.
        mappings[capec_id] = {'ATTACK': [], "CWE": [], 'id': get_id(capec_id)}
    known_attack_ids = mappings[capec_id]['ATTACK']
    for attack_id in extra_attack_ids:
        # Skip IDs already present: duplicates would otherwise yield duplicate
        # chains further down.
        if attack_id not in known_attack_ids:
            known_attack_ids.append(attack_id)

# Step 3: re-key the records from CAPEC ID to STIX ID.
mappings = {get_id(capec_id): record for capec_id, record in mappings.items()}
print(len(mappings))

In [None]:
# Dump the whole mappings dict for inspection (output can be large).
print(mappings)

In [None]:
# Spot check: print the value stored for CAPEC-17 in all_patterns_ids, then
# display the all_patterns entry for a hard-coded STIX ID as the cell's output.
# NOTE(review): presumably the hard-coded ID below is the one printed for
# CAPEC-17 — confirm against the loaded data.
print(all_patterns_ids['CAPEC-17'])
all_patterns['attack-pattern--c4a0c765-e4ca-43c2-996e-08ce13ae8f80']

In [None]:
from mitreattack.stix20 import MitreAttackData

# Build a MitreAttackData instance from data/enterprise-attack.json (relative path).
# It is passed to ids_to_names later to resolve ATT&CK IDs to names.
mitre_attack_data = MitreAttackData("data/enterprise-attack.json")

In [None]:
def ids_to_names(attack_data: MitreAttackData, attack_ids: list[str]) -> list:
    """Render each ATT&CK ID as ``"<id>(<name>)"``.

    Args:
        attack_data: Object whose ``get_object_by_attack_id(attack_id,
            'attack-pattern')`` returns something with a ``name`` attribute.
        attack_ids: The IDs to resolve, in output order.

    Returns:
        One formatted string per entry of ``attack_ids``, in the same order.
    """
    labels = []
    for attack_id in attack_ids:
        technique = attack_data.get_object_by_attack_id(attack_id, 'attack-pattern')
        labels.append(f"{attack_id}({technique.name})")
    return labels

In [None]:
# Build one chain entry per (pattern, ATT&CK ID) pair:
#   "prev"    - labels of ATT&CK IDs mapped to patterns listed in x_capec_can_follow_refs
#   "current" - one-element list holding the label of the ATT&CK ID itself
#   "next"    - labels of ATT&CK IDs mapped to patterns listed in x_capec_can_precede_refs
# Patterns with neither prev nor next IDs produce no entries.
chains = []
for stix_id, record in mappings.items():
    pattern = all_patterns[stix_id]
    attack_ids = record['ATTACK']
    if not attack_ids:
        # Nothing to emit; also avoids name lookups the per-ID loop never made.
        continue

    # Related patterns only count when they have an entry in mappings.
    prev_aids = [
        aid
        for ref in pattern.get('x_capec_can_follow_refs', [])
        if ref in mappings
        for aid in mappings[ref]['ATTACK']
    ]
    next_aids = [
        aid
        for ref in pattern.get('x_capec_can_precede_refs', [])
        if ref in mappings
        for aid in mappings[ref]['ATTACK']
    ]
    if not prev_aids and not next_aids:
        continue

    # prev/next depend only on the pattern, so resolve their names once rather
    # than once per ATT&CK ID.
    prev = ids_to_names(mitre_attack_data, prev_aids)
    nxt = ids_to_names(mitre_attack_data, next_aids)
    for aid in attack_ids:
        current = ids_to_names(mitre_attack_data, [aid])
        chains.append({
            # Copies keep every entry independent of its siblings.
            "prev": list(prev),
            "current": current,
            "next": list(nxt)
        })


In [None]:
# Write every chain as "prev --> current --> next" lines, one per (prev, next)
# combination, to build/attack-triples.txt.
print(len(chains))
with open("build/attack-triples.txt", "w") as fp:
    for chain in chains:
        current = chain['current'][0]
        # An empty side becomes a single None placeholder. Iterating the empty
        # list directly skipped the whole chain, so the "None" branches below
        # could never run and chains with only one side were dropped.
        befores = chain['prev'] or [None]
        afters = chain['next'] or [None]
        for before in befores:
            for after in afters:
                if not before:
                    head = f"None --> {current}"
                else:
                    head = f"{before} --> {current}"
                if not after:
                    tail = " --> None \n"
                else:
                    tail = f" --> {after}\n"
                fp.write(head + tail)

In [None]:
# Write every chain as pairwise edges to build/attack-pairs.txt: first one
# "prev --> current" line per prev label, then one "current --> next" line per
# next label.
print(len(chains))
with open("build/attack-pairs.txt", "w") as fp:
    for chain in chains:
        fp.writelines(
            f"{before} --> {chain['current'][0]}\n" for before in chain['prev']
        )
        fp.writelines(
            f"{chain['current'][0]} --> {after}\n" for after in chain['next']
        )