33import ssl
44import urllib .request
55from json import JSONDecodeError
6- from typing import Optional
6+ from typing import Optional , Union
77from urllib .error import HTTPError
88
99from app .translator .core .models .query_container import MitreInfoContainer , MitreTacticContainer , MitreTechniqueContainer
1010from app .translator .tools .singleton_meta import SingletonMeta
1111from const import ROOT_PROJECT_PATH
1212
1313
14+ class TrieNode :
15+ def __init__ (self ):
16+ self .children = {}
17+ self .is_end_of_word = False
18+ self .result = None
19+
20+
21+ class Trie :
22+ """
23+ Trie (prefix tree) data structure for storing and searching Mitre ATT&CK Techniques and Tactics strings.
24+
25+ This class handles the insertion and searching of strings related to Mitre ATT&CK Techniques and Tactics, even when
26+ the strings have variations in spacing, case, or underscores. By normalizing the text—converting it to lowercase and
27+ removing spaces and underscores—different variations of the same logical string are treated as equivalent.
28+
29+ It means strings 'CredentialAccess', 'credential Access', and 'credential_access' will be processed identically,
30+ leading to the same result.
31+ """
32+
33+ def __init__ (self ):
34+ self .root = TrieNode ()
35+
36+ def normalize_text (self , text : str ) -> str :
37+ return text .replace (" " , "" ).lower ().replace ("_" , "" ).lower ()
38+
39+ def insert (self , text : str , result : Union [MitreTacticContainer , MitreTechniqueContainer ]) -> None :
40+ node = self .root
41+ normalized_text = self .normalize_text (text )
42+
43+ for char in normalized_text :
44+ if char not in node .children :
45+ node .children [char ] = TrieNode ()
46+ node = node .children [char ]
47+
48+ node .is_end_of_word = True
49+ node .result = result
50+
51+
52+ class TacticsTrie (Trie ):
53+ def __init__ (self ):
54+ self .root = TrieNode ()
55+
56+ def search (self , text : str ) -> Optional [MitreTacticContainer ]:
57+ node : TrieNode = self .root
58+ normalized_text = self .normalize_text (text )
59+
60+ for char in normalized_text :
61+ if char not in node .children :
62+ return
63+ node = node .children [char ]
64+
65+ if node .is_end_of_word :
66+ return node .result
67+
68+
69+ class TechniquesTrie (Trie ):
70+ def search (self , text : str ) -> Optional [MitreTechniqueContainer ]:
71+ node : TrieNode = self .root
72+ normalized_text = self .normalize_text (text )
73+
74+ for char in normalized_text :
75+ if char not in node .children :
76+ return
77+ node = node .children [char ]
78+
79+ if node .is_end_of_word :
80+ return node .result
81+
82+
1483class MitreConfig (metaclass = SingletonMeta ):
1584 config_url : str = "https://raw.githubusercontent.com/mitre/cti/master/enterprise-attack/enterprise-attack.json"
1685 mitre_source_types : tuple = ("mitre-attack" ,)
1786
1887 def __init__ (self , server : bool = False ):
19- self .tactics = {}
20- self .techniques = {}
88+ self .tactics : TacticsTrie = TacticsTrie ()
89+ self .techniques : TechniquesTrie = TechniquesTrie ()
2190 if not server :
2291 self .__load_mitre_configs_from_files ()
2392
@@ -44,7 +113,6 @@ def update_mitre_config(self) -> None: # noqa: PLR0912
44113 return
45114
46115 tactic_map = {}
47- technique_map = {}
48116
49117 # Map the tactics
50118 for entry in mitre_json ["objects" ]:
@@ -53,11 +121,12 @@ def update_mitre_config(self) -> None: # noqa: PLR0912
53121 for ref in entry ["external_references" ]:
54122 if ref ["source_name" ] == "mitre-attack" :
55123 tactic_map [entry ["x_mitre_shortname" ]] = entry ["name" ]
56- self .tactics [entry ["name" ].replace (" " , "_" ).lower ()] = {
57- "external_id" : ref ["external_id" ],
58- "url" : ref ["url" ],
59- "tactic" : entry ["name" ],
60- }
124+
125+ tactic_data = MitreTacticContainer (
126+ external_id = ref ["external_id" ], url = ref ["url" ], name = entry ["name" ]
127+ )
128+ self .tactics .insert (entry ["name" ], tactic_data )
129+
61130 break
62131
63132 # Map the techniques
@@ -68,19 +137,15 @@ def update_mitre_config(self) -> None: # noqa: PLR0912
68137 continue
69138 for ref in entry ["external_references" ]:
70139 if ref ["source_name" ] in self .mitre_source_types :
71- technique_map [ref ["external_id" ]] = entry ["name" ]
72140 sub_tactics = []
73- # Get Mitre Tactics (Kill-Chains)
74141 for tactic in entry ["kill_chain_phases" ]:
75142 if tactic ["kill_chain_name" ] in self .mitre_source_types :
76- # Map the short phase_name to tactic name
77143 sub_tactics .append (tactic_map [tactic ["phase_name" ]])
78- self .techniques [ref ["external_id" ].lower ()] = {
79- "technique_id" : ref ["external_id" ],
80- "technique" : entry ["name" ],
81- "url" : ref ["url" ],
82- "tactic" : sub_tactics ,
83- }
144+
145+ technique_data = MitreTechniqueContainer (
146+ technique_id = ref ["external_id" ], name = entry ["name" ], url = ref ["url" ], tactic = sub_tactics
147+ )
148+ self .techniques .insert (ref ["external_id" ], technique_data )
84149 break
85150
86151 # Map the sub-techniques
@@ -92,58 +157,60 @@ def update_mitre_config(self) -> None: # noqa: PLR0912
92157 if ref ["source_name" ] in self .mitre_source_types :
93158 sub_technique_id = ref ["external_id" ]
94159 sub_technique_name = entry ["name" ]
95- parent_technique_name = technique_map [sub_technique_id .split ("." )[0 ]]
96- parent_tactics = self .techniques .get (sub_technique_id .split ("." )[0 ].lower (), {}).get (
97- "tactic" , []
98- )
99- sub_technique_name = f"{ parent_technique_name } : { sub_technique_name } "
100- self .techniques [ref ["external_id" ].lower ()] = {
101- "technique_id" : ref ["external_id" ],
102- "technique" : sub_technique_name ,
103- "url" : ref ["url" ],
104- "tactic" : parent_tactics ,
105- }
160+ if parent_technique := self .techniques .search (sub_technique_id .split ("." )[0 ]):
161+ sub_technique_name = f"{ parent_technique .name } : { sub_technique_name } "
162+ sub_technique_data = MitreTechniqueContainer (
163+ technique_id = ref ["external_id" ],
164+ name = sub_technique_name ,
165+ url = ref ["url" ],
166+ tactic = parent_technique .tactic ,
167+ )
168+ self .techniques .insert (sub_technique_id , sub_technique_data )
106169 break
107170
108171 def __load_mitre_configs_from_files (self ) -> None :
109172 try :
110173 with open (os .path .join (ROOT_PROJECT_PATH , "app/dictionaries/tactics.json" )) as file :
111- self .tactics = json .load (file )
174+ loaded = json .load (file )
175+
176+ for tactic_name , tactic_data in loaded .items ():
177+ tactic = MitreTacticContainer (
178+ external_id = tactic_data ["external_id" ], url = tactic_data ["url" ], name = tactic_data ["tactic" ]
179+ )
180+ self .tactics .insert (tactic_name , tactic )
112181 except JSONDecodeError :
113- self . tactics = {}
182+ print ( "Unable to load MITRE Tactics" )
114183
115184 try :
116185 with open (os .path .join (ROOT_PROJECT_PATH , "app/dictionaries/techniques.json" )) as file :
117- self .techniques = json .load (file )
186+ loaded = json .load (file )
187+ for technique_id , technique_data in loaded .items ():
188+ technique = MitreTechniqueContainer (
189+ technique_id = technique_data ["technique_id" ],
190+ name = technique_data ["technique" ],
191+ url = technique_data ["url" ],
192+ tactic = technique_data .get ("tactic" , []),
193+ )
194+ self .techniques .insert (technique_id , technique )
118195 except JSONDecodeError :
119- self . techniques = {}
196+ print ( "Unable to load MITRE Techniques" )
120197
121198 def get_tactic (self , tactic : str ) -> Optional [MitreTacticContainer ]:
122- tactic = tactic .replace ("." , "_" )
123- if tactic_found := self .tactics .get (tactic ):
124- return MitreTacticContainer (
125- external_id = tactic_found ["external_id" ], url = tactic_found ["url" ], name = tactic_found ["tactic" ]
126- )
199+ return self .tactics .search (tactic )
127200
128201 def get_technique (self , technique_id : str ) -> Optional [MitreTechniqueContainer ]:
129- if technique_found := self .techniques .get (technique_id ):
130- return MitreTechniqueContainer (
131- technique_id = technique_found ["technique_id" ],
132- name = technique_found ["technique" ],
133- url = technique_found ["url" ],
134- tactic = technique_found ["tactic" ],
135- )
202+ return self .techniques .search (technique_id )
136203
137204 def get_mitre_info (
138205 self , tactics : Optional [list [str ]] = None , techniques : Optional [list [str ]] = None
139206 ) -> MitreInfoContainer :
140207 tactics_list = []
141208 techniques_list = []
142209 for tactic in tactics or []:
143- if tactic_found := self .get_tactic ( tactic = tactic . lower () ):
210+ if tactic_found := self .tactics . search ( tactic ):
144211 tactics_list .append (tactic_found )
145212 for technique in techniques or []:
146- if technique_found := self .get_technique ( technique_id = technique . lower () ):
213+ if technique_found := self .techniques . search ( technique ):
147214 techniques_list .append (technique_found )
148215 return MitreInfoContainer (
149216 tactics = sorted (tactics_list , key = lambda x : x .name ),
0 commit comments