diff --git a/README.md b/README.md index 40e1ea8c..270b3838 100644 --- a/README.md +++ b/README.md @@ -65,20 +65,20 @@ There is a docker build file if you want to run the scanner from a container: ### Command-line options ``` -usage: gcp-scanner -o /folder_to_save_results/ -g - +usage: python3 scanner.py -o folder_to_save_results -g - GCP Scanner options: -h, --help show this help message and exit + -ls, --light-scan Return only the most important GCP resource fields in the output. -k KEY_PATH, --sa-key-path KEY_PATH Path to directory with SA keys in json format -g GCLOUD_PROFILE_PATH, --gcloud-profile-path GCLOUD_PROFILE_PATH Path to directory with gcloud profile. Specify - to search for credentials in default gcloud config path -m, --use-metadata Extract credentials from GCE instance metadata -at ACCESS_TOKEN_FILES, --access-token-files ACCESS_TOKEN_FILES - A list of comma separated files with access token and OAuth scopes.TTL limited. A token and scopes should be stored in JSON - format. + A list of comma separated files with access token and OAuth scopes.TTL limited. A token and scopes should be stored in JSON format. -rt REFRESH_TOKEN_FILES, --refresh-token-files REFRESH_TOKEN_FILES A list of comma separated files with refresh_token, client_id,token_uri and client_secret stored in JSON format. -s KEY_NAME, --service-account KEY_NAME @@ -89,10 +89,14 @@ options: Comma separated list of project names to include in the scan -c CONFIG_PATH, --config CONFIG_PATH A path to config file with a set of specific resources to scan. - -l {INFO,WARNING,ERROR}, --logging {INFO,WARNING,ERROR} + -l {DEBUG,INFO,WARNING,ERROR,CRITICAL}, --logging {DEBUG,INFO,WARNING,ERROR,CRITICAL} Set logging level (INFO, WARNING, ERROR) - -lf LOG_DIRECTORY, --log-file LOG_DIRECTORY + -lf LOG_FILE, --log-file LOG_FILE Save logs to the path specified rather than displaying in console + -pwc PROJECT_WORKER_COUNT, --project-worker-count PROJECT_WORKER_COUNT + Set limit for project crawlers run in parallel. + -rwc RESOURCE_WORKER_COUNT, --resource-worker-count RESOURCE_WORKER_COUNT + Set limit for resource crawlers run in parallel. 
Required parameters: -o OUTPUT, --output-dir OUTPUT diff --git a/src/gcp_scanner/arguments.py b/src/gcp_scanner/arguments.py index 4147aaae..bfce9d07 100644 --- a/src/gcp_scanner/arguments.py +++ b/src/gcp_scanner/arguments.py @@ -129,11 +129,17 @@ def arg_parser(): help='Save logs to the path specified rather than displaying in\ console') parser.add_argument( - '-wc', - '--worker-count', + '-pwc', + '--project-worker-count', default=1, - dest='worker_count', - help='Set limit for workers run in parallel.') + dest='project_worker_count', + help='Set limit for project crawlers run in parallel.') + parser.add_argument( + '-rwc', + '--resource-worker-count', + default=1, + dest='resource_worker_count', + help='Set limit for resource crawlers run in parallel.') args: argparse.Namespace = parser.parse_args() diff --git a/src/gcp_scanner/models.py b/src/gcp_scanner/models.py index 1ce9ff35..8ae34ecc 100644 --- a/src/gcp_scanner/models.py +++ b/src/gcp_scanner/models.py @@ -58,7 +58,7 @@ def __init__( sa_name, credentials, chain_so_far, - worker_count + resource_worker_count ): self.project = project self.sa_results = sa_results @@ -70,4 +70,4 @@ def __init__( self.sa_name = sa_name self.credentials = credentials self.chain_so_far = chain_so_far - self.worker_count = worker_count + self.resource_worker_count = resource_worker_count diff --git a/src/gcp_scanner/scanner.py b/src/gcp_scanner/scanner.py index 982919ad..f2280f05 100644 --- a/src/gcp_scanner/scanner.py +++ b/src/gcp_scanner/scanner.py @@ -13,20 +13,18 @@ # limitations under the License. -"""The main module that initiates scanning of GCP resources. - -""" +"""The main module that initiates scanning of GCP resources.""" import collections -import concurrent +from datetime import datetime import json +from json.decoder import JSONDecodeError import logging -import multiprocessing import os -import sys -from datetime import datetime -from json.decoder import JSONDecodeError from pathlib import Path -from typing import List, Dict, Optional, Union, Any +import sys +import threading +import time +from typing import Any, Dict, List, Optional, Union from google.auth.exceptions import MalformedError from google.cloud import container_v1 @@ -39,61 +37,88 @@ from . import models from . import scanner from .client.client_factory import ClientFactory -from .crawler.crawler_factory import CrawlerFactory from .crawler import misc_crawler +from .crawler.crawler_factory import CrawlerFactory # We define the schema statically to make it easier for the user and avoid extra # config files. 
LIGHT_VERSION_SCAN_SCHEMA = { - 'compute_instances': ['name', 'zone', 'machineType', 'networkInterfaces', - 'status'], - 'compute_images': ['name', 'status', 'diskSizeGb', 'sourceDisk'], - 'machine_images': ['name', 'description', 'status', 'sourceInstance', - 'totalStorageBytes', 'savedDisks'], - 'compute_disks': ['name', 'sizeGb', 'zone', 'status', 'sourceImage', 'users'], - 'compute_snapshots': ['name', 'status', 'sourceDisk', 'downloadBytes'], - 'managed_zones': ['name', 'dnsName', 'description', 'nameServers'], - 'sql_instances': ['name', 'region', 'ipAddresses', 'databaseVersion', - 'state'], - 'cloud_functions': ['name', 'eventTrigger', 'status', 'entryPoint', - 'serviceAccountEmail'], - 'kms': ['name', 'primary', 'purpose', 'createTime'], - 'services': ['name'], + 'compute_instances': [ + 'name', + 'zone', + 'machineType', + 'networkInterfaces', + 'status', + ], + 'compute_images': ['name', 'status', 'diskSizeGb', 'sourceDisk'], + 'machine_images': [ + 'name', + 'description', + 'status', + 'sourceInstance', + 'totalStorageBytes', + 'savedDisks', + ], + 'compute_disks': [ + 'name', + 'sizeGb', + 'zone', + 'status', + 'sourceImage', + 'users', + ], + 'compute_snapshots': ['name', 'status', 'sourceDisk', 'downloadBytes'], + 'managed_zones': ['name', 'dnsName', 'description', 'nameServers'], + 'sql_instances': [ + 'name', + 'region', + 'ipAddresses', + 'databaseVersion', + 'state', + ], + 'cloud_functions': [ + 'name', + 'eventTrigger', + 'status', + 'entryPoint', + 'serviceAccountEmail', + ], + 'kms': ['name', 'primary', 'purpose', 'createTime'], + 'services': ['name'], } # The following map is used to establish the relationship between # crawlers and clients. It determines the appropriate crawler and # client to be selected from the respective factory classes. 
CRAWL_CLIENT_MAP = { - 'app_services': 'appengine', - 'bigtable_instances': 'bigtableadmin', - 'bq': 'bigquery', - 'cloud_functions': 'cloudfunctions', - 'compute_disks': 'compute', - 'compute_images': 'compute', - 'compute_instances': 'compute', - 'compute_snapshots': 'compute', - 'datastore_kinds': 'datastore', - 'dns_policies': 'dns', - 'endpoints': 'servicemanagement', - 'firestore_collections': 'firestore', - 'filestore_instances': 'file', - 'firewall_rules': 'compute', - 'iam_policy': 'cloudresourcemanager', - 'kms': 'cloudkms', - 'machine_images': 'compute', - 'managed_zones': 'dns', - 'project_info': 'cloudresourcemanager', - 'pubsub_subs': 'pubsub', - 'registered_domains': 'domains', - 'services': 'serviceusage', - 'service_accounts': 'iam', - 'sourcerepos': 'sourcerepo', - 'spanner_instances': 'spanner', - 'sql_instances': 'sqladmin', - 'static_ips': 'compute', - 'storage_buckets': 'storage', - 'subnets': 'compute', + 'app_services': 'appengine', + 'bigtable_instances': 'bigtableadmin', + 'bq': 'bigquery', + 'cloud_functions': 'cloudfunctions', + 'compute_disks': 'compute', + 'compute_images': 'compute', + 'compute_instances': 'compute', + 'compute_snapshots': 'compute', + 'datastore_kinds': 'datastore', + 'dns_policies': 'dns', + 'endpoints': 'servicemanagement', + 'firestore_collections': 'firestore', + 'filestore_instances': 'file', + 'firewall_rules': 'compute', + 'iam_policy': 'cloudresourcemanager', + 'kms': 'cloudkms', + 'machine_images': 'compute', + 'managed_zones': 'dns', + 'pubsub_subs': 'pubsub', + 'registered_domains': 'domains', + 'services': 'serviceusage', + 'service_accounts': 'iam', + 'sourcerepos': 'sourcerepo', + 'spanner_instances': 'spanner', + 'sql_instances': 'sqladmin', + 'static_ips': 'compute', + 'storage_buckets': 'storage', + 'subnets': 'compute', } @@ -134,27 +159,64 @@ def save_results(res_data: Dict, res_path: str, is_light: bool): outfile.write(sa_results_data) -def get_crawl(crawler, project_id, client, crawler_config): - return crawler.crawl(project_id, client, crawler_config) +def get_crawl( + crawler: Any, + project_id: str, + client: Any, + crawler_config: dict, + scan_results: dict, + crawler_name: str, +): + """The function calls the crawler and returns the result in a dictionary. + + Args: + crawler: crawler method to start + project_id: id of a project to scan + client: appropriate client method + crawler_config: a dictionary containing specific parameters for a crawler + scan_results: a dictionary to save scanning results + crawler_name: name of a crawler + + Returns: + scan_results: a dictionary with scanning results + """ + + res = crawler.crawl(project_id, client, crawler_config) + if res is not None and len(res) != 0: + scan_results[crawler_name] = res + return scan_results def get_resources(project: models.ProjectInfo): - """The function crawls the data for a project and stores the results in a + """The function crawls the data for a project and stores the results in a + dictionary.
- Args: + Args: project: class to store project scan configuration """ - if project.target_project and \ - project.target_project not in project.project['projectId']: + if ( + project.target_project + and project.target_project not in project.project['projectId'] + ): return project_id = project.project['projectId'] print(f'Inspecting project {project_id}') - project_result = project.sa_results['projects'][project_id] + project_result = dict() project_result['project_info'] = project.project + project_result['service_account_chain'] = project.sa_results[ + 'service_account_chain' + ] + project_result['current_service_account'] = project.sa_results[ + 'current_service_account' + ] + project_result['token_scopes'] = project.sa_results['token_scopes'] + project_result['service_account_edges'] = project.sa_results[project_id][ + 'service_account_edges' + ] # Fail with error if the output file already exists output_file_name = f'{project_id}-{project.scan_time_suffix}.json' @@ -166,71 +228,90 @@ def get_resources(project: models.ProjectInfo): pass except FileExistsError: - logging.error('Try removing the %s file and restart the scanner.', - output_file_name) - - results_crawl_pool = dict() - with concurrent.futures.ThreadPoolExecutor( - max_workers=int(project.worker_count)) as executor: - for crawler_name, client_name in CRAWL_CLIENT_MAP.items(): - if is_set(project.scan_config, crawler_name): - crawler_config = {} - if project.scan_config is not None: - crawler_config = project.scan_config.get(crawler_name) - # add gcs output path to the config. - # this path is used by the storage bucket crawler as of now. - crawler_config['gcs_output_path'] = gcs_output_path - # crawl the data - crawler = CrawlerFactory.create_crawler(crawler_name) - client = ClientFactory.get_client(client_name).get_service( + logging.error( + 'Try removing the %s file and restart the scanner.', output_file_name + ) + + threads_list = list() + for crawler_name, client_name in CRAWL_CLIENT_MAP.items(): + if is_set(project.scan_config, crawler_name): + crawler_config = {} + if project.scan_config is not None: + crawler_config = project.scan_config.get(crawler_name) + + # add gcs output path to the config. + # this path is used by the storage bucket crawler as of now.
+ crawler_config['gcs_output_path'] = gcs_output_path + + # crawl the data + crawler = CrawlerFactory.create_crawler(crawler_name) + client = ClientFactory.get_client(client_name).get_service( project.credentials, - ) - results_crawl_pool[crawler_name] = executor.submit( - get_crawl, - crawler, - project_id, - client, - crawler_config, - ) + ) + + t = threading.Thread( + target=get_crawl, + args=( + crawler, + project_id, + client, + crawler_config, + project_result, + crawler_name, + ), + ) + t.daemon = True + t.start() + threads_list.append(t) + + while True: + active_threads = 0 + for t in threads_list: + if t.is_alive(): + active_threads += 1 + if active_threads >= project.resource_worker_count: + time.sleep(0.1) + else: + break - for crawler_name, future_obj in results_crawl_pool.items(): - project_result[crawler_name] = future_obj.result() + for t in threads_list: + t.join() # Call other miscellaneous crawlers here if is_set(project.scan_config, 'gke_clusters'): gke_client = gke_client_for_credentials(project.credentials) - project_result['gke_clusters'] = misc_crawler.get_gke_clusters( - project_id, - gke_client, + res = misc_crawler.get_gke_clusters( + project_id, + gke_client, ) + if res is not None and len(res) != 0: + project_result['gke_clusters'] = res if is_set(project.scan_config, 'gke_images'): - project_result['gke_images'] = misc_crawler.get_gke_images( - project_id, - project.credentials.token, + res = misc_crawler.get_gke_images( + project_id, + project.credentials.token, ) + if res is not None and len(res) != 0: + project_result['gke_images'] = res logging.info('Saving results for %s into the file', project_id) - - save_results(project.sa_results, output_path, project.light_scan) - # Clean memory to avoid leak for large amount projects. - project.sa_results.clear() + save_results(project_result, output_path, project.light_scan) def impersonate_service_accounts( - context, - project, - scan_config, - sa_results, - chain_so_far, - sa_name, - credentials + context, + project, + scan_config, + sa_results, + chain_so_far, + sa_name, + credentials, ): - """The function enumerates projects accessible by SA and impersonates them. 
- """ + """The function enumerates projects accessible by SA and impersonates them.""" # Enumerate projects accessible by SA project_id = project['projectId'] - print(f'Inspecting project {project_id} for Impersonation') + print(f'Looking for impersonation options in {project_id}') project_result = sa_results['projects'][project_id] project_result['project_info'] = project # Iterate over discovered service accounts by attempting impersonation @@ -247,10 +328,10 @@ def impersonate_service_accounts( iam_client = iam_client_for_credentials(credentials) if is_set(scan_config, 'iam_policy') is False: iam_policy = CrawlerFactory.create_crawler('iam_policy').crawl( - project_id, - ClientFactory.get_client('cloudresourcemanager').get_service( - credentials, - ), + project_id, + ClientFactory.get_client('cloudresourcemanager').get_service( + credentials, + ), ) project_service_accounts = get_sas_for_impersonation(iam_policy) @@ -258,29 +339,36 @@ def impersonate_service_accounts( try: logging.info('Trying %s', candidate_service_account) creds_impersonated = credsdb.impersonate_sa( - iam_client, candidate_service_account) + iam_client, candidate_service_account + ) context.service_account_queue.put( - (candidate_service_account, creds_impersonated, updated_chain)) + (candidate_service_account, creds_impersonated, updated_chain) + ) project_result['service_account_edges'].append( - candidate_service_account) - logging.info('Successfully impersonated %s using %s', - candidate_service_account, sa_name) + candidate_service_account + ) + logging.info( + 'Successfully impersonated %s using %s', + candidate_service_account, + sa_name, + ) except Exception: - logging.error('Failed to get token for %s', - candidate_service_account) + logging.error('Failed to get token for %s', candidate_service_account) logging.error(sys.exc_info()[1]) def iam_client_for_credentials( - credentials: Credentials) -> IAMCredentialsClient: + credentials: Credentials, +) -> IAMCredentialsClient: return iam_credentials.IAMCredentialsClient(credentials=credentials) def gke_client_for_credentials( - credentials: Credentials + credentials: Credentials, ) -> container_v1.services.cluster_manager.client.ClusterManagerClient: return container_v1.services.cluster_manager.ClusterManagerClient( - credentials=credentials) + credentials=credentials + ) def get_sa_details_from_key_files(key_path): @@ -309,8 +397,7 @@ def get_sa_details_from_key_files(key_path): return sa_details -def get_sas_for_impersonation( - iam_policy: List[Dict[str, Any]]) -> List[str]: +def get_sas_for_impersonation(iam_policy: List[Dict[str, Any]]) -> List[str]: """Extract a list of unique SAs from IAM policy associated with project. Args: @@ -344,7 +431,8 @@ def infinite_defaultdict(): def get_sa_tuples(args): - """The function extracts service account (SA) credentials from various + """The function extracts service account (SA) credentials from various + sources and returns a list of tuples. 
""" @@ -376,8 +464,9 @@ def get_sa_tuples(args): continue logging.info('Retrieving credentials for %s', account_name) - credentials = credsdb.get_creds_from_data(access_token, - json.loads(account_creds)) + credentials = credsdb.get_creds_from_data( + access_token, json.loads(account_creds) + ) if credentials is None: logging.error('Failed to retrieve access token for %s', account_name) continue @@ -406,19 +495,22 @@ def get_sa_tuples(args): return sa_tuples + def main(): - """The main scanner loop for GCP Scanner - """ + """The main scanner loop for GCP Scanner""" logging.getLogger('googleapiclient.discovery_cache').setLevel(logging.ERROR) logging.getLogger('googleapiclient.http').setLevel(logging.ERROR) args = arguments.arg_parser() - logging.basicConfig(level=getattr(logging, args.log_level.upper(), None), - format='%(asctime)s - %(levelname)s - %(message)s', - datefmt='%Y-%m-%d %H:%M:%S', - filename=args.log_file, filemode='a') + logging.basicConfig( + level=getattr(logging, args.log_level.upper(), None), + format='%(asctime)s - %(levelname)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', + filename=args.log_file, + filemode='a', + ) force_projects_list = list() if args.force_projects: @@ -436,7 +528,7 @@ def main(): context = models.SpiderContext(sa_tuples) - project_queue = multiprocessing.Queue() + project_queue = list() processed_sas = set() while not context.service_account_queue.empty(): @@ -457,9 +549,11 @@ def main(): sa_results['token_scopes'] = credentials.scopes project_list = CrawlerFactory.create_crawler( - 'project_list', + 'project_list', ).crawl( - ClientFactory.get_client('cloudresourcemanager').get_service(credentials), + ClientFactory.get_client('cloudresourcemanager').get_service( + credentials + ), ) if len(project_list) <= 0: @@ -468,53 +562,70 @@ def main(): if force_projects_list: for force_project_id in force_projects_list: res = CrawlerFactory.create_crawler( - 'project_info', + 'project_info', ).crawl( - force_project_id, - ClientFactory.get_client('cloudresourcemanager').get_service( - credentials, - ), + force_project_id, + ClientFactory.get_client('cloudresourcemanager').get_service( + credentials, + ), ) if res: project_list.append(res) else: # force object creation anyway - project_list.append({'projectId': force_project_id, - 'projectNumber': 'N/A'}) + project_list.append( + {'projectId': force_project_id, 'projectNumber': 'N/A'} + ) # Enumerate projects accessible by SA for project in project_list: project_obj = models.ProjectInfo( - project, - sa_results, - args.output, - scan_config, - args.light_scan, - args.target_project, - scan_time_suffix, - sa_name, - credentials, - chain_so_far, - args.worker_count + project, + sa_results, + args.output, + scan_config, + args.light_scan, + args.target_project, + scan_time_suffix, + sa_name, + credentials, + chain_so_far, + int(args.resource_worker_count), ) - project_queue.put(project_obj) + project_queue.append(project_obj) impersonate_service_accounts( - context, - project, - scan_config, - sa_results, - chain_so_far, - sa_name, - credentials + context, + project, + scan_config, + sa_results, + chain_so_far, + sa_name, + credentials, ) - pool = multiprocessing.Pool( - processes=min(int(args.worker_count), os.cpu_count())) + all_thread_handles = list() - while not project_queue.empty(): - pool.apply_async(scanner.get_resources, args=(project_queue.get(),)) + # See i#267 on why we use the native threading approach here. 
+ for i, project_obj in enumerate(project_queue): + print('Scanning project %d of %d' % (i + 1, len(project_queue))) + sync_t = threading.Thread(target=scanner.get_resources, args=(project_obj,)) + sync_t.daemon = True + sync_t.start() + all_thread_handles.append(sync_t) + + while True: # enforce explicit block on number of threads + active_threads = 0 + for t in all_thread_handles: + if t.is_alive(): + active_threads += 1 + + if active_threads >= int(args.project_worker_count): + time.sleep(0.1) + else: + break - pool.close() - pool.join() + # wait for any threads left to finish + for t in all_thread_handles: + t.join() return 0 diff --git a/src/gcp_scanner/test_acceptance.py b/src/gcp_scanner/test_acceptance.py index 867d90fe..a2dae004 100644 --- a/src/gcp_scanner/test_acceptance.py +++ b/src/gcp_scanner/test_acceptance.py @@ -18,11 +18,12 @@ import json import os +import sys import unittest.mock from . import scanner -RESOURCE_COUNT = 31 +RESOURCE_COUNT = 30 RESULTS_JSON_COUNT = 1 PROJECT_INFO_COUNT = 5 IAM_POLICY_COUNT = 12 @@ -53,6 +54,10 @@ def check_obj_entry(res_dict, subojects_count, entry_name, volatile=False): obj = res_dict.get(entry_name, None) + if subojects_count == 0: + assert obj is None + return + if volatile is True: assert obj is not None and (len(obj) == subojects_count or \ len(obj) == subojects_count - 1) @@ -63,12 +68,10 @@ def validate_result(): file_name = os.listdir("res/")[0] with open("res/" + file_name, "r", encoding="utf-8") as f: - res_data = json.load(f) + project = json.load(f) - # project - project = res_data["projects"].get("test-gcp-scanner-2", None) + json.dump(project, sys.stdout) assert project is not None - assert len(project) == RESOURCE_COUNT check_obj_entry(project, PROJECT_INFO_COUNT, "project_info") check_obj_entry(project, IAM_POLICY_COUNT, "iam_policy") @@ -106,6 +109,7 @@ def validate_result(): check_obj_entry(project, SERVICES_COUNT, "services") + assert len(project) == RESOURCE_COUNT def test_acceptance(): os.mkdir("res")
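Note: the core change above swaps the multiprocessing pool for plain daemon threads whose concurrency is capped by polling `Thread.is_alive()`. As a reading aid only, here is a minimal, self-contained sketch of that throttling pattern; the names `run_bounded`, `crawl_one`, and `worker_limit` are illustrative and do not appear in the patch.

```python
import threading
import time


def crawl_one(name: str) -> None:
  """Stand-in for a single crawler; sleeps to simulate API calls."""
  time.sleep(0.5)


def run_bounded(tasks, worker_limit: int) -> None:
  """Start one daemon thread per task, never letting more than worker_limit
  of them be alive at once (the same poll-and-sleep throttle the patch
  applies to project and resource crawlers)."""
  handles = []
  for task in tasks:
    t = threading.Thread(target=crawl_one, args=(task,))
    t.daemon = True
    t.start()
    handles.append(t)
    # Block until the number of live threads drops below the limit.
    while sum(h.is_alive() for h in handles) >= worker_limit:
      time.sleep(0.1)
  # Wait for whatever is still running before returning.
  for t in handles:
    t.join()


if __name__ == '__main__':
  run_bounded([f'crawler-{i}' for i in range(10)], worker_limit=3)
```

On the command line the two caps map to the new flags, e.g. `python3 scanner.py -o output_dir -m -pwc 4 -rwc 8`, where the output directory and the worker counts are arbitrary example values.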