diff --git a/ad_miner/__main__.py b/ad_miner/__main__.py index 233c5e0..1f96a7b 100644 --- a/ad_miner/__main__.py +++ b/ad_miner/__main__.py @@ -171,7 +171,7 @@ def main() -> None: domains = Domains(arguments, neo4j) computers = Computers(arguments, neo4j, domains) users = Users(arguments, neo4j, domains) - objects = Objects(arguments, neo4j) + objects = Objects(arguments, neo4j, domains, computers, users) # Generate the main page logger.print_success("Temporary vulnerabilities rating :") diff --git a/ad_miner/sources/modules/computers.py b/ad_miner/sources/modules/computers.py index 48739d0..3afb119 100755 --- a/ad_miner/sources/modules/computers.py +++ b/ad_miner/sources/modules/computers.py @@ -313,7 +313,7 @@ def genComputersAdminOfPages(self): computers_admin_to_list = generic_computing.getListAdminTo( self.list_computers_admin_computers, "source_computer", "target_computer" ) - computers_admin_data_grid = [] + self.computers_admin_data_grid = [] for admin_computer, computers_list in computers_admin_to_list.items(): if admin_computer is not None and computers_list is not None: num_path, nb_domains = self.domain.findAndCreatePathToDaFromComputersList(admin_computer, computers_list) @@ -337,8 +337,8 @@ def genComputersAdminOfPages(self): else: tmp_line["Paths to domain admin"] = "-" - computers_admin_data_grid.append(tmp_line) - computers_admin_data_grid.sort( + self.computers_admin_data_grid.append(tmp_line) + self.computers_admin_data_grid.sort( key=lambda x: x["Computers count"], reverse=True ) @@ -351,7 +351,7 @@ def genComputersAdminOfPages(self): ) grid = Grid("Computers admins of other computers") grid.setheaders(["Computer Admin", "Computers count", "Paths to domain admin"]) - grid.setData(json.dumps(computers_admin_data_grid)) + grid.setData(json.dumps(self.computers_admin_data_grid)) page.addComponent(grid) page.render() diff --git a/ad_miner/sources/modules/description.json b/ad_miner/sources/modules/description.json index a194781..3e4a267 100644 --- a/ad_miner/sources/modules/description.json +++ b/ad_miner/sources/modules/description.json @@ -274,7 +274,7 @@ "users_GPO_access": { "title": "GPOs that can be edited by unprivileged users", - "description": "GPOs that can be edited by unprivileged users", + "description": "GPOs that can be edited by unprivileged users.
: Full domain or at least one domain admin as target.
: At least one object admin of a computer.
: At least one object as target.
: No direct target.", "risk": "If an AD object has rights over a GPO, it can potentially cause damage over all the objects affected by the GPO. GPOs can also be leveraged to gain privileges in the domain(s). If an attacker exploits one of these paths, they will be able to gain privileges in the domain(s) and cause some serious damage.", "poa": "Review the paths, make sure they are not exploitable. If they are, cut the link between the Active Directory objects in order to reduce the attack surface." }, diff --git a/ad_miner/sources/modules/domains.py b/ad_miner/sources/modules/domains.py index f61c6c3..6884eb1 100755 --- a/ad_miner/sources/modules/domains.py +++ b/ad_miner/sources/modules/domains.py @@ -1,8 +1,6 @@ -import json import time from urllib.parse import quote -from os.path import sep from ad_miner.sources.modules import generic_formating from ad_miner.sources.modules import logger @@ -238,7 +236,6 @@ def __init__(self, arguments, neo4j): self.generateDomainMapTrust() - self.get_unpriv_users_to_GPO() self.get_domain_OUs() self.genDAPage() self.genInsufficientForestDomainsLevels() @@ -1316,253 +1313,6 @@ def findAndCreatePathToDaFromUsersList(self, admin_user, computers): ) return (len(path_to_generate), len(list_domain)) - def get_unpriv_users_to_GPO(self): - if self.arguments.gpo_low and self.unpriv_users_to_GPO is None: - return - if not self.arguments.gpo_low: - fail = [] - if self.unpriv_users_to_GPO_init is None: - fail.append("unpriv_users_to_GPO_init") - elif self.unpriv_users_to_GPO_user_enforced is None: - fail.append("unpriv_users_to_GPO_user_enforced") - elif self.unpriv_users_to_GPO_user_not_enforced is None: - fail.append("unpriv_users_to_GPO_user_not_enforced") - elif self.unpriv_users_to_GPO_computer_enforced is None: - fail.append("unpriv_users_to_GPO_computer_enforced") - elif self.unpriv_users_to_GPO_computer_not_enforced is None: - fail.append("unpriv_users_to_GPO_computer_not_enforced") - - if 0 < len(fail) < 5: # if only some of them are disabled - logger.print_error( - f" In order to use 'normal GPO mode', please activate the following in config.json : {', '.join(fail)}" - ) - - if len(fail) > 0: - return - - def parseGPOData(listOfPaths, headers): - """ - Initial parsing of data from neo4j requests for GPO - """ - dictOfGPO = {} - for path in listOfPaths: - start = path.nodes[0] - end = path.nodes[-1] - if "GPO" in start.labels: - nameOfGPO = start.name - idOfGPO = start.id - sens = "right" - elif "GPO" in end.labels: - nameOfGPO = end.name - idOfGPO = end.id - sens = "left" - else: - continue - try: - if sens == "right": - dictOfGPO[nameOfGPO][headers[4]] += 1 - dictOfGPO[nameOfGPO]["right_path"].append(path) - dictOfGPO[nameOfGPO]["end_list"].append(end.name) - elif sens == "left": - dictOfGPO[nameOfGPO][headers[1]] += 1 - dictOfGPO[nameOfGPO]["left_path"].append(path) - dictOfGPO[nameOfGPO]["entry_list"].append(start.name) - else: - continue - except KeyError: - if sens == "right": - dictOfGPO[nameOfGPO] = { - headers[0]: nameOfGPO, - headers[1]: 0, - headers[4]: 1, - "left_path": [], - "right_path": [path], - "id": idOfGPO, - "entry_list": [], - "end_list": [end.name], - } - elif sens == "left": - dictOfGPO[nameOfGPO] = { - headers[0]: nameOfGPO, - headers[1]: 1, - headers[4]: 0, - "left_path": [path], - "right_path": [], - "id": idOfGPO, - "entry_list": [start.name], - "end_list": [], - } - else: - continue - return dictOfGPO - - def formatGPOGrid(dictOfGPO, headers): - output = [] - for _, dict in dictOfGPO.items(): - # Get number of domains - domains = [] - for path in dict["right_path"]: - #for relation in path.relationships: - for i in range(len(path.nodes)): - if path.nodes[i].labels == "Domain": - domains.append(path.nodes[i].name) - # if dict[headers[4]] > 0: - self.number_of_gpo += 1 - - nbDomains = len(list(set(domains))) - sortClass = str(nbDomains).zfill(6) - icon = ( - '' - % sortClass - ) - - output.append( - { - headers[0]: dict[headers[0]], - headers[1]: dict[headers[1]], - headers[2]: { - "link": "users_GPO_access-%s-left-graph.html" - % (quote(str(dict[headers[0]]).replace(sep, '_'))), - "value": "", - }, - headers[3]: { - "link": "users_GPO_access-%s-left-grid.html" - % (quote(str(dict[headers[0]]).replace(sep, '_'))), - "value": "", - }, - headers[4]: len(list(set(dict["end_list"]))), - headers[5]: "%s %d" % (icon, nbDomains), - headers[6]: { - "link": "users_GPO_access-%s-right-graph.html" - % (quote(str(dict[headers[0]]).replace(sep, '_'))), - "value": "", - }, - headers[7]: { - "link": "users_GPO_access-%s-right-grid.html" - % (quote(str(dict[headers[0]]).replace(sep, '_'))), - "value": "", - }, - } - ) - return output - - def formatSmallGrid(list, gpo_name): - output = [] - for n in list: - output.append({gpo_name: n}) - return output - - headers = [ - "GPO name", - "Paths to GPO", - "Inbound graph", - "Inbound list", - "Objects impacted", - "Domains impacted", - "Outbound graph", - "Outbound list", - ] - if not self.arguments.gpo_low: - data = ( - self.unpriv_users_to_GPO_init - + self.unpriv_users_to_GPO_user_enforced - + self.unpriv_users_to_GPO_computer_enforced - + self.unpriv_users_to_GPO_user_not_enforced - + self.unpriv_users_to_GPO_computer_not_enforced - ) - self.unpriv_users_to_GPO_parsed = parseGPOData(data, headers) - grid = Grid("Users with GPO access") - else: - self.unpriv_users_to_GPO_parsed = parseGPOData( - self.unpriv_users_to_GPO, headers - ) - grid = Grid("Users with GPO access") - - formated_data = sorted( - formatGPOGrid(self.unpriv_users_to_GPO_parsed, headers), - key=lambda x: x[headers[1]], - reverse=True, - ) - page = Page( - self.arguments.cache_prefix, - "users_GPO_access", - "Exploitation through GPO", - "users_GPO_access", - ) - - grid.setheaders(headers) - grid.setData(json.dumps(formated_data)) - page.addComponent(grid) - page.render() - - for _, GPO in self.unpriv_users_to_GPO_parsed.items(): - url_left_graph = "users_GPO_access-%s-left-graph" % GPO[headers[0]] - url_right_graph = "users_GPO_access-%s-right-graph" % GPO[headers[0]] - page_left_graph = Page( - self.arguments.cache_prefix, - url_left_graph, - "Users with write access on GPO", - "graph_GPO_access", - ) - page_right_graph = Page( - self.arguments.cache_prefix, - url_right_graph, - "Objects impacted by GPO", - "graph_GPO_access", - ) - - url_left_grid = "users_GPO_access-%s-left-grid" % GPO[headers[0]] - url_right_grid = "users_GPO_access-%s-right-grid" % GPO[headers[0]] - page_left_grid = Page( - self.arguments.cache_prefix, - url_left_grid, - "List of users able to compromise %s" % GPO[headers[0]], - "grid_GPO_access", - ) - page_right_grid = Page( - self.arguments.cache_prefix, - url_right_grid, - "List of users impacted by %s" % GPO[headers[0]], - "grid_GPO_access", - ) - - # if GPO[headers[4]] > 0: - graph_left = Graph() - graph_left.setPaths(GPO["left_path"]) - page_left_graph.addComponent(graph_left) - - graph_right = Graph() - graph_right.setPaths(GPO["right_path"]) - page_right_graph.addComponent(graph_right) - - if not self.arguments.gpo_low: - entry_grid = Grid("List of users able to compromise %s" % GPO[headers[0]]) - else: - entry_grid = Grid("List of users able to compromise %s" % GPO[headers[0]]) - entry_grid.setheaders([GPO[headers[0]]]) - entry_grid.setData( - json.dumps( - formatSmallGrid( - list(set(GPO["entry_list"])), GPO[headers[0]]) - ) - ) - page_left_grid.addComponent(entry_grid) - - if not self.arguments.gpo_low: - end_grid = Grid("List of users impacted by %s" % GPO[headers[0]]) - else: - end_grid = Grid("List of users impacted by %s" % GPO[headers[0]]) - end_grid.setheaders([GPO[headers[0]]]) - end_grid.setData( - json.dumps(formatSmallGrid( - list(set(GPO["end_list"])), GPO[headers[0]])) - ) - page_right_grid.addComponent(end_grid) - - page_left_graph.render() - page_right_graph.render() - page_left_grid.render() - page_right_grid.render() @staticmethod def generatePathToOUHandlers(self): diff --git a/ad_miner/sources/modules/objects.py b/ad_miner/sources/modules/objects.py index 0e64f24..efb352f 100755 --- a/ad_miner/sources/modules/objects.py +++ b/ad_miner/sources/modules/objects.py @@ -1,4 +1,7 @@ import time +import json + +from os.path import sep from ad_miner.sources.modules import logger from ad_miner.sources.modules.graph_class import Graph @@ -12,9 +15,12 @@ class Objects: - def __init__(self, arguments, neo4j): + def __init__(self, arguments, neo4j, domain, computers, users): self.arguments = arguments self.neo4j = neo4j + self.domain = domain + self.computers = computers + self.users = users self.start = time.time() logger.print_debug("Computing other objects") @@ -25,6 +31,8 @@ def __init__(self, arguments, neo4j): self.users_nb_domain_admins = neo4j.all_requests["nb_domain_admins"]["result"] + self.get_unpriv_users_to_GPO() + end_nodes = [] # Check if dcsync path is activated or not if self.objects_to_dcsync == None: @@ -177,4 +185,288 @@ def genNodesDCsyncLightPage(self, neo4j): grid.setheaders(headers) grid.setData(data) page.addComponent(grid) - page.render() \ No newline at end of file + page.render() + + + def get_unpriv_users_to_GPO(self): + if self.arguments.gpo_low and self.domain.unpriv_users_to_GPO is None: + return + if not self.arguments.gpo_low: + fail = [] + if self.domain.unpriv_users_to_GPO_init is None: + fail.append("unpriv_users_to_GPO_init") + elif self.domain.unpriv_users_to_GPO_user_enforced is None: + fail.append("unpriv_users_to_GPO_user_enforced") + elif self.domain.unpriv_users_to_GPO_user_not_enforced is None: + fail.append("unpriv_users_to_GPO_user_not_enforced") + elif self.domain.unpriv_users_to_GPO_computer_enforced is None: + fail.append("unpriv_users_to_GPO_computer_enforced") + elif self.domain.unpriv_users_to_GPO_computer_not_enforced is None: + fail.append("unpriv_users_to_GPO_computer_not_enforced") + + if 0 < len(fail) < 5: # if only some of them are disabled + logger.print_error( + f" In order to use 'normal GPO mode', please activate the following in config.json : {', '.join(fail)}" + ) + + if len(fail) > 0: + return + + def parseGPOData(listOfPaths, headers): + """ + Initial parsing of data from neo4j requests for GPO + """ + dictOfGPO = {} + for path in listOfPaths: + start = path.nodes[0] + end = path.nodes[-1] + if "GPO" in start.labels: + nameOfGPO = start.name + idOfGPO = start.id + sens = "right" + elif "GPO" in end.labels: + nameOfGPO = end.name + idOfGPO = end.id + sens = "left" + else: + continue + try: + if sens == "right": + dictOfGPO[nameOfGPO][headers[4]] += 1 + dictOfGPO[nameOfGPO]["right_path"].append(path) + dictOfGPO[nameOfGPO]["end_list"].append((end.name, end.labels)) + elif sens == "left": + dictOfGPO[nameOfGPO][headers[1]] += 1 + dictOfGPO[nameOfGPO]["left_path"].append(path) + dictOfGPO[nameOfGPO]["entry_list"].append((start.name, start.labels)) + else: + continue + except KeyError: + if sens == "right": + dictOfGPO[nameOfGPO] = { + headers[0]: nameOfGPO, + headers[1]: 0, + headers[4]: 1, + "left_path": [], + "right_path": [path], + "id": idOfGPO, + "entry_list": [], + "end_list": [(end.name, end.labels)], + } + elif sens == "left": + dictOfGPO[nameOfGPO] = { + headers[0]: nameOfGPO, + headers[1]: 1, + headers[4]: 0, + "left_path": [path], + "right_path": [], + "id": idOfGPO, + "entry_list": [(start.name, start.labels)], + "end_list": [], + } + else: + continue + return dictOfGPO + + def formatGPOGrid(dictOfGPO, headers): + output = [] + + # Extract all computers admin of computers + self.computers_with_admin_rights = [d["Computer Admin"].split(' ')[-1] for d in self.computers.computers_admin_data_grid] + # Extract all users admin of computers + self.users_with_admin_rights = [d["User"].split(' ')[-1] for d in self.users.users_admin_of_computers] + + for _, dict in dictOfGPO.items(): + self.domain.number_of_gpo += 1 + # Rate the interest of the GPO + # 0 star : no object impacted + # 1 star : at least one object impacted + # 2 stars : at least one admin account impacted + # 3 stars : full domain or at least one domain admin impacted + paths = dict["right_path"] + + if len(paths) == 0: + interest = 0 + else: + interest = 1 + + for path in paths: + for i in range(len(path.nodes)): + if path.nodes[i].labels == "Domain": + interest = 3 + break + if path.nodes[i].name in self.domain.admin_list: + interest = 3 + break + if path.nodes[i].name in self.users_with_admin_rights or path.nodes[i].name in self.computers_with_admin_rights: + interest = max(2, interest) + + # Color for stars + if interest == 3: + color = "red" + elif interest == 2: + color = "orange" + else: + color = "green" + + icon = f""*interest + f""*(3-interest) + + output.append( + { + headers[0]: ' ' + dict[headers[0]], + headers[1]: f' ' + str(dict[headers[1]]), + headers[2]: { + "link": "users_GPO_access-%s-left-graph.html" + % (quote(str(dict[headers[0]]).replace(sep, '_'))), + "value": "", + }, + headers[3]: { + "link": "users_GPO_access-%s-left-grid.html" + % (quote(str(dict[headers[0]]).replace(sep, '_'))), + "value": "", + }, + headers[4]: f' ' + str(len(list(set(dict["end_list"])))), + headers[5]: icon, + headers[6]: { + "link": "users_GPO_access-%s-right-graph.html" + % (quote(str(dict[headers[0]]).replace(sep, '_'))), + "value": "", + }, + headers[7]: { + "link": "users_GPO_access-%s-right-grid.html" + % (quote(str(dict[headers[0]]).replace(sep, '_'))), + "value": "", + }, + } + ) + return output + + def formatSmallGrid(list, gpo_name): + output = [] + for n in list: + if n[1] == "Computer": + icon = ' ' + elif n[1] == "User": + icon = ' ' + elif n[1] == "Domain": + icon = ' ' + else: + icon = ' ' + + if n[0] in self.computers_with_admin_rights or n[0] in self.users_with_admin_rights: + icon = icon + ' ' + if n[0] in self.domain.admin_list: + icon = ' ' + + output.append({gpo_name: icon + n[0]}) + return output + + headers = [ + "GPO name", + "Paths to GPO", + "Inbound graph", + "Inbound list", + "Objects impacted", + "Targets interest", + "Outbound graph", + "Outbound list", + ] + if not self.arguments.gpo_low: + data = ( + self.domain.unpriv_users_to_GPO_init + + self.domain.unpriv_users_to_GPO_user_enforced + + self.domain.unpriv_users_to_GPO_computer_enforced + + self.domain.unpriv_users_to_GPO_user_not_enforced + + self.domain.unpriv_users_to_GPO_computer_not_enforced + ) + self.domain.unpriv_users_to_GPO_parsed = parseGPOData(data, headers) + grid = Grid("Users with GPO access") + else: + self.domain.unpriv_users_to_GPO_parsed = parseGPOData( + self.domain.unpriv_users_to_GPO, headers + ) + grid = Grid("Users with GPO access") + + formated_data = sorted( + formatGPOGrid(self.domain.unpriv_users_to_GPO_parsed, headers), + key=lambda x: x[headers[1]], + reverse=True, + ) + page = Page( + self.domain.arguments.cache_prefix, + "users_GPO_access", + "Exploitation through GPO", + "users_GPO_access", + ) + + grid.setheaders(headers) + grid.setData(json.dumps(formated_data)) + page.addComponent(grid) + page.render() + + for _, GPO in self.domain.unpriv_users_to_GPO_parsed.items(): + url_left_graph = "users_GPO_access-%s-left-graph" % GPO[headers[0]] + url_right_graph = "users_GPO_access-%s-right-graph" % GPO[headers[0]] + page_left_graph = Page( + self.arguments.cache_prefix, + url_left_graph, + "Users with write access on GPO", + "graph_GPO_access", + ) + page_right_graph = Page( + self.arguments.cache_prefix, + url_right_graph, + "Objects impacted by GPO", + "graph_GPO_access", + ) + + url_left_grid = "users_GPO_access-%s-left-grid" % GPO[headers[0]] + url_right_grid = "users_GPO_access-%s-right-grid" % GPO[headers[0]] + page_left_grid = Page( + self.arguments.cache_prefix, + url_left_grid, + "List of users able to compromise %s" % GPO[headers[0]], + "grid_GPO_access", + ) + page_right_grid = Page( + self.arguments.cache_prefix, + url_right_grid, + "List of users impacted by %s" % GPO[headers[0]], + "grid_GPO_access", + ) + + # if GPO[headers[4]] > 0: + graph_left = Graph() + graph_left.setPaths(GPO["left_path"]) + page_left_graph.addComponent(graph_left) + + graph_right = Graph() + graph_right.setPaths(GPO["right_path"]) + page_right_graph.addComponent(graph_right) + + if not self.arguments.gpo_low: + entry_grid = Grid("List of users able to compromise %s" % GPO[headers[0]]) + else: + entry_grid = Grid("List of users able to compromise %s" % GPO[headers[0]]) + entry_grid.setheaders([GPO[headers[0]]]) + entry_grid.setData( + json.dumps( + formatSmallGrid( + list(set(GPO["entry_list"])), GPO[headers[0]]) + ) + ) + page_left_grid.addComponent(entry_grid) + + + end_grid = Grid("List of users impacted by %s" % GPO[headers[0]]) + end_grid.setheaders([GPO[headers[0]]]) + end_grid.setData( + json.dumps(formatSmallGrid( + list(set(GPO["end_list"])), GPO[headers[0]])) + ) + page_right_grid.addComponent(end_grid) + + page_left_graph.render() + page_right_graph.render() + page_left_grid.render() + page_right_grid.render() \ No newline at end of file