Skip to content

Commit

Permalink
Fix ATTACK technique to CAPEC link. The link can be found in the CAP…
Browse files Browse the repository at this point in the history
…EC entry. Previously the ATTACK technique entry could have a link to capec. Fix #38
  • Loading branch information
hembergerik committed Jul 4, 2023
1 parent 64a9c55 commit 74395f8
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
from typing import Dict, Any

import stix2
import pandas as pd

from offense.build_offensive_BRON import DESCRIPTION_MAP_PATHS, NAME_MAP_PATHS
from download_threat_information.parsing_scripts.parse_capec_cwe import load_capec_file


# TODO refactor separation of mitigation and attacks. I.e. BRON is attacks, then extension is mitigations which should be separable
Expand All @@ -23,7 +25,6 @@
def link_tactic_techniques(file_name_: str, save_path: str):
logging.info(f"Begin link ATT&CK Tactic and Technique in {file_name_}")
technique_tactic_dict = {}
capec_technique_dict = {}
technique_id_name_dict = {}
tactic_id_name_dict = {}
tactic_descriptions = {}
Expand All @@ -49,14 +50,7 @@ def link_tactic_techniques(file_name_: str, save_path: str):
if "url" in ref.keys():
if "https://attack.mitre.org/techniques/" in ref["url"]:
technique_id = ref["external_id"]
if ref["source_name"] == "capec":
capec_id = ref["external_id"]
split = capec_id.split("-")
if split[1] not in capec_technique_dict.keys():
capec_technique_dict[split[1]] = [technique_id]
else:
capec_technique_dict[split[1]].append(technique_id)


if entry_id.startswith("x-mitre-tactic--"):
# TODO messy
external_id = entry["external_references"][0]["external_id"]
Expand Down Expand Up @@ -92,7 +86,6 @@ def link_tactic_techniques(file_name_: str, save_path: str):
technique_tactic_dict[technique_id] = list(tactics_set)

save_files = {
NAME_MAP_PATHS["attack_map"]: capec_technique_dict,
NAME_MAP_PATHS["tactic_map"]: technique_tactic_dict,
NAME_MAP_PATHS["technique_names"]: technique_id_name_dict,
NAME_MAP_PATHS["tactic_names"]: tactic_id_name_dict,
Expand All @@ -109,6 +102,14 @@ def link_tactic_techniques(file_name_: str, save_path: str):
logging.info(f"Wrote to disk: {file_name_}")


def load_technique_file(save_path: str) -> Dict[str, str]:
file_name = os.path.join(save_path, NAME_MAP_PATHS["technique_names"])
with open(file_name, "r") as fd:
technique_data = json.load(fd)

return technique_data


def _get_attack_id(entry: Dict[str, Any]) -> str:
technique_id = ""
for external_reference in entry.get("external_references"):
Expand Down Expand Up @@ -319,6 +320,29 @@ def link_technique_technique(file_path: str, save_path: str):
)


def link_capec_technique(save_path: str):
logging.info(f"Begin linking capec to technique from {save_path}")
capec_objects = load_capec_file(save_path)
techniques = load_technique_file(save_path)
capec_technique_dict = collections.defaultdict(list)
for row in capec_objects.iterrows():
capec_entry = row[1]
capec_id = capec_entry["ID"]
technique_maps = capec_entry["Taxonomy_Mappings"]
for technique_map in technique_maps:
assert not technique_map.startswith("T")
technique_id = f"T{technique_map}"
if technique_id in techniques.keys():
capec_technique_dict[capec_id].append(technique_id)

assert len(capec_technique_dict) > 0
out_file = os.path.join(save_path, NAME_MAP_PATHS["attack_map"])
with open(out_file, "w") as fd:
fd.write(json.dumps(capec_technique_dict, indent=4, sort_keys=True))

logging.info(f"End linking CAPEC to technique to {out_file}")


def parse_attack(file_name: str, save_path: str, domain: str = "mitre-attack", platform: str = ""):
logging.info(f"Begin parse ATT&CK data from {file_name}")
# TODO for handling of bronsprak
Expand Down Expand Up @@ -512,4 +536,5 @@ def parse_attack(file_name: str, save_path: str, domain: str = "mitre-attack", p
link_technique_technique(args.filename, args.save_path)
sys.exit(0)

link_tactic_techniques(args.filename, args.save_path)
link_capec_technique(args.save_path)
link_tactic_techniques(args.filename, args.save_path)
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def parse_capec_xml_to_csv(capec_xml_file: str, save_path: str) -> None:
data_ = dict([(k, list(v)) for k, v in internal_links.items()])
json.dump(data_, fd, indent=1)

logging.info(f"Parsd CAPEC {capec_xml_file} to {save_path}")
logging.info(f"Parsed CAPEC {capec_xml_file} to {save_path}")


def _remove_namespace(tag: str, namespace: str) -> str:
Expand Down Expand Up @@ -282,7 +282,7 @@ def _load_cwe_file(save_path: str) -> "pd.DataFrame":
return cwe_data


def _load_capec_file(save_path: str) -> "pd.DataFrame":
def load_capec_file(save_path: str) -> "pd.DataFrame":
file_name = os.path.join(save_path, CAPEC_XML_FILE_NAME)
with open(file_name, "r") as fd:
capec_data = pd.read_json(fd)
Expand All @@ -302,7 +302,7 @@ def parse_capec_cwe_files(save_path):
}

# Load CAPEC and CWE data
capec_objects = _load_capec_file(save_path)
capec_objects = load_capec_file(save_path)
cwe_data = _load_cwe_file(save_path)
# Make capec_names_dict
for row in capec_objects.iterrows():
Expand Down
4 changes: 4 additions & 0 deletions tutorials/build_bron.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ def _parse():
os.path.join(DOWNLOAD_PATH, "raw_enterprise_attack.json"), BRON_SAVE_PATH
)
logging.info("Linked Techniques")
parse_attack.link_capec_technique(
BRON_SAVE_PATH
)
logging.info("Linked CAPEC and Techniques")


def _build():
Expand Down

0 comments on commit 74395f8

Please sign in to comment.