From b51059ccf10f0a4ee98da85cdfdcb8d302f28835 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bence=20H=C3=A9zs=C5=91?= Date: Thu, 7 May 2026 13:04:47 +0200 Subject: [PATCH] Normalize code for Ruff and Black checks --- config.py | 1 + core/engine.py | 218 ++++++++--- core/utils.py | 1 + core/utils_aws.py | 168 +++++--- core/utils_azure.py | 122 ++++-- core/utils_db.py | 5 + core/utils_report.py | 724 ++++++++++++++++++++++------------- core/utils_report_html.py | 106 +++-- core/utils_report_json.py | 118 ++++-- core/utils_report_pdf.py | 304 +++++++++------ core/utils_sync.py | 27 +- main.py | 268 +++++++++---- tests/test_utils_and_main.py | 4 +- tests/test_validate.py | 8 +- utils/aws.py | 9 +- utils/azure.py | 20 +- utils/connection.py | 17 +- utils/constants.py | 50 +-- utils/data.py | 37 +- utils/sync.py | 8 +- utils/utils.py | 24 +- utils/validate.py | 30 +- 22 files changed, 1541 insertions(+), 728 deletions(-) diff --git a/config.py b/config.py index bfb4d82..3cb20e6 100644 --- a/config.py +++ b/config.py @@ -16,6 +16,7 @@ Please do not modify CLI_VERSION; it is used for debugging purposes. """ + CLI_VERSION = "v1.0.0" HOST = "" diff --git a/core/engine.py b/core/engine.py index 039b26a..441e099 100644 --- a/core/engine.py +++ b/core/engine.py @@ -14,14 +14,21 @@ from .utils_aws import build_aws_resource_inventory, build_aws_cost_inventory from .utils_azure import build_azure_resource_inventory, build_azure_cost_inventory from .utils_db import connect, load_data -from .utils_report import generate_html_report, generate_pdf_report, generate_json_report +from .utils_report import ( + generate_html_report, + generate_pdf_report, + generate_json_report, +) from .utils_sync import post_assessment # Configure the logger logger = logging.getLogger("core.engine") + # Stage 1 -def verify_credentials(cloud_service_provider: int, provider_details: Dict[str, Any]) -> Tuple[bool, str]: +def verify_credentials( + cloud_service_provider: int, provider_details: Dict[str, Any] +) -> Tuple[bool, str]: connection_success = False logs = "" @@ -31,41 +38,48 @@ def verify_credentials(cloud_service_provider: int, provider_details: Dict[str, credential = provider_details.get("credential") or ClientSecretCredential( tenant_id=provider_details["tenantId"], client_id=provider_details["clientId"], - client_secret=provider_details["clientSecret"] + client_secret=provider_details["clientSecret"], + ) + resource_client = ResourceManagementClient( + credential, provider_details["subscriptionId"] ) - resource_client = ResourceManagementClient(credential, provider_details["subscriptionId"]) - list(resource_client.resource_groups.list()) # Benign call to verify credentials + list( + resource_client.resource_groups.list() + ) # Benign call to verify credentials connection_success = True logs = "Azure connection successful." except ClientAuthenticationError as e: logs = f"Azure credentials validation failed: {str(e)}" - #logger.error(logs) + # logger.error(logs) except Exception as e: logs = f"Azure connection test failed: {str(e)}" - #logger.error(logs) + # logger.error(logs) elif cloud_service_provider == 2: # AWS try: client = boto3.client( - 'ec2', + "ec2", aws_access_key_id=provider_details["accessKey"], aws_secret_access_key=provider_details["secretKey"], - region_name=provider_details["region"] + region_name=provider_details["region"], ) client.describe_regions() # Benign call to verify credentials connection_success = True logs = "AWS connection successful." except NoCredentialsError as e: logs = f"AWS credentials validation failed: {str(e)}" - #logger.error(logs) + # logger.error(logs) except Exception as e: logs = f"AWS connection test failed: {str(e)}" - #logger.error(logs) + # logger.error(logs) return connection_success, logs + # Stage 2 -def test_permissions(cloud_service_provider: int, provider_details: Dict[str, Any]) -> Tuple[bool, bool, bool, str]: +def test_permissions( + cloud_service_provider: int, provider_details: Dict[str, Any] +) -> Tuple[bool, bool, bool, str]: permission_valid = False permission_reader = False permission_cost = False @@ -77,19 +91,27 @@ def test_permissions(cloud_service_provider: int, provider_details: Dict[str, An credential = provider_details.get("credential") or ClientSecretCredential( tenant_id=provider_details["tenantId"], client_id=provider_details["clientId"], - client_secret=provider_details["clientSecret"] + client_secret=provider_details["clientSecret"], ) resource_group_scope = f"/subscriptions/{provider_details['subscriptionId']}/resourceGroups/{provider_details['resourceGroupName']}" # Check role assignments - auth_client = AuthorizationManagementClient(credential, provider_details["subscriptionId"]) - role_assignments = auth_client.role_assignments.list_for_scope(scope=resource_group_scope) + auth_client = AuthorizationManagementClient( + credential, provider_details["subscriptionId"] + ) + role_assignments = auth_client.role_assignments.list_for_scope( + scope=resource_group_scope + ) for role_assignment in role_assignments: role_definition_id = role_assignment.role_definition_id - if role_definition_id.endswith("acdd72a7-3385-48ef-bd42-f606fba81ae7"): # Reader role + if role_definition_id.endswith( + "acdd72a7-3385-48ef-bd42-f606fba81ae7" + ): # Reader role permission_reader = True - if role_definition_id.endswith("72fafb9e-0641-4937-9268-a91bfd8191a3"): # Cost Management Reader + if role_definition_id.endswith( + "72fafb9e-0641-4937-9268-a91bfd8191a3" + ): # Cost Management Reader permission_cost = True if permission_reader and permission_cost: @@ -112,23 +134,25 @@ def test_permissions(cloud_service_provider: int, provider_details: Dict[str, An elif cloud_service_provider == 2: # AWS try: sts_client = boto3.client( - 'sts', + "sts", aws_access_key_id=provider_details["accessKey"], aws_secret_access_key=provider_details["secretKey"], - region_name=provider_details["region"] + region_name=provider_details["region"], ) identity = sts_client.get_caller_identity() - user_arn = identity['Arn'] - user_name = user_arn.split('/')[-1] + user_arn = identity["Arn"] + user_name = user_arn.split("/")[-1] iam_client = boto3.client( - 'iam', + "iam", aws_access_key_id=provider_details["accessKey"], aws_secret_access_key=provider_details["secretKey"], - region_name=provider_details["region"] + region_name=provider_details["region"], ) policies = iam_client.list_attached_user_policies(UserName=user_name) - policy_names = [policy['PolicyName'] for policy in policies['AttachedPolicies']] + policy_names = [ + policy["PolicyName"] for policy in policies["AttachedPolicies"] + ] permission_reader = "ViewOnlyAccess" in policy_names permission_cost = "AWSBillingReadOnlyAccess" in policy_names @@ -154,17 +178,27 @@ def test_permissions(cloud_service_provider: int, provider_details: Dict[str, An return permission_valid, permission_reader, permission_cost, logs + # Stage 3 -def create_resource_inventory(cloud_service_provider: int, provider_details: Dict[str, Any], report_path: str, raw_data_path: str) -> Dict[str, Any]: +def create_resource_inventory( + cloud_service_provider: int, + provider_details: Dict[str, Any], + report_path: str, + raw_data_path: str, +) -> Dict[str, Any]: # Copy assets and datasets folders data copy_assets(report_path) try: if cloud_service_provider == 1: # Azure - build_azure_resource_inventory(cloud_service_provider, provider_details, report_path, raw_data_path) + build_azure_resource_inventory( + cloud_service_provider, provider_details, report_path, raw_data_path + ) elif cloud_service_provider == 2: # AWS - build_aws_resource_inventory(cloud_service_provider, provider_details, report_path, raw_data_path) + build_aws_resource_inventory( + cloud_service_provider, provider_details, report_path, raw_data_path + ) return {"success": True, "logs": "Resource inventory created successfully."} @@ -173,13 +207,23 @@ def create_resource_inventory(cloud_service_provider: int, provider_details: Dic # Do not raise the exception here; just return the error information return {"success": False, "logs": str(e)} + # Stage 4 -def create_cost_inventory(cloud_service_provider: int, provider_details: Dict[str, Any], report_path: str, raw_data_path: str) -> Dict[str, Any]: +def create_cost_inventory( + cloud_service_provider: int, + provider_details: Dict[str, Any], + report_path: str, + raw_data_path: str, +) -> Dict[str, Any]: try: if cloud_service_provider == 1: # Azure - build_azure_cost_inventory(cloud_service_provider, provider_details, report_path, raw_data_path) + build_azure_cost_inventory( + cloud_service_provider, provider_details, report_path, raw_data_path + ) elif cloud_service_provider == 2: # AWS - build_aws_cost_inventory(cloud_service_provider, provider_details, report_path, raw_data_path) + build_aws_cost_inventory( + cloud_service_provider, provider_details, report_path, raw_data_path + ) return {"success": True, "logs": "Cost inventory created successfully."} @@ -187,12 +231,23 @@ def create_cost_inventory(cloud_service_provider: int, provider_details: Dict[st logger.error(f"Error creating cost inventory: {str(e)}", exc_info=True) return {"success": False, "logs": str(e)} + # Stage 5 - Online -def sync_assessment(report_path: str, name: str, started_at: int, metadata: Dict[str, Any], mode: str, token: Optional[str]) -> Dict[str, Any]: +def sync_assessment( + report_path: str, + name: str, + started_at: int, + metadata: Dict[str, Any], + mode: str, + token: Optional[str], +) -> Dict[str, Any]: if mode != "online" or not token: - return {"success": True, "online": False, - "payload": None, - "logs": "offline – sync skipped."} + return { + "success": True, + "online": False, + "payload": None, + "logs": "offline – sync skipped.", + } result = post_assessment( name=name, @@ -221,7 +276,6 @@ def sync_assessment(report_path: str, name: str, started_at: int, metadata: Dict else: rows.append(("null", rid)) - db_path = os.path.join(report_path, "data", "assessment.db") with connect(db_path=db_path) as conn: cursor = conn.cursor() @@ -230,7 +284,7 @@ def sync_assessment(report_path: str, name: str, started_at: int, metadata: Dict INSERT INTO risk_inventory (resource_type, risk) VALUES (?, ?) """, - rows + rows, ) conn.commit() @@ -253,8 +307,8 @@ def sync_assessment(report_path: str, name: str, started_at: int, metadata: Dict int(scoring["exit_score"]), int(scoring["human_score"]), int(scoring["technology_score"]), - int(scoring["operational_score"]) - ) + int(scoring["operational_score"]), + ), ) conn.commit() logger.debug("Scoring data saved to local DB.") @@ -265,13 +319,15 @@ def sync_assessment(report_path: str, name: str, started_at: int, metadata: Dict return result + # Stage 5 - Offline -def perform_risk_assessment(exit_strategy: int, report_path: str, mode: str) -> Dict[str, Any]: +def perform_risk_assessment( + exit_strategy: int, report_path: str, mode: str +) -> Dict[str, Any]: if mode != "offline": logger.debug("Online mode – skipping local risk assessment.") - return {"success": True, - "logs": "online mode – local risk skipped."} + return {"success": True, "logs": "online mode – local risk skipped."} try: # Define the database path @@ -289,24 +345,35 @@ def perform_risk_assessment(exit_strategy: int, report_path: str, mode: str) -> total_resource_count = sum(item["count"] for item in resource_inventory) # Calculate total number of distinct resource types - distinct_resource_types = set(item["resource_type"] for item in resource_inventory) + distinct_resource_types = set( + item["resource_type"] for item in resource_inventory + ) total_resource_types = len(distinct_resource_types) # Process each resource by `resource_type` for resource_data in resource_inventory: - resource_type_id = str(resource_data["resource_type"]) # Convert to string for consistent comparison + resource_type_id = str( + resource_data["resource_type"] + ) # Convert to string for consistent comparison # Filter alternatives for the current resource_type and exit strategy relevant_alternatives = [ - alt for alt in alternatives - if str(alt["resource_type"]) == resource_type_id and str(alt["strategy_type"]) == str(exit_strategy) + alt + for alt in alternatives + if str(alt["resource_type"]) == resource_type_id + and str(alt["strategy_type"]) == str(exit_strategy) ] alternative_count = len(relevant_alternatives) # Count alternatives with support support_count = sum( - 1 for alt in relevant_alternatives - if any(tech["id"] == alt["alternative_technology"] and tech["support_plan"] == "t" for tech in alternative_technologies) + 1 + for alt in relevant_alternatives + if any( + tech["id"] == alt["alternative_technology"] + and tech["support_plan"] == "t" + for tech in alternative_technologies + ) ) # Determine risks based on criteria, using resource_type_id in output @@ -339,7 +406,7 @@ def perform_risk_assessment(exit_strategy: int, report_path: str, mode: str) -> INSERT INTO risk_inventory (resource_type, risk) VALUES (?, ?) """, - [(entry["resource_type"], entry["risk"]) for entry in risk_inventory] + [(entry["resource_type"], entry["risk"]) for entry in risk_inventory], ) conn.commit() @@ -349,8 +416,17 @@ def perform_risk_assessment(exit_strategy: int, report_path: str, mode: str) -> logger.error(f"Error performing risk assessment: {str(e)}", exc_info=True) return {"success": False, "logs": str(e)} + # Stage 6 -def generate_report(cloud_service_provider: int, provider_details: Dict[str, Any], exit_strategy: int, assessment_type: int, name: str, report_path: str, raw_data_path: str) -> Dict[str, Any]: +def generate_report( + cloud_service_provider: int, + provider_details: Dict[str, Any], + exit_strategy: int, + assessment_type: int, + name: str, + report_path: str, + raw_data_path: str, +) -> Dict[str, Any]: try: db_path = os.path.join(report_path, "data", "assessment.db") @@ -384,7 +460,9 @@ def generate_report(cloud_service_provider: int, provider_details: Dict[str, Any elif len(scoring_data) == 0: scoring_data = None else: - logger.warning("Unexpected multiple rows in scoring_data: %d", len(scoring_data)) + logger.warning( + "Unexpected multiple rows in scoring_data: %d", len(scoring_data) + ) scoring_data = scoring_data[0] # Generate Outputs @@ -392,20 +470,48 @@ def generate_report(cloud_service_provider: int, provider_details: Dict[str, Any # Generate HTML report reports["HTML"] = generate_html_report( - report_path, metadata, resource_type_mapping, resource_inventory, - cost_data, scoring_data, risk_data, risk_definitions, alternatives, alternative_technologies, exit_strategy + report_path, + metadata, + resource_type_mapping, + resource_inventory, + cost_data, + scoring_data, + risk_data, + risk_definitions, + alternatives, + alternative_technologies, + exit_strategy, ) # Generate PDF report reports["PDF"] = generate_pdf_report( - provider_details, report_path, metadata, resource_type_mapping, resource_inventory, - cost_data, scoring_data, risk_data, risk_definitions, alternatives, alternative_technologies, exit_strategy + provider_details, + report_path, + metadata, + resource_type_mapping, + resource_inventory, + cost_data, + scoring_data, + risk_data, + risk_definitions, + alternatives, + alternative_technologies, + exit_strategy, ) # Generate JSON report reports["JSON"] = generate_json_report( - raw_data_path, metadata, resource_type_mapping, resource_inventory, - cost_data, scoring_data, risk_data, risk_definitions, alternatives, alternative_technologies, exit_strategy + raw_data_path, + metadata, + resource_type_mapping, + resource_inventory, + cost_data, + scoring_data, + risk_data, + risk_definitions, + alternatives, + alternative_technologies, + exit_strategy, ) return {"success": True, "reports": reports} diff --git a/core/utils.py b/core/utils.py index f4eead0..3ba82d0 100644 --- a/core/utils.py +++ b/core/utils.py @@ -5,6 +5,7 @@ logger = logging.getLogger("core.engine.utils") + def copy_assets(report_path: str) -> None: assets_folders = ["css", "img", "icons"] assets_path = os.path.join(report_path, "assets") diff --git a/core/utils_aws.py b/core/utils_aws.py index a74438a..ab31aba 100644 --- a/core/utils_aws.py +++ b/core/utils_aws.py @@ -16,7 +16,14 @@ logger = logging.getLogger("core.engine.aws") -def aws_api_call_with_retry(client: Any, function_name: str, parameters: Dict[str, Any], max_retries: int, retry_delay: int) -> Callable[..., Any]: + +def aws_api_call_with_retry( + client: Any, + function_name: str, + parameters: Dict[str, Any], + max_retries: int, + retry_delay: int, +) -> Callable[..., Any]: def api_call(*args, **kwargs): for attempt in range(max_retries): try: @@ -26,21 +33,22 @@ def api_call(*args, **kwargs): else: return function_to_call(**kwargs) except botocore.exceptions.ClientError as error: - error_code = error.response['Error']['Code'] - #logger.warning(f"ClientError: {error_code}. Attempt {attempt + 1} of {max_retries}. Retrying in {retry_delay} seconds.") - if error_code in ['Throttling', 'RequestLimitExceeded']: - time.sleep(retry_delay * (2 ** attempt)) + error_code = error.response["Error"]["Code"] + # logger.warning(f"ClientError: {error_code}. Attempt {attempt + 1} of {max_retries}. Retrying in {retry_delay} seconds.") + if error_code in ["Throttling", "RequestLimitExceeded"]: + time.sleep(retry_delay * (2**attempt)) continue else: raise - except botocore.exceptions.BotoCoreError as error: - #logger.warning(f"BotoCoreError: {str(error)}. Attempt {attempt + 1} of {max_retries}. Retrying in {retry_delay} seconds.") - time.sleep(retry_delay * (2 ** attempt)) + except botocore.exceptions.BotoCoreError: + # logger.warning(f"BotoCoreError. Attempt {attempt + 1} of {max_retries}. Retrying in {retry_delay} seconds.") + time.sleep(retry_delay * (2**attempt)) continue raise Exception(f"Failed to call {function_name} after {max_retries} attempts") return api_call # Return the callable function + def convert_datetime(obj: Any) -> Any: if isinstance(obj, dict): for k, v in obj.items(): @@ -52,7 +60,13 @@ def convert_datetime(obj: Any) -> Any: return obj.isoformat() return obj -def build_aws_resource_inventory(cloud_service_provider: int, provider_details: Dict[str, Any], report_path: str, raw_data_path: str) -> None: + +def build_aws_resource_inventory( + cloud_service_provider: int, + provider_details: Dict[str, Any], + report_path: str, + raw_data_path: str, +) -> None: try: access_key = provider_details["accessKey"] secret_key = provider_details["secretKey"] @@ -61,7 +75,7 @@ def build_aws_resource_inventory(cloud_service_provider: int, provider_details: session = boto3.Session( aws_access_key_id=access_key, aws_secret_access_key=secret_key, - region_name=region + region_name=region, ) db_path = os.path.join(report_path, "data", "assessment.db") @@ -80,38 +94,42 @@ def build_aws_resource_inventory(cloud_service_provider: int, provider_details: aggregated_resources = defaultdict(int) # Iterate through each resource type in the JSON - for idx, (resource_type_code, resource_info) in enumerate(resource_type_mapping.items(), start=1): - parts = resource_type_code.split('.') + for idx, (resource_type_code, resource_info) in enumerate( + resource_type_mapping.items(), start=1 + ): + parts = resource_type_code.split(".") if len(parts) != 4 or parts[0] != "AWS": - #logger.warning(f"Invalid resource type format: {resource_type_code}. Skipping.") + # logger.warning(f"Invalid resource type format: {resource_type_code}. Skipping.") continue # Extract service name, operation name, and result key service_name, operation_name, result_key = parts[1], parts[2], parts[3] - #logger.info(f"Processing service {service_name} with operation {operation_name}") + # logger.info(f"Processing service {service_name} with operation {operation_name}") try: client = session.client(service_name, region_name=region) if not hasattr(client, operation_name): - #logger.error(f"Operation {operation_name} does not exist for service {service_name}") + # logger.error(f"Operation {operation_name} does not exist for service {service_name}") continue # Make the API call - api_call = aws_api_call_with_retry(client, operation_name, {}, max_retries=3, retry_delay=2) + api_call = aws_api_call_with_retry( + client, operation_name, {}, max_retries=3, retry_delay=2 + ) response = api_call() if isinstance(response, dict): response.pop("ResponseMetadata", None) resources = response.get(result_key.strip(), []) # Handle paginated results - while 'NextToken' in response: - next_token = response['NextToken'] + while "NextToken" in response: + next_token = response["NextToken"] response = api_call(NextToken=next_token) response.pop("ResponseMetadata", None) resources.extend(response.get(result_key.strip(), [])) else: - #logger.warning(f"No valid response found for {service_name} operation {operation_name}. Skipping.") + # logger.warning(f"No valid response found for {service_name} operation {operation_name}. Skipping.") continue # Aggregate the resources @@ -119,14 +137,16 @@ def build_aws_resource_inventory(cloud_service_provider: int, provider_details: aggregated_resources[(resource_type_code, region)] += 1 # Store raw data - raw_data.append({ - "service": service_name, - "operation": operation_name, - "resources": resources - }) - - except (NoCredentialsError, ClientError, Exception) as e: - #logger.error(f"Error while processing {service_name}: {str(e)}", exc_info=True) + raw_data.append( + { + "service": service_name, + "operation": operation_name, + "resources": resources, + } + ) + + except (NoCredentialsError, ClientError, Exception): + # logger.error(f"Error while processing {service_name}", exc_info=True) continue # Save raw data to a JSON file @@ -140,12 +160,15 @@ def build_aws_resource_inventory(cloud_service_provider: int, provider_details: with connect(db_path=db_path) as conn: cursor = conn.cursor() - for (resource_type_code, resource_location), resource_count in aggregated_resources.items(): + for ( + resource_type_code, + resource_location, + ), resource_count in aggregated_resources.items(): try: # Map resource type code to resource_type_id resource_info = resource_type_mapping.get(resource_type_code) if not resource_info: - #logger.warning(f"Resource type {resource_type_code} not found in resourcetype mapping. Skipping.") + # logger.warning(f"Resource type {resource_type_code} not found in resourcetype mapping. Skipping.") continue resource_type_id = resource_info["id"] @@ -156,21 +179,31 @@ def build_aws_resource_inventory(cloud_service_provider: int, provider_details: VALUES (?, ?, ?) ON CONFLICT(resource_type, location) DO UPDATE SET count = excluded.count """, - (resource_type_id, resource_location, resource_count) + (resource_type_id, resource_location, resource_count), ) except sqlite3.Error as e: - logger.error(f"SQLite error while processing aggregated resource: {e}", exc_info=True) + logger.error( + f"SQLite error while processing aggregated resource: {e}", + exc_info=True, + ) except Exception as e: - logger.error(f"Unexpected error while processing aggregated resource: {e}", exc_info=True) + logger.error( + f"Unexpected error while processing aggregated resource: {e}", + exc_info=True, + ) conn.commit() except Exception as e: logger.error(f"Error creating AWS resource inventory: {str(e)}", exc_info=True) + def get_missing_months_aws(processed_costs: Set[str], max_months: int) -> List[date]: current_date = datetime.utcnow().date().replace(day=1) - processed_months = {datetime.strptime(month_str, '%Y-%m-%d').date().replace(day=1) for month_str in processed_costs} + processed_months = { + datetime.strptime(month_str, "%Y-%m-%d").date().replace(day=1) + for month_str in processed_costs + } missing_months = [] for i in range(max_months): @@ -180,14 +213,20 @@ def get_missing_months_aws(processed_costs: Set[str], max_months: int) -> List[d return missing_months -def build_aws_cost_inventory(cloud_service_provider: int, provider_details: Dict[str, Any], report_path: str, raw_data_path: str) -> None: + +def build_aws_cost_inventory( + cloud_service_provider: int, + provider_details: Dict[str, Any], + report_path: str, + raw_data_path: str, +) -> None: try: session = boto3.Session( aws_access_key_id=provider_details["accessKey"], aws_secret_access_key=provider_details["secretKey"], - region_name=provider_details["region"] + region_name=provider_details["region"], ) - cost_explorer = session.client('ce', region_name='us-east-1') + cost_explorer = session.client("ce", region_name="us-east-1") db_path = os.path.join(report_path, "data", "assessment.db") @@ -195,19 +234,21 @@ def build_aws_cost_inventory(cloud_service_provider: int, provider_details: Dict start_time = end_time - relativedelta(months=6) cost_and_usage = cost_explorer.get_cost_and_usage( - TimePeriod={'Start': start_time.strftime('%Y-%m-%d'), 'End': end_time.strftime('%Y-%m-%d')}, - Granularity='MONTHLY', - Metrics=['UnblendedCost'], - GroupBy=[{'Type': 'DIMENSION', 'Key': 'SERVICE'}], + TimePeriod={ + "Start": start_time.strftime("%Y-%m-%d"), + "End": end_time.strftime("%Y-%m-%d"), + }, + Granularity="MONTHLY", + Metrics=["UnblendedCost"], + GroupBy=[{"Type": "DIMENSION", "Key": "SERVICE"}], Filter={ - 'Dimensions': { - 'Key': 'REGION', - 'Values': [provider_details["region"]] - } - } + "Dimensions": {"Key": "REGION", "Values": [provider_details["region"]]} + }, ) - cost_inventory_raw_path = os.path.join(raw_data_path, "cost_inventory_raw_data.json") + cost_inventory_raw_path = os.path.join( + raw_data_path, "cost_inventory_raw_data.json" + ) with open(cost_inventory_raw_path, "w", encoding="utf-8") as raw_file: json.dump(cost_and_usage, raw_file, indent=4) @@ -215,11 +256,23 @@ def build_aws_cost_inventory(cloud_service_provider: int, provider_details: Dict with connect(db_path=db_path) as conn: cursor = conn.cursor() - for result in cost_and_usage['ResultsByTime']: - month_str = result['TimePeriod']['Start'] - total_cost = sum(float(group['Metrics']['UnblendedCost']['Amount']) for group in result['Groups']) - currency = result['Groups'][0]['Metrics']['UnblendedCost']['Unit'] if result['Groups'] else 'USD' - month_date = datetime.strptime(month_str, '%Y-%m-%d').date().replace(day=1).isoformat() + for result in cost_and_usage["ResultsByTime"]: + month_str = result["TimePeriod"]["Start"] + total_cost = sum( + float(group["Metrics"]["UnblendedCost"]["Amount"]) + for group in result["Groups"] + ) + currency = ( + result["Groups"][0]["Metrics"]["UnblendedCost"]["Unit"] + if result["Groups"] + else "USD" + ) + month_date = ( + datetime.strptime(month_str, "%Y-%m-%d") + .date() + .replace(day=1) + .isoformat() + ) # Insert or update the cost data for the month cursor.execute( @@ -230,12 +283,17 @@ def build_aws_cost_inventory(cloud_service_provider: int, provider_details: Dict cost = excluded.cost, currency = excluded.currency """, - (month_date, total_cost, currency) + (month_date, total_cost, currency), ) # Handle missing months - structured_months = {datetime.strptime(result['TimePeriod']['Start'], '%Y-%m-%d').date() for result in cost_and_usage['ResultsByTime']} - missing_months = get_missing_months_aws({month.isoformat() for month in structured_months}, 6) + structured_months = { + datetime.strptime(result["TimePeriod"]["Start"], "%Y-%m-%d").date() + for result in cost_and_usage["ResultsByTime"] + } + missing_months = get_missing_months_aws( + {month.isoformat() for month in structured_months}, 6 + ) for missing_month in missing_months: cursor.execute( @@ -245,7 +303,7 @@ def build_aws_cost_inventory(cloud_service_provider: int, provider_details: Dict ON CONFLICT(month) DO UPDATE SET currency = excluded.currency """, - (missing_month.isoformat(), currency) + (missing_month.isoformat(), currency), ) conn.commit() diff --git a/core/utils_azure.py b/core/utils_azure.py index c070a5c..b69b534 100644 --- a/core/utils_azure.py +++ b/core/utils_azure.py @@ -18,28 +18,41 @@ logger = logging.getLogger("core.engine.azure") logging.getLogger("azure").setLevel(logging.WARNING) -def is_resource_inventory_empty(credential: Any, subscription_id: str, resource_group_name: str) -> bool: + +def is_resource_inventory_empty( + credential: Any, subscription_id: str, resource_group_name: str +) -> bool: try: resource_client = ResourceManagementClient(credential, subscription_id) - #logger.info("Checking Azure resource inventory...") - resources = list(resource_client.resources.list_by_resource_group(resource_group_name)) + # logger.info("Checking Azure resource inventory...") + resources = list( + resource_client.resources.list_by_resource_group(resource_group_name) + ) if not resources: - #logger.info("No resources found in the resource group.") + # logger.info("No resources found in the resource group.") return True else: - #logger.info("Resources found in the resource group.") + # logger.info("Resources found in the resource group.") return False except AzureError as e: - logger.error(f"Error checking Azure resource inventory: {str(e)}", exc_info=True) + logger.error( + f"Error checking Azure resource inventory: {str(e)}", exc_info=True + ) raise -def build_azure_resource_inventory(cloud_service_provider: int, provider_details: Dict[str, Any], report_path: str, raw_data_path: str) -> None: + +def build_azure_resource_inventory( + cloud_service_provider: int, + provider_details: Dict[str, Any], + report_path: str, + raw_data_path: str, +) -> None: try: # Use DefaultAzureCredential if provided, otherwise fall back to ClientSecretCredential credential = provider_details.get("credential") or ClientSecretCredential( tenant_id=provider_details["tenantId"], client_id=provider_details["clientId"], - client_secret=provider_details["clientSecret"] + client_secret=provider_details["clientSecret"], ) subscription_id = provider_details["subscriptionId"] resource_group_name = provider_details["resourceGroupName"] @@ -47,14 +60,20 @@ def build_azure_resource_inventory(cloud_service_provider: int, provider_details db_path = os.path.join(report_path, "data", "assessment.db") # Check if resource inventory is empty - if is_resource_inventory_empty(credential, subscription_id, resource_group_name): - logger.warning("The selected resource group does not contain any resources.") + if is_resource_inventory_empty( + credential, subscription_id, resource_group_name + ): + logger.warning( + "The selected resource group does not contain any resources." + ) return resource_client = ResourceManagementClient(credential, subscription_id) # Fetch resources and serialize to raw JSON - resources = list(resource_client.resources.list_by_resource_group(resource_group_name)) + resources = list( + resource_client.resources.list_by_resource_group(resource_group_name) + ) raw_data = [resource.serialize(True) for resource in resources] # Save raw data to a JSON file @@ -63,7 +82,9 @@ def build_azure_resource_inventory(cloud_service_provider: int, provider_details json.dump(raw_data, raw_file, indent=4) # Load resource type mapping from the assessment database - resource_type_mapping = getattr(build_azure_resource_inventory, "_resource_type_cache", None) + resource_type_mapping = getattr( + build_azure_resource_inventory, "_resource_type_cache", None + ) if resource_type_mapping is None: resource_type_mapping = { item["code"].strip().lower(): {"id": item["id"], "name": item["name"]} @@ -83,8 +104,15 @@ def build_azure_resource_inventory(cloud_service_provider: int, provider_details with connect(db_path=db_path) as conn: cursor = conn.cursor() data_to_insert = [ - (resource_type_mapping[resource_type_code]["id"], resource_location, resource_count) - for (resource_type_code, resource_location), resource_count in aggregated_resources.items() + ( + resource_type_mapping[resource_type_code]["id"], + resource_location, + resource_count, + ) + for ( + resource_type_code, + resource_location, + ), resource_count in aggregated_resources.items() if resource_type_code in resource_type_mapping ] cursor.executemany( @@ -93,7 +121,7 @@ def build_azure_resource_inventory(cloud_service_provider: int, provider_details VALUES (?, ?, ?) ON CONFLICT(resource_type, location) DO UPDATE SET count = excluded.count """, - data_to_insert + data_to_insert, ) conn.commit() @@ -104,32 +132,46 @@ def build_azure_resource_inventory(cloud_service_provider: int, provider_details except Exception as e: logger.error(f"Error fetching Azure resources: {str(e)}", exc_info=True) + def get_missing_months_azure(processed_costs: Set[str], months_back: int) -> Set[date]: today = date.today() start_date = today.replace(day=1) - relativedelta(months=months_back - 1) - all_months = {(start_date + relativedelta(months=i)).replace(day=1) for i in range(months_back)} + all_months = { + (start_date + relativedelta(months=i)).replace(day=1) + for i in range(months_back) + } processed_months = set() for month_str in processed_costs: try: # Attempt parsing with full timestamp format - month_date = datetime.strptime(month_str, '%Y-%m-%dT%H:%M:%S').date().replace(day=1) + month_date = ( + datetime.strptime(month_str, "%Y-%m-%dT%H:%M:%S").date().replace(day=1) + ) except ValueError: # Fallback to date-only format if full timestamp fails - month_date = datetime.strptime(month_str, '%Y-%m-%d').date().replace(day=1) + month_date = datetime.strptime(month_str, "%Y-%m-%d").date().replace(day=1) processed_months.add(month_date) return all_months - processed_months -def build_azure_cost_inventory(cloud_service_provider: int, provider_details: Dict[str, Any], report_path: str, raw_data_path: str) -> None: + +def build_azure_cost_inventory( + cloud_service_provider: int, + provider_details: Dict[str, Any], + report_path: str, + raw_data_path: str, +) -> None: try: # Use DefaultAzureCredential if provided, otherwise fall back to ClientSecretCredential credential = provider_details.get("credential") or ClientSecretCredential( tenant_id=provider_details["tenantId"], client_id=provider_details["clientId"], - client_secret=provider_details["clientSecret"] + client_secret=provider_details["clientSecret"], + ) + cost_management_client = CostManagementClient( + credential, base_url="https://management.azure.com" ) - cost_management_client = CostManagementClient(credential, base_url="https://management.azure.com") db_path = os.path.join(report_path, "data", "assessment.db") @@ -138,22 +180,26 @@ def build_azure_cost_inventory(cloud_service_provider: int, provider_details: Di start_time = end_time.replace(day=1) - relativedelta(months=months_back - 1) query = QueryDefinition( - type='Usage', + type="Usage", timeframe=TimeframeType.CUSTOM, - time_period={'from': start_time.strftime('%Y-%m-%dT00:00:00Z'), 'to': end_time.strftime('%Y-%m-%dT00:00:00Z')}, + time_period={ + "from": start_time.strftime("%Y-%m-%dT00:00:00Z"), + "to": end_time.strftime("%Y-%m-%dT00:00:00Z"), + }, dataset={ - 'granularity': 'Monthly', - 'aggregation': { - 'totalCost': {'name': 'Cost', 'function': 'Sum'} - } - } + "granularity": "Monthly", + "aggregation": {"totalCost": {"name": "Cost", "function": "Sum"}}, + }, ) cost_data = cost_management_client.query.usage( - f'/subscriptions/{provider_details["subscriptionId"]}/resourceGroups/{provider_details["resourceGroupName"]}', query + f'/subscriptions/{provider_details["subscriptionId"]}/resourceGroups/{provider_details["resourceGroupName"]}', + query, ) - cost_inventory_raw_path = os.path.join(raw_data_path, "cost_inventory_raw_data.json") + cost_inventory_raw_path = os.path.join( + raw_data_path, "cost_inventory_raw_data.json" + ) with open(cost_inventory_raw_path, "w", encoding="utf-8") as raw_file: json.dump(cost_data.as_dict(), raw_file, indent=4) @@ -163,7 +209,12 @@ def build_azure_cost_inventory(cloud_service_provider: int, provider_details: Di for row in cost_data.rows: cost, month_str, currency = row - month_date = datetime.strptime(month_str, '%Y-%m-%dT%H:%M:%S').date().replace(day=1).isoformat() + month_date = ( + datetime.strptime(month_str, "%Y-%m-%dT%H:%M:%S") + .date() + .replace(day=1) + .isoformat() + ) # Insert or update cost data cursor.execute( @@ -174,11 +225,14 @@ def build_azure_cost_inventory(cloud_service_provider: int, provider_details: Di cost = excluded.cost, currency = excluded.currency """, - (month_date, cost, currency) + (month_date, cost, currency), ) # Extract months already in the cost data - structured_months = {datetime.strptime(row[1], '%Y-%m-%dT%H:%M:%S').date() for row in cost_data.rows} + structured_months = { + datetime.strptime(row[1], "%Y-%m-%dT%H:%M:%S").date() + for row in cost_data.rows + } # Identify missing months and insert with zero cost missing_months = get_missing_months_azure( @@ -192,7 +246,7 @@ def build_azure_cost_inventory(cloud_service_provider: int, provider_details: Di ON CONFLICT(month) DO UPDATE SET currency = excluded.currency """, - (missing_month.isoformat(), currency) + (missing_month.isoformat(), currency), ) conn.commit() diff --git a/core/utils_db.py b/core/utils_db.py index 3a2d263..b32270a 100644 --- a/core/utils_db.py +++ b/core/utils_db.py @@ -9,6 +9,7 @@ # Default master database MASTER_DATABASE = "datasets/data.db" + def connect(db_path=MASTER_DATABASE): try: conn = sqlite3.connect(db_path) @@ -17,6 +18,7 @@ def connect(db_path=MASTER_DATABASE): logger.error(f"Error connecting to database: {e}") raise + def load_data(table_name, db_path=MASTER_DATABASE): try: conn = connect(db_path) @@ -30,6 +32,7 @@ def load_data(table_name, db_path=MASTER_DATABASE): logger.error(f"Error loading data from table '{table_name}': {e}") raise + def execute_query(query, params=None, db_path=MASTER_DATABASE): try: conn = connect(db_path) @@ -43,6 +46,7 @@ def execute_query(query, params=None, db_path=MASTER_DATABASE): logger.error(f"Error executing query: {e}") raise + def fetch_one(query, params=None, db_path=MASTER_DATABASE): try: conn = connect(db_path) @@ -56,6 +60,7 @@ def fetch_one(query, params=None, db_path=MASTER_DATABASE): logger.error(f"Error fetching data: {e}") raise + def fetch_all(query, params=None, db_path=MASTER_DATABASE): try: conn = connect(db_path) diff --git a/core/utils_report.py b/core/utils_report.py index 838c1ce..8b8d306 100644 --- a/core/utils_report.py +++ b/core/utils_report.py @@ -11,44 +11,94 @@ from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle from reportlab.lib import colors from reportlab.lib.colors import HexColor -from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, PageBreak, Image, Table, TableStyle +from reportlab.platypus import ( + SimpleDocTemplate, + Paragraph, + Spacer, + PageBreak, + Image, + Table, + TableStyle, +) # Utils -from core.utils_report_html import transform_cost_inventory_for_html, transform_risk_inventory_for_html, transform_alt_tech_for_html -from core.utils_report_json import transform_resource_inventory_for_json, transform_cost_inventory_for_json, transform_risk_inventory_for_json, transform_alt_tech_for_json -from core.utils_report_pdf import transform_resource_inventory_for_pdf, transform_cost_inventory_for_pdf, transform_risk_inventory_for_pdf, transform_alt_tech_for_pdf, draw_header_footer, draw_risk_chart, draw_cost_chart, draw_vendor_lockin_radar_chart, draw_exitscore_chart +from core.utils_report_html import ( + transform_cost_inventory_for_html, + transform_risk_inventory_for_html, + transform_alt_tech_for_html, +) +from core.utils_report_json import ( + transform_resource_inventory_for_json, + transform_cost_inventory_for_json, + transform_risk_inventory_for_json, + transform_alt_tech_for_json, +) +from core.utils_report_pdf import ( + transform_resource_inventory_for_pdf, + transform_cost_inventory_for_pdf, + transform_risk_inventory_for_pdf, + transform_alt_tech_for_pdf, + draw_header_footer, + draw_risk_chart, + draw_cost_chart, + draw_vendor_lockin_radar_chart, + draw_exitscore_chart, +) # Configure logger logger = logging.getLogger("core.engine.report") logger.setLevel(logging.INFO) + def anonymize_string(s: str, num_visible: int = 4) -> str: if not isinstance(s, str): return "N/A" if len(s) <= 2 * num_visible: - return '*' * len(s) + return "*" * len(s) middle_length = len(s) - 2 * num_visible return f"{s[:num_visible]}{'*' * middle_length}{s[-num_visible:]}" -def generate_html_report(report_path: str, metadata: Dict[str, Any], resource_type_mapping: Dict[str, Dict[str, Any]], resource_inventory: List[Dict[str, Any]], cost_data: List[Dict[str, Any]], scoring_data: Optional[Dict[str, Any]], risk_data: List[Dict[str, Any]], risk_definitions: List[Dict[str, Any]], alternatives: List[Dict[str, Any]], alternative_technologies: List[Dict[str, Any]], exit_strategy: int) -> str: + +def generate_html_report( + report_path: str, + metadata: Dict[str, Any], + resource_type_mapping: Dict[str, Dict[str, Any]], + resource_inventory: List[Dict[str, Any]], + cost_data: List[Dict[str, Any]], + scoring_data: Optional[Dict[str, Any]], + risk_data: List[Dict[str, Any]], + risk_definitions: List[Dict[str, Any]], + alternatives: List[Dict[str, Any]], + alternative_technologies: List[Dict[str, Any]], + exit_strategy: int, +) -> str: # Transform resource inventory resource_inventory_dict = { str(item["resource_type"]): { **item, - "name": resource_type_mapping.get(str(item["resource_type"]), {}).get("name", "Unknown Resource"), - "icon": "/assets" + resource_type_mapping.get(str(item["resource_type"]), {}).get("icon", "/icons/default.png") + "name": resource_type_mapping.get(str(item["resource_type"]), {}).get( + "name", "Unknown Resource" + ), + "icon": "/assets" + + resource_type_mapping.get(str(item["resource_type"]), {}).get( + "icon", "/icons/default.png" + ), } for item in resource_inventory } # Transform risks - risks, severity_counts = transform_risk_inventory_for_html(risk_data, risk_definitions, resource_inventory_dict) + risks, severity_counts = transform_risk_inventory_for_html( + risk_data, risk_definitions, resource_inventory_dict + ) # Transform costs - months, cost_values, total_cost, currency, currency_symbol = transform_cost_inventory_for_html(cost_data) + months, cost_values, total_cost, currency, currency_symbol = ( + transform_cost_inventory_for_html(cost_data) + ) # Transform resource data with names and icons resource_counts = [] @@ -56,20 +106,19 @@ def generate_html_report(report_path: str, metadata: Dict[str, Any], resource_ty count = resource.get("count", 0) resource_info = resource_type_mapping.get(str(resource_type), {}) name = resource_info.get("name", "Unknown Resource") - icon = resource_info.get("icon", "assets/icons/default.png").lstrip('/') + icon = resource_info.get("icon", "assets/icons/default.png").lstrip("/") - resource_counts.append({ - "resource_type": resource_type, - "name": name, - "icon": icon, - "count": count - }) + resource_counts.append( + {"resource_type": resource_type, "name": name, "icon": icon, "count": count} + ) # Calculate total resources total_resources = sum(item["count"] for item in resource_counts) # Transform alternative technologies - alternative_technologies_data = transform_alt_tech_for_html(resource_inventory, alternatives, alternative_technologies, exit_strategy) + alternative_technologies_data = transform_alt_tech_for_html( + resource_inventory, alternatives, alternative_technologies, exit_strategy + ) # Scoring Data scoring_context = { @@ -82,7 +131,7 @@ def generate_html_report(report_path: str, metadata: Dict[str, Any], resource_ty # Render the HTML template template_path = os.path.join("assets", "template", "index.html") - with open(template_path, 'r') as file: + with open(template_path, "r") as file: template_content = file.read() template = Template(template_content) @@ -90,9 +139,9 @@ def generate_html_report(report_path: str, metadata: Dict[str, Any], resource_ty **metadata, **scoring_context, risks=risks, - high_risk_count=severity_counts['high'], - medium_risk_count=severity_counts['medium'], - low_risk_count=severity_counts['low'], + high_risk_count=severity_counts["high"], + medium_risk_count=severity_counts["medium"], + low_risk_count=severity_counts["low"], total_cost=total_cost, months_json=json.dumps(months), costs_json=json.dumps(cost_values), @@ -104,17 +153,36 @@ def generate_html_report(report_path: str, metadata: Dict[str, Any], resource_ty # Save HTML report html_path = os.path.join(report_path, "index.html") - with open(html_path, 'w') as report_file: + with open(html_path, "w") as report_file: report_file.write(html_content) return html_path -def generate_json_report(raw_data_path: str, metadata: Dict[str, Any], resource_type_mapping: Dict[str, Dict[str, Any]], resource_inventory: List[Dict[str, Any]], cost_data: List[Dict[str, Any]], scoring_data: Optional[Dict[str, Any]], risk_data: List[Dict[str, Any]], risk_definitions: List[Dict[str, Any]], alternatives: List[Dict[str, Any]], alternative_technologies: List[Dict[str, Any]], exit_strategy: int) -> str: + +def generate_json_report( + raw_data_path: str, + metadata: Dict[str, Any], + resource_type_mapping: Dict[str, Dict[str, Any]], + resource_inventory: List[Dict[str, Any]], + cost_data: List[Dict[str, Any]], + scoring_data: Optional[Dict[str, Any]], + risk_data: List[Dict[str, Any]], + risk_definitions: List[Dict[str, Any]], + alternatives: List[Dict[str, Any]], + alternative_technologies: List[Dict[str, Any]], + exit_strategy: int, +) -> str: # Transform data for JSON - transformed_resource_inventory = transform_resource_inventory_for_json(resource_inventory, resource_type_mapping) + transformed_resource_inventory = transform_resource_inventory_for_json( + resource_inventory, resource_type_mapping + ) transformed_cost_inventory = transform_cost_inventory_for_json(cost_data) - transformed_risk_inventory = transform_risk_inventory_for_json(risk_data, risk_definitions, resource_inventory) - transformed_alt_tech = transform_alt_tech_for_json(resource_inventory, alternatives, alternative_technologies, exit_strategy) + transformed_risk_inventory = transform_risk_inventory_for_json( + risk_data, risk_definitions, resource_inventory + ) + transformed_alt_tech = transform_alt_tech_for_json( + resource_inventory, alternatives, alternative_technologies, exit_strategy + ) # Build the JSON structure report_json = { @@ -123,7 +191,7 @@ def generate_json_report(raw_data_path: str, metadata: Dict[str, Any], resource_ "resource_inventory": transformed_resource_inventory, "cost_inventory": transformed_cost_inventory, "risk_inventory": transformed_risk_inventory, - } + }, } # Add scoring_data only if present @@ -140,12 +208,26 @@ def generate_json_report(raw_data_path: str, metadata: Dict[str, Any], resource_ # Save JSON to file json_path = os.path.join(raw_data_path, "assessment_result.json") - with open(json_path, 'w') as json_file: + with open(json_path, "w") as json_file: json.dump(report_json, json_file, indent=4) return json_path -def generate_pdf_report(provider_details: Dict[str, Any], report_path: str, metadata: Dict[str, Any], resource_type_mapping: Dict[str, Any], resource_inventory: List[Dict[str, Any]], cost_data: List[Dict[str, Any]], scoring_data: Optional[Dict[str, Any]], risk_data: List[Dict[str, Any]], risk_definitions: List[Dict[str, Any]], alternatives: List[Dict[str, Any]], alternative_technologies: List[Dict[str, Any]], exit_strategy: int) -> str: + +def generate_pdf_report( + provider_details: Dict[str, Any], + report_path: str, + metadata: Dict[str, Any], + resource_type_mapping: Dict[str, Any], + resource_inventory: List[Dict[str, Any]], + cost_data: List[Dict[str, Any]], + scoring_data: Optional[Dict[str, Any]], + risk_data: List[Dict[str, Any]], + risk_definitions: List[Dict[str, Any]], + alternatives: List[Dict[str, Any]], + alternative_technologies: List[Dict[str, Any]], + exit_strategy: int, +) -> str: # Define the PDF path pdf_path = os.path.join(report_path, "report.pdf") @@ -155,14 +237,18 @@ def header_footer(canvas, doc): draw_header_footer(report_path, canvas, doc) # Create a document template with the header and footer - doc = SimpleDocTemplate(pdf_path, pagesize=A4, title="EscapeCloud_-_Cloud_Exit_Assessment") + doc = SimpleDocTemplate( + pdf_path, pagesize=A4, title="EscapeCloud_-_Cloud_Exit_Assessment" + ) styles = getSampleStyleSheet() - content_style = ParagraphStyle('ContentStyle', fontSize=10, leading=12, spaceAfter=10) - styles['Heading1'].leading = 1.5 * styles['Heading1'].fontSize - styles['Heading1'].textColor = HexColor('#112726') - styles['Heading2'].leading = 1.5 * styles['Heading2'].fontSize - styles['Heading2'].textColor = HexColor('#112726') - tablecontent_style = styles['BodyText'] + content_style = ParagraphStyle( + "ContentStyle", fontSize=10, leading=12, spaceAfter=10 + ) + styles["Heading1"].leading = 1.5 * styles["Heading1"].fontSize + styles["Heading1"].textColor = HexColor("#112726") + styles["Heading2"].leading = 1.5 * styles["Heading2"].fontSize + styles["Heading2"].textColor = HexColor("#112726") + tablecontent_style = styles["BodyText"] # Define a custom padding value header_padding = 12 @@ -171,7 +257,7 @@ def header_footer(canvas, doc): # --- # Page 1: Summary --- content.append(Spacer(1, header_padding)) - content.append(Paragraph("Summary", styles['Heading1'])) + content.append(Paragraph("Summary", styles["Heading1"])) summary_block1 = "Quick overview of the assessment:" content.append(Paragraph(summary_block1, content_style)) @@ -180,47 +266,64 @@ def header_footer(canvas, doc): "1": "Microsoft Azure", "2": "Amazon Web Services", "3": "Alibaba Cloud", - "4": "Google Cloud" + "4": "Google Cloud", } exit_strategy_map = { "1": "Repatriation to On-Premises", "2": "Hybrid Cloud Adoption", - "3": "Migration to Alternate Cloud" + "3": "Migration to Alternate Cloud", } - type_map = { - "1": "Basic", - "2": "Standard" - } + type_map = {"1": "Basic", "2": "Standard"} # Prepare the summary data summary_data = [ ["Name", "Value"], - ["Cloud Service Provider", cloud_service_provider_map.get(str(metadata["cloud_service_provider"]), "Unknown")], - ["Exit Strategy", exit_strategy_map.get(str(metadata["exit_strategy"]), "Unknown")], + [ + "Cloud Service Provider", + cloud_service_provider_map.get( + str(metadata["cloud_service_provider"]), "Unknown" + ), + ], + [ + "Exit Strategy", + exit_strategy_map.get(str(metadata["exit_strategy"]), "Unknown"), + ], ["Assessment Type", type_map.get(str(metadata["assessment_type"]), "Unknown")], - ["TimeStamp", metadata["timestamp"]] + ["TimeStamp", metadata["timestamp"]], ] # Column widths - summary_colWidths = [4*cm, 11.5*cm] + summary_colWidths = [4 * cm, 11.5 * cm] # Create the summary table summary_table = Table(summary_data, colWidths=summary_colWidths) # Define the summary table style - summary_table_style = TableStyle([ - ('BACKGROUND', (0, 0), (-1, 0), HexColor('#115e59')), # Header row background color - ('TEXTCOLOR', (0, 0), (-1, 0), colors.white), # Header row text color - ('GRID', (0, 0), (-1, -1), 1, HexColor("#000000")), # Grid lines - ('ALIGN', (0, 0), (-1, -1), 'LEFT'), # Left align all cells - ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'), # Middle vertical alignment for all cells - ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), # Bold font for header row - ('FONTSIZE', (0, 0), (-1, 0), 11), # Font size for header row - ('BOTTOMPADDING', (0, 0), (-1, 0), 12), # Padding for header row - ('TOPPADDING', (0, 0), (-1, 0), 12) # Padding for header row - ]) + summary_table_style = TableStyle( + [ + ( + "BACKGROUND", + (0, 0), + (-1, 0), + HexColor("#115e59"), + ), # Header row background color + ("TEXTCOLOR", (0, 0), (-1, 0), colors.white), # Header row text color + ("GRID", (0, 0), (-1, -1), 1, HexColor("#000000")), # Grid lines + ("ALIGN", (0, 0), (-1, -1), "LEFT"), # Left align all cells + ( + "VALIGN", + (0, 0), + (-1, -1), + "MIDDLE", + ), # Middle vertical alignment for all cells + ("FONTNAME", (0, 0), (-1, 0), "Helvetica-Bold"), # Bold font for header row + ("FONTSIZE", (0, 0), (-1, 0), 11), # Font size for header row + ("BOTTOMPADDING", (0, 0), (-1, 0), 12), # Padding for header row + ("TOPPADDING", (0, 0), (-1, 0), 12), # Padding for header row + ] + ) summary_table.setStyle(summary_table_style) @@ -229,7 +332,7 @@ def header_footer(canvas, doc): content.append(Spacer(1, 12)) # --- Page 1: Scope of Assessment --- - content.append(Paragraph("Scope of Assessment", styles['Heading2'])) + content.append(Paragraph("Scope of Assessment", styles["Heading2"])) scope_block1 = "Defined scope of assessment:" content.append(Paragraph(scope_block1, content_style)) @@ -237,40 +340,65 @@ def header_footer(canvas, doc): scope_data = [["Name", "Value"]] if metadata["cloud_service_provider"] == 1: # Azure - scope_data.extend([ - ["Tenant ID", provider_details.get("tenantId", "N/A")], - ["Client ID", provider_details.get("clientId", "N/A")], - ["Client Secret", anonymize_string(provider_details.get("clientSecret", "N/A"))], - ["Subscription ID", provider_details.get("subscriptionId", "N/A")], - ["Resource Group Name", provider_details.get("resourceGroupName", "N/A")] - ]) + scope_data.extend( + [ + ["Tenant ID", provider_details.get("tenantId", "N/A")], + ["Client ID", provider_details.get("clientId", "N/A")], + [ + "Client Secret", + anonymize_string(provider_details.get("clientSecret", "N/A")), + ], + ["Subscription ID", provider_details.get("subscriptionId", "N/A")], + [ + "Resource Group Name", + provider_details.get("resourceGroupName", "N/A"), + ], + ] + ) elif metadata["cloud_service_provider"] == 2: # AWS - scope_data.extend([ - ["Access Key", provider_details.get("accessKey", "N/A")], - ["Secret Key", anonymize_string(provider_details.get("secretKey", "N/A"))], - ["Region", provider_details.get("region", "N/A")] - ]) + scope_data.extend( + [ + ["Access Key", provider_details.get("accessKey", "N/A")], + [ + "Secret Key", + anonymize_string(provider_details.get("secretKey", "N/A")), + ], + ["Region", provider_details.get("region", "N/A")], + ] + ) else: scope_data.append(["N/A", "N/A"]) # Column widths - scope_colWidths = [4*cm, 11.5*cm] + scope_colWidths = [4 * cm, 11.5 * cm] # Create the scope table scope_table = Table(scope_data, colWidths=scope_colWidths) # Define the scope table style - scope_table_style = TableStyle([ - ('BACKGROUND', (0, 0), (-1, 0), HexColor('#115e59')), # Header row background color - ('TEXTCOLOR', (0, 0), (-1, 0), colors.white), # Header row text color - ('GRID', (0, 0), (-1, -1), 1, HexColor("#000000")), # Grid lines - ('ALIGN', (0, 0), (-1, -1), 'LEFT'), # Left align all cells - ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'), # Middle vertical alignment for all cells - ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), # Bold font for header row - ('FONTSIZE', (0, 0), (-1, 0), 11), # Font size for header row - ('BOTTOMPADDING', (0, 0), (-1, 0), 12), # Padding for header row - ('TOPPADDING', (0, 0), (-1, 0), 12) # Padding for header row - ]) + scope_table_style = TableStyle( + [ + ( + "BACKGROUND", + (0, 0), + (-1, 0), + HexColor("#115e59"), + ), # Header row background color + ("TEXTCOLOR", (0, 0), (-1, 0), colors.white), # Header row text color + ("GRID", (0, 0), (-1, -1), 1, HexColor("#000000")), # Grid lines + ("ALIGN", (0, 0), (-1, -1), "LEFT"), # Left align all cells + ( + "VALIGN", + (0, 0), + (-1, -1), + "MIDDLE", + ), # Middle vertical alignment for all cells + ("FONTNAME", (0, 0), (-1, 0), "Helvetica-Bold"), # Bold font for header row + ("FONTSIZE", (0, 0), (-1, 0), 11), # Font size for header row + ("BOTTOMPADDING", (0, 0), (-1, 0), 12), # Padding for header row + ("TOPPADDING", (0, 0), (-1, 0), 12), # Padding for header row + ] + ) scope_table.setStyle(scope_table_style) @@ -279,9 +407,9 @@ def header_footer(canvas, doc): content.append(Spacer(1, 12)) # --- # Page 1: Costs --- - content.append(Paragraph("Costs", styles['Heading2'])) - #costs_block1 = "Overview of the costs for the last 6 months:" - #content.append(Paragraph(costs_block1, content_style)) + content.append(Paragraph("Costs", styles["Heading2"])) + # costs_block1 = "Overview of the costs for the last 6 months:" + # content.append(Paragraph(costs_block1, content_style)) costs_block2 = "Examining the costs reveals the financial impact of the transition, allowing for more informed decision-making and strategic planning." costs_paragraph = Paragraph(costs_block2, tablecontent_style) @@ -293,52 +421,62 @@ def header_footer(canvas, doc): # Create the data structure for the table costcharts_table_data = [ - [costs_paragraph, '', '', cost_chart, '', ''], # Row 1: Paragraph and Chart - months, # Row 2: Months - [f"{currency_symbol} {cost:.2f}" for cost in costs] # Row 3: Costs + [costs_paragraph, "", "", cost_chart, "", ""], # Row 1: Paragraph and Chart + months, # Row 2: Months + [f"{currency_symbol} {cost:.2f}" for cost in costs], # Row 3: Costs ] # Create the table with 6 columns costcharts_table = Table( - costcharts_table_data, - colWidths=[2.58333333333*cm] * 6 # Equal width columns + costcharts_table_data, colWidths=[2.58333333333 * cm] * 6 # Equal width columns ) # Define the table style - costcharts_table_style = TableStyle([ - # Merge cells for Row 1 - ('SPAN', (0, 0), (2, 0)), # Merge columns 1, 2, and 3 for the paragraph - ('SPAN', (3, 0), (5, 0)), # Merge columns 4, 5, and 6 for the chart - - # Align the merged cell (Row 1, Column 1-2-3) to top-left - ('VALIGN', (0, 0), (2, 0), 'TOP'), # Align vertically to top - ('ALIGN', (0, 0), (2, 0), 'LEFT'), # Align horizontally to left - - # Remove padding for the merged cell in Row 1, Columns 1-2-3 - ('LEFTPADDING', (0, 0), (2, 0), 0), - ('RIGHTPADDING', (0, 0), (2, 0), 0), - ('TOPPADDING', (0, 0), (2, 0), 0), - ('BOTTOMPADDING', (0, 0), (2, 0), 0), - - # Background and text color for Row 2 (months) - ('BACKGROUND', (0, 1), (-1, 1), HexColor('#115e59')), # Row 2 background color - ('TEXTCOLOR', (0, 1), (-1, 1), colors.white), # Row 2 text color - ('FONTNAME', (0, 1), (-1, 1), 'Helvetica-Bold'), # Bold font for Row 2 - - # Center alignment for Row 2 (months) - ('ALIGN', (0, 1), (-1, 1), 'CENTER'), # Center align -> Row 2 text - - # Font and alignment for Row 3 (costs) - ('FONTNAME', (0, 2), (-1, 2), 'Helvetica'), # Regular font for Row 3 - ('ALIGN', (0, 2), (-1, 2), 'CENTER'), # Center align -> Row 3 text - - # Grid lines for Row 2 and Row 3 - ('GRID', (0, 1), (-1, 2), 1, colors.black), # Grid for months and costs rows - - # Center alignment and vertical alignment for all cells - ('VALIGN', (0, 0), (-1, -1), 'MIDDLE'), # Vertical alignment for all cells - ('VALIGN', (0, 0), (2, 0), 'TOP'), # Align vertically to top for the merged cell - ]) + costcharts_table_style = TableStyle( + [ + # Merge cells for Row 1 + ("SPAN", (0, 0), (2, 0)), # Merge columns 1, 2, and 3 for the paragraph + ("SPAN", (3, 0), (5, 0)), # Merge columns 4, 5, and 6 for the chart + # Align the merged cell (Row 1, Column 1-2-3) to top-left + ("VALIGN", (0, 0), (2, 0), "TOP"), # Align vertically to top + ("ALIGN", (0, 0), (2, 0), "LEFT"), # Align horizontally to left + # Remove padding for the merged cell in Row 1, Columns 1-2-3 + ("LEFTPADDING", (0, 0), (2, 0), 0), + ("RIGHTPADDING", (0, 0), (2, 0), 0), + ("TOPPADDING", (0, 0), (2, 0), 0), + ("BOTTOMPADDING", (0, 0), (2, 0), 0), + # Background and text color for Row 2 (months) + ( + "BACKGROUND", + (0, 1), + (-1, 1), + HexColor("#115e59"), + ), # Row 2 background color + ("TEXTCOLOR", (0, 1), (-1, 1), colors.white), # Row 2 text color + ("FONTNAME", (0, 1), (-1, 1), "Helvetica-Bold"), # Bold font for Row 2 + # Center alignment for Row 2 (months) + ("ALIGN", (0, 1), (-1, 1), "CENTER"), # Center align -> Row 2 text + # Font and alignment for Row 3 (costs) + ("FONTNAME", (0, 2), (-1, 2), "Helvetica"), # Regular font for Row 3 + ("ALIGN", (0, 2), (-1, 2), "CENTER"), # Center align -> Row 3 text + # Grid lines for Row 2 and Row 3 + ( + "GRID", + (0, 1), + (-1, 2), + 1, + colors.black, + ), # Grid for months and costs rows + # Center alignment and vertical alignment for all cells + ("VALIGN", (0, 0), (-1, -1), "MIDDLE"), # Vertical alignment for all cells + ( + "VALIGN", + (0, 0), + (2, 0), + "TOP", + ), # Align vertically to top for the merged cell + ] + ) # Apply the table style costcharts_table.setStyle(costcharts_table_style) @@ -349,19 +487,21 @@ def header_footer(canvas, doc): # Page 2: Risks content.append(Spacer(1, header_padding)) - content.append(Paragraph("Risk Assessment", styles['Heading1'])) + content.append(Paragraph("Risk Assessment", styles["Heading1"])) risk_block1 = "The Risk Assessment provides a thorough evaluation of potential risks associated with the cloud resources utilized in the project and the alternative technologies available in the market:" content.append(Paragraph(risk_block1, content_style)) content.append(Spacer(1, 12)) # Transform the risk data for the PDF and get severity counts - risks, severity_counts = transform_risk_inventory_for_pdf(risk_data, risk_definitions, resource_inventory) + risks, severity_counts = transform_risk_inventory_for_pdf( + risk_data, risk_definitions, resource_inventory + ) # severity_counts is a dict like: {'high': X, 'medium': Y, 'low': Z} risk_chart_data = { - 'high': severity_counts['high'], - 'medium': severity_counts['medium'], - 'low': severity_counts['low'] + "high": severity_counts["high"], + "medium": severity_counts["medium"], + "low": severity_counts["low"], } risk_chart = draw_risk_chart(risk_chart_data) content.append(risk_chart) @@ -374,17 +514,25 @@ def header_footer(canvas, doc): # Define the path to severity icons severity_icon_map = { "high": (os.path.join(report_path, "assets/icons/severity/high.png"), 22.5, 12), - "medium": (os.path.join(report_path, "assets/icons/severity/medium.png"), 39, 12), - "low": (os.path.join(report_path, "assets/icons/severity/low.png"), 20.5, 12) + "medium": ( + os.path.join(report_path, "assets/icons/severity/medium.png"), + 39, + 12, + ), + "low": (os.path.join(report_path, "assets/icons/severity/low.png"), 20.5, 12), } # Build the risk table data risk_table_data = [["#", "Risk name", "Impacted", "Severity"]] for i, risk in enumerate(risks): - impacted_str = str(risk['impacted_resources_count']) if risk['impacted_resources_count'] > 0 else '-' + impacted_str = ( + str(risk["impacted_resources_count"]) + if risk["impacted_resources_count"] > 0 + else "-" + ) # Get the severity level and corresponding icon details - severity_level = risk['severity'].lower() + severity_level = risk["severity"].lower() icon_details = severity_icon_map.get(severity_level, None) if icon_details: @@ -396,12 +544,7 @@ def header_footer(canvas, doc): else: severity_icon = Paragraph("N/A", tablecontent_style) - risk_table_data.append([ - str(i + 1), - risk['name'], - impacted_str, - severity_icon - ]) + risk_table_data.append([str(i + 1), risk["name"], impacted_str, severity_icon]) # Add the total risks row total_risks = len(risks) @@ -412,28 +555,27 @@ def header_footer(canvas, doc): risk_table = Table(risk_table_data, colWidths=risk_table_colWidths) risk_table_style_commands = [ - ('BACKGROUND', (0, 0), (-1, 0), HexColor('#115e59')), # Header row background - ('TEXTCOLOR', (0, 0), (-1, 0), colors.white), # Header text color - ('BACKGROUND', (0, -1), (-1, -1), HexColor('#115e59')), # Last row background - ('TEXTCOLOR', (0, -1), (-1, -1), colors.white), # Last row text color - ('BOX', (0, 0), (-1, -1), 1, HexColor('#112726')), - ('BOTTOMPADDING', (0, 0), (-1, 0), 12), # Padding for header row - ('TOPPADDING', (0, 0), (-1, 0), 12), + ("BACKGROUND", (0, 0), (-1, 0), HexColor("#115e59")), # Header row background + ("TEXTCOLOR", (0, 0), (-1, 0), colors.white), # Header text color + ("BACKGROUND", (0, -1), (-1, -1), HexColor("#115e59")), # Last row background + ("TEXTCOLOR", (0, -1), (-1, -1), colors.white), # Last row text color + ("BOX", (0, 0), (-1, -1), 1, HexColor("#112726")), + ("BOTTOMPADDING", (0, 0), (-1, 0), 12), # Padding for header row + ("TOPPADDING", (0, 0), (-1, 0), 12), # Remove SPAN if not needed # ('SPAN', (-4, -1), (-2, -1)), - - ('ALIGN', (0, 1), (0, -2), 'LEFT'), - ('VALIGN', (0, 1), (0, -2), 'MIDDLE'), - ('ALIGN', (1, 1), (1, -2), 'LEFT'), - ('VALIGN', (1, 1), (1, -2), 'MIDDLE'), - ('ALIGN', (2, 1), (2, -2), 'CENTER'), - ('VALIGN', (2, 1), (2, -2), 'MIDDLE'), - ('ALIGN', (3, 1), (3, -2), 'CENTER'), - ('VALIGN', (3, 1), (3, -2), 'MIDDLE'), - ('ALIGN', (-1, 0), (-1, 0), 'CENTER'), - ('VALIGN', (-1, 0), (-1, 0), 'MIDDLE'), - ('ALIGN', (-1, -1), (-1, -1), 'CENTER'), - ('VALIGN', (-1, -1), (-1, -1), 'MIDDLE') + ("ALIGN", (0, 1), (0, -2), "LEFT"), + ("VALIGN", (0, 1), (0, -2), "MIDDLE"), + ("ALIGN", (1, 1), (1, -2), "LEFT"), + ("VALIGN", (1, 1), (1, -2), "MIDDLE"), + ("ALIGN", (2, 1), (2, -2), "CENTER"), + ("VALIGN", (2, 1), (2, -2), "MIDDLE"), + ("ALIGN", (3, 1), (3, -2), "CENTER"), + ("VALIGN", (3, 1), (3, -2), "MIDDLE"), + ("ALIGN", (-1, 0), (-1, 0), "CENTER"), + ("VALIGN", (-1, 0), (-1, 0), "MIDDLE"), + ("ALIGN", (-1, -1), (-1, -1), "CENTER"), + ("VALIGN", (-1, -1), (-1, -1), "MIDDLE"), ] risk_table.setStyle(TableStyle(risk_table_style_commands)) @@ -443,8 +585,8 @@ def header_footer(canvas, doc): # Page 3: EscapeCloud Scoring if metadata.get("assessment_type") == 2: content.append(Spacer(1, header_padding)) - content.append(Paragraph("EscapeCloud Scoring", styles['Heading1'])) - content.append(Paragraph("Scoring #1 - Exit Score", styles['Heading2'])) + content.append(Paragraph("EscapeCloud Scoring", styles["Heading1"])) + content.append(Paragraph("Scoring #1 - Exit Score", styles["Heading2"])) scoring_block1 = "The following gauge chart visualizes a combined score that reflects both risk assessment results and the evaluation of alternative technologies:" @@ -456,92 +598,112 @@ def header_footer(canvas, doc): chart_output_path = os.path.join(report_path, "assets/charts") os.makedirs(chart_output_path, exist_ok=True) - exit_score_image_path = draw_exitscore_chart(exit_score, chart_output_path, width=750, height=500) + exit_score_image_path = draw_exitscore_chart( + exit_score, chart_output_path, width=750, height=500 + ) # Define the table data exitscore_table_data = [ - ['', ''], - ['Complex (0 - 20)', ''], - ['Challenging (20 - 40)', ''], - ['Manageable (40 - 60)', ''], - ['Smooth Transition (60 - 80)', ''], - ['Seamless (80 - 100)', ''] + ["", ""], + ["Complex (0 - 20)", ""], + ["Challenging (20 - 40)", ""], + ["Manageable (40 - 60)", ""], + ["Smooth Transition (60 - 80)", ""], + ["Seamless (80 - 100)", ""], ] - exitscore_table_data[1][1] = Image(exit_score_image_path, width=7.5*cm, height=5*cm) + exitscore_table_data[1][1] = Image( + exit_score_image_path, width=7.5 * cm, height=5 * cm + ) # Column widhts - exitscore_colWidths = [5*cm, 10.5*cm] + exitscore_colWidths = [5 * cm, 10.5 * cm] # Create the table exitscore_table = Table(exitscore_table_data, colWidths=exitscore_colWidths) # Style the table - exitscore_table_style = TableStyle([ - ('SPAN', (0,0), (1,0)), - ('BACKGROUND', (0,0), (1,0), HexColor('#115e59')), - ('TEXTCOLOR', (0,0), (1,0), colors.white), - ('FONTNAME', (0,0), (1,0), 'Helvetica-Bold'), - ('ALIGN', (0,0), (1,0), 'CENTER'), - ('VALIGN', (0,0), (1,0), 'MIDDLE'), - ('SPAN', (1,1), (1,5)), - ('GRID', (0,0), (-1,-1), 1, colors.black), - ('ALIGN', (0,1), (0,5), 'LEFT'), - ('VALIGN', (0,1), (0,5), 'MIDDLE'), - ('ALIGN', (1,1), (1,1), 'CENTER'), - ('VALIGN', (1,1), (1,1), 'MIDDLE') - ]) + exitscore_table_style = TableStyle( + [ + ("SPAN", (0, 0), (1, 0)), + ("BACKGROUND", (0, 0), (1, 0), HexColor("#115e59")), + ("TEXTCOLOR", (0, 0), (1, 0), colors.white), + ("FONTNAME", (0, 0), (1, 0), "Helvetica-Bold"), + ("ALIGN", (0, 0), (1, 0), "CENTER"), + ("VALIGN", (0, 0), (1, 0), "MIDDLE"), + ("SPAN", (1, 1), (1, 5)), + ("GRID", (0, 0), (-1, -1), 1, colors.black), + ("ALIGN", (0, 1), (0, 5), "LEFT"), + ("VALIGN", (0, 1), (0, 5), "MIDDLE"), + ("ALIGN", (1, 1), (1, 1), "CENTER"), + ("VALIGN", (1, 1), (1, 1), "MIDDLE"), + ] + ) exitscore_table.setStyle(exitscore_table_style) content.append(exitscore_table) content.append(Spacer(1, 12)) - content.append(Paragraph("Scoring #2 - Vendor Lock-In Score", styles['Heading2'])) + content.append( + Paragraph("Scoring #2 - Vendor Lock-In Score", styles["Heading2"]) + ) scoring_block2 = "The following radar chart visualizes the assessment of alternative technologies across three dimensions: Human (skills availability), Technology (maturity and vendor stability), and Operational (ecosystem and support services) — only where viable alternatives exist:" content.append(Paragraph(scoring_block2, content_style)) content.append(Spacer(1, 12)) human_score = scoring_data.get("human_score", 0) if scoring_data else 0 - technology_score = scoring_data.get("technology_score", 0) if scoring_data else 0 - operational_score = scoring_data.get("operational_score", 0) if scoring_data else 0 - - vendor_lockin_chart = draw_vendor_lockin_radar_chart(human_score, technology_score, operational_score) + technology_score = ( + scoring_data.get("technology_score", 0) if scoring_data else 0 + ) + operational_score = ( + scoring_data.get("operational_score", 0) if scoring_data else 0 + ) + + vendor_lockin_chart = draw_vendor_lockin_radar_chart( + human_score, technology_score, operational_score + ) content.append(vendor_lockin_chart) # Define the table data vendor_lockin_table_data = [ - ['Human', 'Technology', 'Operational'], - [human_score, technology_score, operational_score] + ["Human", "Technology", "Operational"], + [human_score, technology_score, operational_score], ] # Column widhts - vendor_lockin_colWidths = [5*cm, 5*cm, 5*cm] + vendor_lockin_colWidths = [5 * cm, 5 * cm, 5 * cm] # Create the table - vendor_lockin_table = Table(vendor_lockin_table_data, colWidths=vendor_lockin_colWidths) + vendor_lockin_table = Table( + vendor_lockin_table_data, colWidths=vendor_lockin_colWidths + ) # Style the table - vendor_lockin_table_style = TableStyle([ - ('BACKGROUND', (0, 0), (-1, 0), HexColor('#115e59')), - ('TEXTCOLOR', (0, 0), (-1, 0), colors.white), - ('ALIGN', (0, 0), (-1, 0), 'CENTER'), - ('VALIGN', (0, 0), (-1, 0), 'MIDDLE'), - ('ALIGN', (0, 0), (-1, -1), 'CENTER'), - ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), - ('GRID', (0, 0), (-1, -1), 1, colors.black), - ]) + vendor_lockin_table_style = TableStyle( + [ + ("BACKGROUND", (0, 0), (-1, 0), HexColor("#115e59")), + ("TEXTCOLOR", (0, 0), (-1, 0), colors.white), + ("ALIGN", (0, 0), (-1, 0), "CENTER"), + ("VALIGN", (0, 0), (-1, 0), "MIDDLE"), + ("ALIGN", (0, 0), (-1, -1), "CENTER"), + ("FONTNAME", (0, 0), (-1, 0), "Helvetica-Bold"), + ("GRID", (0, 0), (-1, -1), 1, colors.black), + ] + ) vendor_lockin_table.setStyle(vendor_lockin_table_style) content.append(vendor_lockin_table) content.append(PageBreak()) # Page 4: Resource Inventory content.append(Spacer(1, header_padding)) - content.append(Paragraph("Resource Inventory", styles['Heading1'])) + content.append(Paragraph("Resource Inventory", styles["Heading1"])) res_block1 = "The Resource Inventory provides a summary of the cloud resources provisioned within the defined scope:" content.append(Paragraph(res_block1, content_style)) content.append(Spacer(1, 12)) # Transform the resource inventory data for the PDF - resources = transform_resource_inventory_for_pdf(resource_inventory, resource_type_mapping, report_path) + resources = transform_resource_inventory_for_pdf( + resource_inventory, resource_type_mapping, report_path + ) # Compute total resources total_resources = sum(res["count"] for res in resources) @@ -550,12 +712,14 @@ def header_footer(canvas, doc): resource_data = [["#", "Resource type", "", "No."]] for res in resources: - resource_data.append([ - str(res["id"]), - res["resource_name"], - Image(res["icon_url"], width=20, height=20), - str(res["count"]) - ]) + resource_data.append( + [ + str(res["id"]), + res["resource_name"], + Image(res["icon_url"], width=20, height=20), + str(res["count"]), + ] + ) # Add the total resources row resource_data.append(["Total Resources", "", "", str(total_resources)]) @@ -564,28 +728,37 @@ def header_footer(canvas, doc): res_table = Table(resource_data, colWidths=res_colWidths) res_table_style_commands = [ - ('BACKGROUND', (0, 0), (-1, 0), HexColor('#115e59')), # Header row background color - ('TEXTCOLOR', (0, 0), (-1, 0), colors.white), # Header row text color - ('BACKGROUND', (0, -1), (-1, -1), HexColor('#115e59')), # Last row background color - ('TEXTCOLOR', (0, -1), (-1, -1), colors.white), # Last row text color - ('BOX', (0, 0), (-1, -1), 1, HexColor('#112726')), - ('BOTTOMPADDING', (0, 0), (-1, 0), 12), # Padding for header row - ('TOPPADDING', (0, 0), (-1, 0), 12), # Padding for header row + ( + "BACKGROUND", + (0, 0), + (-1, 0), + HexColor("#115e59"), + ), # Header row background color + ("TEXTCOLOR", (0, 0), (-1, 0), colors.white), # Header row text color + ( + "BACKGROUND", + (0, -1), + (-1, -1), + HexColor("#115e59"), + ), # Last row background color + ("TEXTCOLOR", (0, -1), (-1, -1), colors.white), # Last row text color + ("BOX", (0, 0), (-1, -1), 1, HexColor("#112726")), + ("BOTTOMPADDING", (0, 0), (-1, 0), 12), # Padding for header row + ("TOPPADDING", (0, 0), (-1, 0), 12), # Padding for header row # If you previously had a SPAN on the last row, remove if not needed now. # ('SPAN', (-4, -1), (-2, -1)), # remove if not required - - ('ALIGN', (0, 1), (0, -2), 'LEFT'), # Aligning the '#' column - ('VALIGN', (0, 1), (0, -2), 'MIDDLE'), - ('ALIGN', (1, 1), (1, -2), 'LEFT'), # Resource name column - ('VALIGN', (1, 1), (1, -2), 'MIDDLE'), - ('ALIGN', (2, 1), (2, -2), 'CENTER'), # Icon column - ('VALIGN', (2, 1), (2, -2), 'MIDDLE'), - ('ALIGN', (3, 1), (3, -2), 'CENTER'), # Number column - ('VALIGN', (3, 1), (3, -2), 'MIDDLE'), - ('ALIGN', (-1, 0), (-1, 0), 'CENTER'), - ('VALIGN', (-1, 0), (-1, 0), 'MIDDLE'), - ('ALIGN', (-1, -1), (-1, -1), 'CENTER'), - ('VALIGN', (-1, -1), (-1, -1), 'MIDDLE') + ("ALIGN", (0, 1), (0, -2), "LEFT"), # Aligning the '#' column + ("VALIGN", (0, 1), (0, -2), "MIDDLE"), + ("ALIGN", (1, 1), (1, -2), "LEFT"), # Resource name column + ("VALIGN", (1, 1), (1, -2), "MIDDLE"), + ("ALIGN", (2, 1), (2, -2), "CENTER"), # Icon column + ("VALIGN", (2, 1), (2, -2), "MIDDLE"), + ("ALIGN", (3, 1), (3, -2), "CENTER"), # Number column + ("VALIGN", (3, 1), (3, -2), "MIDDLE"), + ("ALIGN", (-1, 0), (-1, 0), "CENTER"), + ("VALIGN", (-1, 0), (-1, 0), "MIDDLE"), + ("ALIGN", (-1, -1), (-1, -1), "CENTER"), + ("VALIGN", (-1, -1), (-1, -1), "MIDDLE"), ] res_table_style = TableStyle(res_table_style_commands) @@ -596,52 +769,81 @@ def header_footer(canvas, doc): # Page 5: Alternative Technologies content.append(Spacer(1, header_padding)) - content.append(Paragraph("Alternative Technologies", styles['Heading1'])) + content.append(Paragraph("Alternative Technologies", styles["Heading1"])) - alttech_block = ("The Alternative Technology provides a summary of the alternative technology landscape " - "for each identified resource in the Resource Inventory, based on our dataset and market research. " - "It also includes a count of the available alternative technologies for each resource:") + alttech_block = ( + "The Alternative Technology provides a summary of the alternative technology landscape " + "for each identified resource in the Resource Inventory, based on our dataset and market research. " + "It also includes a count of the available alternative technologies for each resource:" + ) content.append(Paragraph(alttech_block, content_style)) content.append(Spacer(1, 12)) # Transform the alternative technologies data for the PDF alttech = transform_alt_tech_for_pdf( - resource_inventory, resource_type_mapping, alternatives, alternative_technologies, exit_strategy, report_path + resource_inventory, + resource_type_mapping, + alternatives, + alternative_technologies, + exit_strategy, + report_path, ) # Build the table data alttech_data = [["#", "Resource type", "", "No."]] for res in alttech: - alttech_data.append([ - str(res["id"]), - res["resource_name"], - Image(res["icon_url"], width=20, height=20) if res["icon_url"] else "", - str(res["count"]) - ]) + alttech_data.append( + [ + str(res["id"]), + res["resource_name"], + Image(res["icon_url"], width=20, height=20) if res["icon_url"] else "", + str(res["count"]), + ] + ) # Define the column widths - alttech_colWidths = [1*cm, 11.5*cm, 1.5*cm, 1.5*cm] + alttech_colWidths = [1 * cm, 11.5 * cm, 1.5 * cm, 1.5 * cm] # Create and style the alternative technology table alttech_table = Table(alttech_data, colWidths=alttech_colWidths) alttech_table_style_commands = [ - ('BACKGROUND', (0, 0), (-1, 0), HexColor('#115e59')), # Header row background color - ('TEXTCOLOR', (0, 0), (-1, 0), colors.white), # Header row text color - ('BOX', (0, 0), (-1, -1), 1, HexColor("#000000")), # Draw box around the table - ('BOTTOMPADDING', (0, 1), (-1, -1), 6), # Apply bottom padding to all rows except the header - ('TOPPADDING', (0, 1), (-1, -1), 6), # Apply top padding to all rows except the header - ('ALIGN', (2, 0), (2, -1), 'CENTER'), # Center align the text in the icon column - ('VALIGN', (2, 0), (2, -1), 'MIDDLE'), - ('ALIGN', (0, 1), (0, -1), 'LEFT'), - ('VALIGN', (0, 1), (0, -1), 'MIDDLE'), - ('ALIGN', (1, 1), (1, -1), 'LEFT'), - ('VALIGN', (1, 1), (1, -1), 'MIDDLE'), - ('ALIGN', (2, 1), (2, -1), 'CENTER'), - ('VALIGN', (2, 1), (2, -1), 'MIDDLE'), - ('ALIGN', (3, 1), (3, -1), 'CENTER'), - ('VALIGN', (3, 1), (3, -1), 'MIDDLE'), - ('ALIGN', (-1, 0), (-1, 0), 'CENTER'), # Center align the "No." header - ('VALIGN', (-1, 0), (-1, 0), 'MIDDLE'), + ( + "BACKGROUND", + (0, 0), + (-1, 0), + HexColor("#115e59"), + ), # Header row background color + ("TEXTCOLOR", (0, 0), (-1, 0), colors.white), # Header row text color + ("BOX", (0, 0), (-1, -1), 1, HexColor("#000000")), # Draw box around the table + ( + "BOTTOMPADDING", + (0, 1), + (-1, -1), + 6, + ), # Apply bottom padding to all rows except the header + ( + "TOPPADDING", + (0, 1), + (-1, -1), + 6, + ), # Apply top padding to all rows except the header + ( + "ALIGN", + (2, 0), + (2, -1), + "CENTER", + ), # Center align the text in the icon column + ("VALIGN", (2, 0), (2, -1), "MIDDLE"), + ("ALIGN", (0, 1), (0, -1), "LEFT"), + ("VALIGN", (0, 1), (0, -1), "MIDDLE"), + ("ALIGN", (1, 1), (1, -1), "LEFT"), + ("VALIGN", (1, 1), (1, -1), "MIDDLE"), + ("ALIGN", (2, 1), (2, -1), "CENTER"), + ("VALIGN", (2, 1), (2, -1), "MIDDLE"), + ("ALIGN", (3, 1), (3, -1), "CENTER"), + ("VALIGN", (3, 1), (3, -1), "MIDDLE"), + ("ALIGN", (-1, 0), (-1, 0), "CENTER"), # Center align the "No." header + ("VALIGN", (-1, 0), (-1, 0), "MIDDLE"), ] alttech_table.setStyle(TableStyle(alttech_table_style_commands)) diff --git a/core/utils_report_html.py b/core/utils_report_html.py index bc6bd76..210dba2 100644 --- a/core/utils_report_html.py +++ b/core/utils_report_html.py @@ -8,17 +8,16 @@ logger = logging.getLogger("core.engine.report_html") logger.setLevel(logging.INFO) -def transform_cost_inventory_for_html(cost_data: List[Dict[str, Any]]) -> Tuple[List[str], List[float], float, str, str]: + +def transform_cost_inventory_for_html( + cost_data: List[Dict[str, Any]], +) -> Tuple[List[str], List[float], float, str, str]: months = [] cost_values = [] total_cost = 0 # Map currency codes to their respective symbols - currency_symbols = { - "USD": "$", - "GBP": "£", - "EUR": "€" - } + currency_symbols = {"USD": "$", "GBP": "£", "EUR": "€"} # Convert list to dictionary if necessary if isinstance(cost_data, list): @@ -30,30 +29,43 @@ def transform_cost_inventory_for_html(cost_data: List[Dict[str, Any]]) -> Tuple[ # Extract currency from the first entry, assuming all costs use the same currency first_entry = next(iter(cost_data.values()), None) currency_code = first_entry.get("currency", "USD") if first_entry else "USD" - currency_symbol = currency_symbols.get(currency_code, currency_code) # Default to currency_code if no symbol exists + currency_symbol = currency_symbols.get( + currency_code, currency_code + ) # Default to currency_code if no symbol exists # Iterate over the cost data, expecting 6 months for month, details in sorted(cost_data.items()): - months.append(datetime.strptime(month, "%Y-%m-%d").strftime('%b')) + months.append(datetime.strptime(month, "%Y-%m-%d").strftime("%b")) cost_values.append(details["cost"]) total_cost += details["cost"] total_cost = round(total_cost, 2) return months, cost_values, total_cost, currency_code, currency_symbol -def transform_risk_inventory_for_html(risk_data: List[Dict[str, Any]], risk_definitions: List[Dict[str, Any]], resource_inventory: Dict[str, Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], Dict[str, int]]: - severity_order = {'high': 1, 'medium': 2, 'low': 3} - severity_counts = {'high': 0, 'medium': 0, 'low': 0} + +def transform_risk_inventory_for_html( + risk_data: List[Dict[str, Any]], + risk_definitions: List[Dict[str, Any]], + resource_inventory: Dict[str, Dict[str, Any]], +) -> Tuple[List[Dict[str, Any]], Dict[str, int]]: + severity_order = {"high": 1, "medium": 2, "low": 3} + severity_counts = {"high": 0, "medium": 0, "low": 0} sorted_risks = [] # Map resource IDs to resource names for quick lookup - resource_name_map = {str(key): value['name'] for key, value in resource_inventory.items()} + resource_name_map = { + str(key): value["name"] for key, value in resource_inventory.items() + } # Group risks by their risk code and impacted resources risk_map = defaultdict(lambda: {"impacted_resources": set(), "count": 0}) for risk_entry in risk_data: - risk_code = risk_entry['risk'] - resource_type = str(risk_entry['resource_type']) if risk_entry['resource_type'] != "null" else None + risk_code = risk_entry["risk"] + resource_type = ( + str(risk_entry["resource_type"]) + if risk_entry["resource_type"] != "null" + else None + ) if resource_type: # Handle risks with associated resource types @@ -67,48 +79,66 @@ def transform_risk_inventory_for_html(risk_data: List[Dict[str, Any]], risk_defi # Process risk definitions for risk_code, risk_info in risk_map.items(): - risk_definition = next((rd for rd in risk_definitions if rd["id"] == risk_code), None) + risk_definition = next( + (rd for rd in risk_definitions if rd["id"] == risk_code), None + ) if not risk_definition: continue - severity = risk_definition['severity'] + severity = risk_definition["severity"] severity_counts[severity] += 1 - sorted_risks.append({ - 'name': risk_definition['name'], - 'description': risk_definition['description'], - 'impacted_resources': list(risk_info["impacted_resources"]), - 'impacted_resources_count': risk_info["count"], - 'severity': severity - }) + sorted_risks.append( + { + "name": risk_definition["name"], + "description": risk_definition["description"], + "impacted_resources": list(risk_info["impacted_resources"]), + "impacted_resources_count": risk_info["count"], + "severity": severity, + } + ) # Sort risks by severity - sorted_risks.sort(key=lambda x: severity_order.get(x['severity'], 4)) + sorted_risks.sort(key=lambda x: severity_order.get(x["severity"], 4)) return sorted_risks, severity_counts -def transform_alt_tech_for_html(resource_inventory: List[Dict[str, Any]], alternatives: List[Dict[str, Any]], alternative_technologies: List[Dict[str, Any]], exit_strategy: int) -> List[Dict[str, Any]]: + +def transform_alt_tech_for_html( + resource_inventory: List[Dict[str, Any]], + alternatives: List[Dict[str, Any]], + alternative_technologies: List[Dict[str, Any]], + exit_strategy: int, +) -> List[Dict[str, Any]]: alt_tech_data = [] for resource in resource_inventory: resource_type = resource.get("resource_type") relevant_alternatives = [ - alt for alt in alternatives - if str(alt["resource_type"]) == str(resource_type) and str(alt["strategy_type"]) == str(exit_strategy) + alt + for alt in alternatives + if str(alt["resource_type"]) == str(resource_type) + and str(alt["strategy_type"]) == str(exit_strategy) ] for alt in relevant_alternatives: tech = next( - (t for t in alternative_technologies if t["id"] == alt["alternative_technology"] and t["status"] == "t"), - None + ( + t + for t in alternative_technologies + if t["id"] == alt["alternative_technology"] and t["status"] == "t" + ), + None, ) if tech: - alt_tech_data.append({ - "resource_type_id": resource_type, - "product_name": tech.get("product_name"), - "product_description": tech.get("product_description"), - "product_url": tech.get("product_url"), - "open_source": tech.get("open_source") == "t", - "support_plan": tech.get("support_plan") == "t", - "status": tech.get("status") == "t", - }) + alt_tech_data.append( + { + "resource_type_id": resource_type, + "product_name": tech.get("product_name"), + "product_description": tech.get("product_description"), + "product_url": tech.get("product_url"), + "open_source": tech.get("open_source") == "t", + "support_plan": tech.get("support_plan") == "t", + "status": tech.get("status") == "t", + } + ) return alt_tech_data diff --git a/core/utils_report_json.py b/core/utils_report_json.py index 8c2c05a..230befb 100644 --- a/core/utils_report_json.py +++ b/core/utils_report_json.py @@ -8,7 +8,11 @@ logger = logging.getLogger("core.engine.report_json") logger.setLevel(logging.INFO) -def transform_resource_inventory_for_json(resource_inventory: List[Dict[str, Any]], resource_type_mapping: Dict[str, Dict[str, Any]]) -> List[Dict[str, Any]]: + +def transform_resource_inventory_for_json( + resource_inventory: List[Dict[str, Any]], + resource_type_mapping: Dict[str, Dict[str, Any]], +) -> List[Dict[str, Any]]: resource_inventory_json = [] for idx, resource in enumerate(resource_inventory): resource_type = str(resource["resource_type"]) @@ -16,46 +20,65 @@ def transform_resource_inventory_for_json(resource_inventory: List[Dict[str, Any resource_name = resource_info.get("name", "Unknown Resource") resource_code = resource_info.get("code", "N/A") - resource_inventory_json.append({ - "id": idx + 1, - "code": resource_code, - "resource_name": resource_name, - "location": resource.get("location", "Unknown"), - "count": resource.get("count", 0) - }) + resource_inventory_json.append( + { + "id": idx + 1, + "code": resource_code, + "resource_name": resource_name, + "location": resource.get("location", "Unknown"), + "count": resource.get("count", 0), + } + ) return resource_inventory_json -def transform_cost_inventory_for_json(cost_data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + +def transform_cost_inventory_for_json( + cost_data: List[Dict[str, Any]], +) -> List[Dict[str, Any]]: # Sort by date before transformation - sorted_cost_data = sorted(cost_data, key=lambda x: datetime.strptime(x["month"], "%Y-%m-%d")) + sorted_cost_data = sorted( + cost_data, key=lambda x: datetime.strptime(x["month"], "%Y-%m-%d") + ) cost_inventory = [ { "month": item["month"], "cost": round(item["cost"], 2), - "currency": item["currency"] + "currency": item["currency"], } for item in sorted_cost_data ] return cost_inventory -def transform_risk_inventory_for_json(risk_data: List[Dict[str, Any]], risk_definitions: List[Dict[str, Any]], resource_inventory: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + +def transform_risk_inventory_for_json( + risk_data: List[Dict[str, Any]], + risk_definitions: List[Dict[str, Any]], + resource_inventory: List[Dict[str, Any]], +) -> List[Dict[str, Any]]: # Map resource_type to their corresponding resource IDs - resource_id_map = {str(value["resource_type"]): key + 1 for key, value in enumerate(resource_inventory)} + resource_id_map = { + str(value["resource_type"]): key + 1 + for key, value in enumerate(resource_inventory) + } # Group risks by risk.id - risk_map = defaultdict(lambda: { - "id": None, - "name": "", - "description": "", - "severity": "", - "impacted_resources": set(), - "impacted_resources_count": 0 - }) + risk_map = defaultdict( + lambda: { + "id": None, + "name": "", + "description": "", + "severity": "", + "impacted_resources": set(), + "impacted_resources_count": 0, + } + ) for risk_entry in risk_data: risk_id = risk_entry["risk"] - risk_definition = next((rd for rd in risk_definitions if rd["id"] == risk_id), None) + risk_definition = next( + (rd for rd in risk_definitions if rd["id"] == risk_id), None + ) if not risk_definition: continue @@ -76,16 +99,29 @@ def transform_risk_inventory_for_json(risk_data: List[Dict[str, Any]], risk_defi # Convert impacted_resources set to a list and compute counts for risk in risk_map.values(): risk["impacted_resources"] = list(risk["impacted_resources"]) - risk["impacted_resources_count"] = len(risk["impacted_resources"]) if risk["impacted_resources"] else None + risk["impacted_resources_count"] = ( + len(risk["impacted_resources"]) if risk["impacted_resources"] else None + ) return list(risk_map.values()) -def transform_alt_tech_for_json(resource_inventory: List[Dict[str, Any]], alternatives: List[Dict[str, Any]], alternative_technologies: List[Dict[str, Any]], exit_strategy: int) -> Dict[int, List[Dict[str, Any]]]: + +def transform_alt_tech_for_json( + resource_inventory: List[Dict[str, Any]], + alternatives: List[Dict[str, Any]], + alternative_technologies: List[Dict[str, Any]], + exit_strategy: int, +) -> Dict[int, List[Dict[str, Any]]]: # Map resource_type to resource_id - resource_id_map = {str(value["resource_type"]): key + 1 for key, value in enumerate(resource_inventory)} + resource_id_map = { + str(value["resource_type"]): key + 1 + for key, value in enumerate(resource_inventory) + } # Initialize the grouped alternative technologies - grouped_alt_tech_data = {resource_id: [] for resource_id in resource_id_map.values()} + grouped_alt_tech_data = { + resource_id: [] for resource_id in resource_id_map.values() + } # Iterate through alternatives to group them by resource_id for alt in alternatives: @@ -93,20 +129,28 @@ def transform_alt_tech_for_json(resource_inventory: List[Dict[str, Any]], altern continue tech = next( - (t for t in alternative_technologies if t["id"] == alt["alternative_technology"] and t["status"] == "t"), - None + ( + t + for t in alternative_technologies + if t["id"] == alt["alternative_technology"] and t["status"] == "t" + ), + None, ) if tech: resource_id = resource_id_map.get(str(alt["resource_type"])) if resource_id: - grouped_alt_tech_data[resource_id].append({ - "id": len(grouped_alt_tech_data[resource_id]) + 1, - "product_name": tech["product_name"], - "product_description": tech["product_description"], - "product_url": tech["product_url"], - "open_source": tech["open_source"] == "t", - "support_plan": tech["support_plan"] == "t" - }) + grouped_alt_tech_data[resource_id].append( + { + "id": len(grouped_alt_tech_data[resource_id]) + 1, + "product_name": tech["product_name"], + "product_description": tech["product_description"], + "product_url": tech["product_url"], + "open_source": tech["open_source"] == "t", + "support_plan": tech["support_plan"] == "t", + } + ) # Return the grouped alternatives - return {key: grouped_alt_tech_data[key] for key in sorted(grouped_alt_tech_data.keys())} + return { + key: grouped_alt_tech_data[key] for key in sorted(grouped_alt_tech_data.keys()) + } diff --git a/core/utils_report_pdf.py b/core/utils_report_pdf.py index 6108887..6b681c4 100644 --- a/core/utils_report_pdf.py +++ b/core/utils_report_pdf.py @@ -17,6 +17,7 @@ from reportlab.graphics.charts.legends import Legend from reportlab.graphics.charts.piecharts import Pie from reportlab.graphics.charts.barcharts import VerticalBarChart + # Plotly import plotly.graph_objects as go @@ -24,7 +25,10 @@ logger = logging.getLogger("core.engine.report_pdf") logger.setLevel(logging.INFO) -def transform_resource_inventory_for_pdf(resource_inventory: list, resource_type_mapping: Dict[str, Any], report_path: str) -> List[Dict[str, Any]]: + +def transform_resource_inventory_for_pdf( + resource_inventory: list, resource_type_mapping: Dict[str, Any], report_path: str +) -> List[Dict[str, Any]]: resources = [] for idx, resource in enumerate(resource_inventory): # Convert resource_type to string for lookup @@ -39,32 +43,38 @@ def transform_resource_inventory_for_pdf(resource_inventory: list, resource_type # Prepend report_storage to form the full path to the icon icon_url = f"{report_path}{icon_path}" - resources.append({ - "id": idx + 1, - "resource_name": resource_name, - "icon_url": icon_url, - "location": resource.get("location", "Unknown"), - "count": resource.get("count", 0) - }) + resources.append( + { + "id": idx + 1, + "resource_name": resource_name, + "icon_url": icon_url, + "location": resource.get("location", "Unknown"), + "count": resource.get("count", 0), + } + ) return resources -def transform_cost_inventory_for_pdf(cost_data: list) -> Tuple[List[str], List[float], str]: + +def transform_cost_inventory_for_pdf( + cost_data: list, +) -> Tuple[List[str], List[float], str]: # Map currency codes to their respective symbols - currency_symbols = { - "USD": "$", - "GBP": "£", - "EUR": "€" - } + currency_symbols = {"USD": "$", "GBP": "£", "EUR": "€"} # Sort cost_data by date ascending - sorted_cost_data = sorted(cost_data, key=lambda x: datetime.strptime(x["month"], "%Y-%m-%d")) + sorted_cost_data = sorted( + cost_data, key=lambda x: datetime.strptime(x["month"], "%Y-%m-%d") + ) # Take the last 6 months last_six_cost_data = sorted_cost_data[-6:] # Extract months and costs - months = [datetime.strptime(item["month"], "%Y-%m-%d").strftime('%b') for item in last_six_cost_data] + months = [ + datetime.strptime(item["month"], "%Y-%m-%d").strftime("%b") + for item in last_six_cost_data + ] costs = [item["cost"] for item in last_six_cost_data] # Determine currency symbol @@ -76,7 +86,10 @@ def transform_cost_inventory_for_pdf(cost_data: list) -> Tuple[List[str], List[f return months, costs, currency_symbol -def transform_risk_inventory_for_pdf(risk_data: list, risk_definitions: list,resource_inventory: list) -> Tuple[List[Dict[str, Any]], Dict[str, int]]: + +def transform_risk_inventory_for_pdf( + risk_data: list, risk_definitions: list, resource_inventory: list +) -> Tuple[List[Dict[str, Any]], Dict[str, int]]: # Create a lookup for risk_definitions by their 'id' risk_def_map = {rd["id"]: rd for rd in risk_definitions} @@ -85,17 +98,14 @@ def transform_risk_inventory_for_pdf(risk_data: list, risk_definitions: list,res for entry in risk_data: risk_code = entry["risk"] if risk_code not in risk_map: - risk_map[risk_code] = { - "impacted_resources_count": 0, - "entries": [] - } + risk_map[risk_code] = {"impacted_resources_count": 0, "entries": []} # If resource_type is not null, increment impacted_resources_count if entry["resource_type"] is not None and entry["resource_type"] != "null": risk_map[risk_code]["impacted_resources_count"] += 1 risk_map[risk_code]["entries"].append(entry) # We'll track severity counts as well - severity_counts = {'high': 0, 'medium': 0, 'low': 0} + severity_counts = {"high": 0, "medium": 0, "low": 0} # Build a final list of risks with name, severity, impacted_resources_count risks = [] @@ -108,22 +118,40 @@ def transform_risk_inventory_for_pdf(risk_data: list, risk_definitions: list,res if severity in severity_counts: severity_counts[severity] += 1 - risks.append({ - "name": rd["name"], - "severity": severity, - "impacted_resources_count": data["impacted_resources_count"] - }) + risks.append( + { + "name": rd["name"], + "severity": severity, + "impacted_resources_count": data["impacted_resources_count"], + } + ) return risks, severity_counts -def transform_alt_tech_for_pdf(resource_inventory: list, resource_type_mapping: Dict[str, Any], alternatives: list, alternative_technologies: list, exit_strategy: int, report_path: str) -> List[Dict[str, Any]]: + +def transform_alt_tech_for_pdf( + resource_inventory: list, + resource_type_mapping: Dict[str, Any], + alternatives: list, + alternative_technologies: list, + exit_strategy: int, + report_path: str, +) -> List[Dict[str, Any]]: # Count how many valid alternatives each resource_type has for the given exit_strategy alt_counts = {} for alt in alternatives: if str(alt.get("strategy_type")) == str(exit_strategy): rtype_str = str(alt.get("resource_type")) - tech = next((t for t in alternative_technologies if t["id"] == alt["alternative_technology"] and t.get("status") == "t"), None) + tech = next( + ( + t + for t in alternative_technologies + if t["id"] == alt["alternative_technology"] + and t.get("status") == "t" + ), + None, + ) if tech: alt_counts[rtype_str] = alt_counts.get(rtype_str, 0) + 1 @@ -138,15 +166,18 @@ def transform_alt_tech_for_pdf(resource_inventory: list, resource_type_mapping: count = alt_counts.get(rtype_str, 0) - alt_tech.append({ - "id": idx + 1, - "resource_name": resource_name, - "icon_url": icon_url, - "count": count - }) + alt_tech.append( + { + "id": idx + 1, + "resource_name": resource_name, + "icon_url": icon_url, + "count": count, + } + ) return alt_tech + def draw_header_footer(report_path: str, canvas, doc) -> None: # Save the state of the canvas to not affect the drawing canvas.saveState() @@ -158,26 +189,44 @@ def draw_header_footer(report_path: str, canvas, doc) -> None: left_text_content2 = f"Date: {current_date}" # Define the header content with Paragraphs - header_style = ParagraphStyle('HeaderStyle', fontSize=10, textColor=HexColor("#9cafae")) + header_style = ParagraphStyle( + "HeaderStyle", fontSize=10, textColor=HexColor("#9cafae") + ) header_data = [ [Paragraph(left_text_content1, header_style), "", ""], - [Paragraph(left_text_content2, header_style), "", ""] + [Paragraph(left_text_content2, header_style), "", ""], ] # Create the header table - table = Table(header_data, colWidths=[width - 188 - doc.rightMargin - doc.leftMargin, 10, 150]) + table = Table( + header_data, colWidths=[width - 188 - doc.rightMargin - doc.leftMargin, 10, 150] + ) # Define the style for the table - table.setStyle(TableStyle([ - ('SPAN', (1, 0), (1, 1)), # Merge Column 2 in both rows - ('SPAN', (2, 0), (2, 1)), # Merge Column 3 in both rows - ('ALIGN', (0, 0), (0, 0), 'LEFT'), # Align left_text_content1 to the left - ('ALIGN', (0, 1), (0, 1), 'LEFT'), # Align left_text_content2 to the left - ('ALIGN', (2, 0), (2, 1), 'RIGHT'), # Align logo to the right - ('VALIGN', (0, 0), (0, 1), 'TOP'), # Vertically align to the top - ('VALIGN', (2, 0), (2, 1), 'MIDDLE'), # Vertically align to the middle - #('GRID', (0, 0), (-1, -1), 0.5, colors.red), # Temporary borders for visualization - ])) + table.setStyle( + TableStyle( + [ + ("SPAN", (1, 0), (1, 1)), # Merge Column 2 in both rows + ("SPAN", (2, 0), (2, 1)), # Merge Column 3 in both rows + ( + "ALIGN", + (0, 0), + (0, 0), + "LEFT", + ), # Align left_text_content1 to the left + ( + "ALIGN", + (0, 1), + (0, 1), + "LEFT", + ), # Align left_text_content2 to the left + ("ALIGN", (2, 0), (2, 1), "RIGHT"), # Align logo to the right + ("VALIGN", (0, 0), (0, 1), "TOP"), # Vertically align to the top + ("VALIGN", (2, 0), (2, 1), "MIDDLE"), # Vertically align to the middle + # ('GRID', (0, 0), (-1, -1), 0.5, colors.red), # Temporary borders for visualization + ] + ) + ) # Build the header table and draw it on the canvas table.wrapOn(canvas, doc.leftMargin, height - doc.topMargin) table.drawOn(canvas, doc.leftMargin, height - doc.topMargin) @@ -191,40 +240,49 @@ def draw_header_footer(report_path: str, canvas, doc) -> None: logo.drawOn(canvas, width - 150 - doc.rightMargin, logo_y) # Line below the header - canvas.setStrokeColor(HexColor('#115e59')) + canvas.setStrokeColor(HexColor("#115e59")) canvas.setLineWidth(1) line_y = height - doc.topMargin - 10 canvas.line(doc.leftMargin, line_y, width - doc.rightMargin, line_y) # Footer footer_padding = 15 # Add padding under the page number - canvas.setStrokeColor(HexColor('#115e59')) + canvas.setStrokeColor(HexColor("#115e59")) canvas.line(40, 60 + footer_padding, A4[0] - 40, 60 + footer_padding) - canvas.setFont('Helvetica', 8) + canvas.setFont("Helvetica", 8) canvas.drawString(A4[0] / 2 - 30, 60 + footer_padding - 15, f"Page {doc.page}") - canvas.setFont('Helvetica-Oblique', 8) - canvas.setFillColor(HexColor('#9cafae')) - canvas.drawCentredString(A4[0] / 2, 40, "EscapeCloud Community Edition - This report is provided 'As Is,' without any warranty of any kind.") - canvas.drawCentredString(A4[0] / 2, 30, "EscapeCloud makes no warranty that the information contained in this report is complete or error-free. Copyright 2024-2025") + canvas.setFont("Helvetica-Oblique", 8) + canvas.setFillColor(HexColor("#9cafae")) + canvas.drawCentredString( + A4[0] / 2, + 40, + "EscapeCloud Community Edition - This report is provided 'As Is,' without any warranty of any kind.", + ) + canvas.drawCentredString( + A4[0] / 2, + 30, + "EscapeCloud makes no warranty that the information contained in this report is complete or error-free. Copyright 2024-2025", + ) # Restore the state of the canvas canvas.restoreState() + def draw_risk_chart(risk_chart_data: Dict[str, int]) -> Drawing: # Define colors for each severity and their border colors severity_colors = { - 'high': HexColor('#991b1b'), - 'medium': HexColor('#ffae1f'), - 'low': HexColor('#539bff') + "high": HexColor("#991b1b"), + "medium": HexColor("#ffae1f"), + "low": HexColor("#539bff"), } # Border colors border_colors = { - 'high': HexColor('#991b1b'), - 'medium': HexColor('#ffae1f'), - 'low': HexColor('#539bff') + "high": HexColor("#991b1b"), + "medium": HexColor("#ffae1f"), + "low": HexColor("#539bff"), } # Create a drawing for the Doughnut chart @@ -254,14 +312,17 @@ def draw_risk_chart(risk_chart_data: Dict[str, int]) -> Drawing: legend.y = 130 legend.dxTextSpace = 10 legend.columnMaximum = 6 - legend.alignment = 'right' + legend.alignment = "right" legend.subCols[0].minWidth = 60 legend.subCols[1].minWidth = 30 - legend.colorNamePairs = [(severity_colors[severity], (severity, str(risk_chart_data[severity]))) for severity in risk_chart_data.keys()] + legend.colorNamePairs = [ + (severity_colors[severity], (severity, str(risk_chart_data[severity]))) + for severity in risk_chart_data.keys() + ] # Configure sub-columns for the legend - legend.subCols[0].align = 'left' - legend.subCols[1].align = 'right' + legend.subCols[0].align = "left" + legend.subCols[1].align = "right" # Add the Legend to the drawing d.add(legend) @@ -271,11 +332,13 @@ def draw_risk_chart(risk_chart_data: Dict[str, int]) -> Drawing: legend_header.x = 280 legend_header.y = 150 legend_header.dxTextSpace = 10 - legend_header.colorNamePairs = [(HexColor('#FFFFFF'), ('Severity', 'No.'))] # Corrected line - legend_header.alignment = 'right' - legend_header.subCols[0].align = 'left' + legend_header.colorNamePairs = [ + (HexColor("#FFFFFF"), ("Severity", "No.")) + ] # Corrected line + legend_header.alignment = "right" + legend_header.subCols[0].align = "left" legend_header.subCols[0].minWidth = 60 - legend_header.subCols[1].align = 'right' + legend_header.subCols[1].align = "right" legend_header.subCols[1].minWidth = 30 # Add the Legend Header to the drawing @@ -283,29 +346,32 @@ def draw_risk_chart(risk_chart_data: Dict[str, int]) -> Drawing: return d + def draw_cost_chart(months: List[str], costs: List[float]) -> Drawing: # Create a drawing for the bar chart - d = Drawing(7.5*cm, 5*cm) + d = Drawing(7.5 * cm, 5 * cm) # Create a Vertical Bar Chart bar_chart = VerticalBarChart() bar_chart.x = 20 bar_chart.y = 20 - bar_chart.width = 6.5*cm - bar_chart.height = 4*cm + bar_chart.width = 6.5 * cm + bar_chart.height = 4 * cm bar_chart.data = [costs] - bar_chart.barWidth = 0.8*cm + bar_chart.barWidth = 0.8 * cm # Style the bars - bar_chart.bars[0].fillColor = HexColor('#055160') - bar_chart.bars[0].strokeColor = HexColor('#055160') + bar_chart.bars[0].fillColor = HexColor("#055160") + bar_chart.bars[0].strokeColor = HexColor("#055160") # Set the categories (months) bar_chart.categoryAxis.categoryNames = months # Calculate valueMax max_cost = max(costs) if costs else 0 - bar_chart.valueAxis.valueMax = math.ceil(max_cost / 10.0) * 10 if max_cost > 0 else 10 + bar_chart.valueAxis.valueMax = ( + math.ceil(max_cost / 10.0) * 10 if max_cost > 0 else 10 + ) bar_chart.valueAxis.valueMin = 0 # Add the bar chart to the drawing @@ -313,43 +379,44 @@ def draw_cost_chart(months: List[str], costs: List[float]) -> Drawing: return d -def draw_exitscore_chart(exit_score: int, output_path: str, width: int = 750, height: int = 500) -> str: + +def draw_exitscore_chart( + exit_score: int, output_path: str, width: int = 750, height: int = 500 +) -> str: # Create the gauge chart - fig = go.Figure(go.Indicator( - mode="gauge+number", - value=exit_score, - domain={'x': [0, 1], 'y': [0, 1]}, - gauge={ - 'axis': {'range': [0, 100], 'tickwidth': 0.2, 'tickcolor': "darkgray"}, - 'bar': { - 'color': '#f3f6f6', - 'thickness': 0.2 + fig = go.Figure( + go.Indicator( + mode="gauge+number", + value=exit_score, + domain={"x": [0, 1], "y": [0, 1]}, + gauge={ + "axis": {"range": [0, 100], "tickwidth": 0.2, "tickcolor": "darkgray"}, + "bar": {"color": "#f3f6f6", "thickness": 0.2}, + "steps": [ + {"range": [0, 20], "color": "#ba1c1d"}, + {"range": [20, 40], "color": "#ff9533"}, + {"range": [40, 60], "color": "#f1ca00"}, + {"range": [60, 80], "color": "#76c31d"}, + {"range": [80, 100], "color": "#065f43"}, + ], }, - 'steps': [ - {'range': [0, 20], 'color': "#ba1c1d"}, - {'range': [20, 40], 'color': "#ff9533"}, - {'range': [40, 60], 'color': "#f1ca00"}, - {'range': [60, 80], 'color': "#76c31d"}, - {'range': [80, 100], 'color': "#065f43"} - ] - } - )) + ) + ) image_file = os.path.join(output_path, "exit_score_chart.png") fig.write_image(image_file, width=width, height=height) return image_file -def draw_vendor_lockin_radar_chart(human: int, technology: int, operational: int) -> Drawing: + +def draw_vendor_lockin_radar_chart( + human: int, technology: int, operational: int +) -> Drawing: # Create a drawing for the radar chart d = Drawing(350, 250) # Define the labels and data - labels = [ - 'Human', - 'Technology', - 'Operational' - ] + labels = ["Human", "Technology", "Operational"] data = [human, technology, operational] # Define your hex color with alpha @@ -389,8 +456,12 @@ def draw_vendor_lockin_radar_chart(human: int, technology: int, operational: int first_x = None first_y = None for i in range(num_facets): - x = cx + (radius * level / max_value) * cos(radians(start_angle + i * angle)) - y = cy + (radius * level / max_value) * sin(radians(start_angle + i * angle)) + x = cx + (radius * level / max_value) * cos( + radians(start_angle + i * angle) + ) + y = cy + (radius * level / max_value) * sin( + radians(start_angle + i * angle) + ) # Store the first x and y coordinates to close the triangle later if i == 0: @@ -422,31 +493,40 @@ def draw_vendor_lockin_radar_chart(human: int, technology: int, operational: int d.add(Line(cx, cy, x, y, strokeColor=colors.grey)) # Adjust label position and anchor based on quadrant - padding = 20 # Adjusted padding - anchor = 'middle' # Default text anchor + anchor = "middle" # Default text anchor label_text = labels[i] # Adjust padding and anchor for different quadrants if necessary if i * angle > 90 and i * angle < 270: - padding = -20 # Adjusted padding for different quadrants - anchor = 'end' if i * angle < 180 else 'start' # Adjust text anchor + anchor = "end" if i * angle < 180 else "start" # Adjust text anchor - label_padding = 10 # Additional padding to move the label slightly outward from max_value + label_padding = ( + 10 # Additional padding to move the label slightly outward from max_value + ) label_x = cx + (radius + label_padding) * cos(radians(start_angle + i * angle)) label_y = cy + (radius + label_padding) * sin(radians(start_angle + i * angle)) # Adjustments based on label - if label_text == 'Technology': + if label_text == "Technology": label_x += 25 label_y -= 10 - elif label_text == 'Operational': + elif label_text == "Operational": label_x -= 50 label_y -= 10 - elif label_text == 'Human': + elif label_text == "Human": label_y += 5 label_x += 10 - d.add(String(label_x, label_y, label_text, fontSize=10, fillColor=colors.black, textAnchor=anchor)) + d.add( + String( + label_x, + label_y, + label_text, + fontSize=10, + fillColor=colors.black, + textAnchor=anchor, + ) + ) return d diff --git a/core/utils_sync.py b/core/utils_sync.py index d75100d..500c24d 100644 --- a/core/utils_sync.py +++ b/core/utils_sync.py @@ -17,16 +17,28 @@ _ASSESS_PATH = "/api/v1/assessments/" + def _assess_url(host: str) -> str: host = host.strip().rstrip("/") if not host.startswith("http"): host = f"https://{host}" return f"{host}{_ASSESS_PATH}" -def _build_payload(*,report_path: str, name: str, started_at: int, exit_strategy: int, cloud_service_provider: int, assessment_type: int) -> Dict[str, Any]: + +def _build_payload( + *, + report_path: str, + name: str, + started_at: int, + exit_strategy: int, + cloud_service_provider: int, + assessment_type: int, +) -> Dict[str, Any]: db_path = os.path.join(report_path, "data", "assessment.db") - resource_rows: List[Dict[str, Any]] = load_data("resource_inventory", db_path=db_path) + resource_rows: List[Dict[str, Any]] = load_data( + "resource_inventory", db_path=db_path + ) cost_rows: List[Dict[str, Any]] = load_data("cost_inventory", db_path=db_path) res_payload = [ @@ -71,7 +83,16 @@ def _build_payload(*,report_path: str, name: str, started_at: int, exit_strategy logger.debug("Outgoing payload:\n%s", json.dumps(payload, indent=2)) return payload -def post_assessment(*, name: str, started_at: int, report_path: str, meta: Dict[str, int], token: str, timeout: int = 10) -> Dict[str, Any]: + +def post_assessment( + *, + name: str, + started_at: int, + report_path: str, + meta: Dict[str, int], + token: str, + timeout: int = 10, +) -> Dict[str, Any]: host = getattr(config, "HOST", "").strip() if not host: return {"success": False, "payload": None, "logs": "HOST missing in config.py"} diff --git a/main.py b/main.py index 55fdc58..ed871f0 100644 --- a/main.py +++ b/main.py @@ -20,15 +20,30 @@ sync_assessment, generate_report, ) -from utils.azure import select_subscription, select_resource_group, is_azure_cli_installed, is_azure_cli_logged_in, is_azure_cli_token_expired +from utils.azure import ( + select_subscription, + select_resource_group, + is_azure_cli_installed, + is_azure_cli_logged_in, + is_azure_cli_token_expired, +) from utils.aws import is_aws_cli_installed, is_aws_profile_valid from utils.connection import resolve_mode from utils.data import initialize_dataset -from utils.utils import ascii_art, create_directory, load_config, prompt_required_inputs, print_help_message, print_step +from utils.utils import ( + ascii_art, + create_directory, + load_config, + prompt_required_inputs, + print_help_message, + print_step, +) from utils.validate import validate_region, validate_config # Configure the root logger to ensure logs propagate from all modules -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) logging.getLogger("botocore").setLevel(logging.WARNING) logging.getLogger("boto3").setLevel(logging.WARNING) logging.getLogger("kaleido").setLevel(logging.WARNING) @@ -41,13 +56,14 @@ # Initialize the console object console = Console() + def handle_aws(args): config = {} cloud_provider = 2 if args.config: - #logger.info(f"AWS --config argument detected with path: {args.config}") + # logger.info(f"AWS --config argument detected with path: {args.config}") config = load_config(args.config) if not config: @@ -59,34 +75,46 @@ def handle_aws(args): config["name"] = args.name.strip() if "name" not in config or not config["name"].strip(): - config["name"] = f"Exit Assessment {datetime.now().strftime('%Y%m%d_%H%M%S')}" + config["name"] = ( + f"Exit Assessment {datetime.now().strftime('%Y%m%d_%H%M%S')}" + ) elif args.profile: # Check if aws cli available if not is_aws_cli_installed(): - #logger.error("AWS CLI is not installed.") - console.print("[red]AWS CLI is not installed. Install it from https://aws.amazon.com/cli/[/red]") + # logger.error("AWS CLI is not installed.") + console.print( + "[red]AWS CLI is not installed. Install it from https://aws.amazon.com/cli/[/red]" + ) return # Check if aws cli profile is valid if not is_aws_profile_valid(args.profile): - #logger.error(f"AWS profile '{args.profile}' is not configured.") - console.print(f"[red]AWS profile '{args.profile}' is not configured. Use `aws configure --profile {args.profile}`.[/red]") + # logger.error(f"AWS profile '{args.profile}' is not configured.") + console.print( + f"[red]AWS profile '{args.profile}' is not configured. Use `aws configure --profile {args.profile}`.[/red]" + ) return - #logger.info(f"AWS --profile argument detected with profile: {args.profile}") + # logger.info(f"AWS --profile argument detected with profile: {args.profile}") try: session = boto3.Session(profile_name=args.profile) credentials = session.get_credentials() if credentials is None: - #logger.error(f"AWS profile '{args.profile}' has no valid credentials.") - console.print(f"[red]AWS profile '{args.profile}' has no valid credentials. Use `aws configure --profile {args.profile}`.[/red]") + # logger.error(f"AWS profile '{args.profile}' has no valid credentials.") + console.print( + f"[red]AWS profile '{args.profile}' has no valid credentials. Use `aws configure --profile {args.profile}`.[/red]" + ) return region = session.region_name or "us-east-1" - #logger.info(f"Using AWS profile '{args.profile}' with region '{region}'.") + # logger.info(f"Using AWS profile '{args.profile}' with region '{region}'.") exit_strategy, assessment_type = prompt_required_inputs() config = { - "name": args.name.strip() if args.name else f"Exit Assessment {datetime.now().strftime('%Y%m%d_%H%M%S')}", + "name": ( + args.name.strip() + if args.name + else f"Exit Assessment {datetime.now().strftime('%Y%m%d_%H%M%S')}" + ), "cloudServiceProvider": cloud_provider, "exitStrategy": exit_strategy, "assessmentType": assessment_type, @@ -97,8 +125,10 @@ def handle_aws(args): }, } except (NoCredentialsError, ProfileNotFound) as e: - #logger.error(f"AWS profile error: {e}", exc_info=True) - console.print(f"[red]AWS profile error: {str(e)}. Use `aws configure` to set up a profile.[/red]") + # logger.error(f"AWS profile error: {e}", exc_info=True) + console.print( + f"[red]AWS profile error: {str(e)}. Use `aws configure` to set up a profile.[/red]" + ) return else: exit_strategy, assessment_type = prompt_required_inputs() @@ -117,7 +147,11 @@ def handle_aws(args): console.print(f"[red]{e} Please enter a valid AWS region.[/red]") config = { - "name": args.name.strip() if args.name else f"Exit Assessment {datetime.now().strftime('%Y%m%d_%H%M%S')}", + "name": ( + args.name.strip() + if args.name + else f"Exit Assessment {datetime.now().strftime('%Y%m%d_%H%M%S')}" + ), "cloudServiceProvider": cloud_provider, "exitStrategy": exit_strategy, "assessmentType": assessment_type, @@ -134,13 +168,14 @@ def handle_aws(args): # Run the AWS assessment pipeline run_assessment(config, "aws") + def handle_azure(args): config = {} cloud_provider = 1 if args.config: - #logger.info(f"Azure --config argument detected with path: {args.config}") + # logger.info(f"Azure --config argument detected with path: {args.config}") config = load_config(args.config) if not config: @@ -152,27 +187,35 @@ def handle_azure(args): config["name"] = args.name.strip() if "name" not in config or not config["name"].strip(): - config["name"] = f"Exit Assessment {datetime.now().strftime('%Y%m%d_%H%M%S')}" + config["name"] = ( + f"Exit Assessment {datetime.now().strftime('%Y%m%d_%H%M%S')}" + ) elif args.cli: - #logger.info("Azure --cli argument detected. Using Azure CLI credentials.") + # logger.info("Azure --cli argument detected. Using Azure CLI credentials.") # Check if az cli available if not is_azure_cli_installed(): - #logger.error("Azure CLI is not installed.") - console.print("[red]Azure CLI is not installed. Install it from https://aka.ms/install-azure-cli.[/red]") + # logger.error("Azure CLI is not installed.") + console.print( + "[red]Azure CLI is not installed. Install it from https://aka.ms/install-azure-cli.[/red]" + ) return # Check if the user is logged in to Azure CLI if not is_azure_cli_logged_in(): - #logger.error("User is not logged in to Azure CLI") - console.print("[red]You are not logged in to Azure CLI. Please run 'az login' and try again.[/red]") + # logger.error("User is not logged in to Azure CLI") + console.print( + "[red]You are not logged in to Azure CLI. Please run 'az login' and try again.[/red]" + ) return # Check if the cli token is expired if is_azure_cli_token_expired(): - #logger.error("Azure CLI token is expired.") + # logger.error("Azure CLI token is expired.") console.print("[red]Your Azure CLI token has expired. Please run:[/red]") - console.print("[bold cyan]az login --scope https://management.azure.com/.default[/bold cyan]") + console.print( + "[bold cyan]az login --scope https://management.azure.com/.default[/bold cyan]" + ) return try: @@ -181,8 +224,12 @@ def handle_azure(args): subscription_client = SubscriptionClient(credential) subscriptions = list(subscription_client.subscriptions.list()) if not subscriptions: - logger.error("No subscriptions found for the provided Azure credentials.") - console.print("[red]No subscriptions found for the provided credentials.[/red]") + logger.error( + "No subscriptions found for the provided Azure credentials." + ) + console.print( + "[red]No subscriptions found for the provided credentials.[/red]" + ) return selected_subscription = select_subscription(subscriptions) @@ -192,13 +239,19 @@ def handle_azure(args): resource_groups = list(resource_client.resource_groups.list()) if not resource_groups: logger.error("No resource groups found in the selected subscription.") - console.print("[red]No resource groups found in the selected subscription.[/red]") + console.print( + "[red]No resource groups found in the selected subscription.[/red]" + ) return resource_group_name = select_resource_group(resource_groups) exit_strategy, assessment_type = prompt_required_inputs() config = { - "name": args.name.strip() if args.name else f"Exit Assessment {datetime.now().strftime('%Y%m%d_%H%M%S')}", + "name": ( + args.name.strip() + if args.name + else f"Exit Assessment {datetime.now().strftime('%Y%m%d_%H%M%S')}" + ), "cloudServiceProvider": cloud_provider, "exitStrategy": exit_strategy, "assessmentType": assessment_type, @@ -222,16 +275,16 @@ def handle_azure(args): try: # Authenticate using the provided credentials credential = ClientSecretCredential( - tenant_id=tenant_id, - client_id=client_id, - client_secret=client_secret + tenant_id=tenant_id, client_id=client_id, client_secret=client_secret ) subscription_client = SubscriptionClient(credential) # Fetch and prompt the user to select a subscription subscriptions = list(subscription_client.subscriptions.list()) if not subscriptions: - console.print("[red]No subscriptions found. Please check your credentials.[/red]") + console.print( + "[red]No subscriptions found. Please check your credentials.[/red]" + ) return selected_subscription = select_subscription(subscriptions) @@ -241,14 +294,20 @@ def handle_azure(args): resource_client = ResourceManagementClient(credential, subscription_id) resource_groups = list(resource_client.resource_groups.list()) if not resource_groups: - console.print("[red]No resource groups found in the selected subscription.[/red]") + console.print( + "[red]No resource groups found in the selected subscription.[/red]" + ) return resource_group_name = select_resource_group(resource_groups) # Build the configuration config = { - "name": args.name.strip() if args.name else f"Exit Assessment {datetime.now().strftime('%Y%m%d_%H%M%S')}", + "name": ( + args.name.strip() + if args.name + else f"Exit Assessment {datetime.now().strftime('%Y%m%d_%H%M%S')}" + ), "cloudServiceProvider": cloud_provider, "exitStrategy": exit_strategy, "assessmentType": assessment_type, @@ -266,9 +325,10 @@ def handle_azure(args): return # Run the Azure assessment pipeline - #logger.info("Starting Azure assessment pipeline.") + # logger.info("Starting Azure assessment pipeline.") run_assessment(config, "azure") + def run_assessment(config, provider_name): # Record the assessment start time to propagate across stages started_at = int(time.time()) @@ -292,7 +352,9 @@ def run_assessment(config, provider_name): print_step("ExitCloud integration not configured.", status="warning") # Overwrite assessment type to basic if config["assessmentType"] != 1: - print_step("Forcing Basic Assessment due to offline mode.", status="warning") + print_step( + "Forcing Basic Assessment due to offline mode.", status="warning" + ) config["assessmentType"] = 1 # Create directories @@ -305,16 +367,18 @@ def run_assessment(config, provider_name): # Handle the result provider_name = ( - "Microsoft Azure" if config["cloudServiceProvider"] == 1 - else "AWS" if config["cloudServiceProvider"] == 2 - else "Unknown" + "Microsoft Azure" + if config["cloudServiceProvider"] == 1 + else "AWS" if config["cloudServiceProvider"] == 2 else "Unknown" ) # Stage 1: Verify Credentials console.print("-------------------------------------------") console.print("Stage #1 - Validate Credentials", style="bold") # Test Connection - connection_success, logs = verify_credentials(config["cloudServiceProvider"], config["providerDetails"]) + connection_success, logs = verify_credentials( + config["cloudServiceProvider"], config["providerDetails"] + ) if connection_success: print_step(f"Connecting to {provider_name}...", status="ok") else: @@ -328,25 +392,37 @@ def run_assessment(config, provider_name): console.print("Stage #2 - Validate Permissions", style="bold") # Labels for permission types - permission_reader_label = "Reader" if config["cloudServiceProvider"] == 1 else "ViewOnlyAccess" - permission_cost_label = "Cost Management Reader" if config["cloudServiceProvider"] == 1 else "AWSBillingReadOnlyAccess" + permission_reader_label = ( + "Reader" if config["cloudServiceProvider"] == 1 else "ViewOnlyAccess" + ) + permission_cost_label = ( + "Cost Management Reader" + if config["cloudServiceProvider"] == 1 + else "AWSBillingReadOnlyAccess" + ) # Test permissions with spinners with console.status("Validating permissions...", spinner="dots"): - permission_valid, permission_reader, permission_cost, logs = test_permissions( - config["cloudServiceProvider"], config["providerDetails"] + permission_valid, permission_reader, permission_cost, logs = ( + test_permissions( + config["cloudServiceProvider"], config["providerDetails"] + ) ) # Output results for permission checks if permission_reader: print_step(f"Checking {permission_reader_label}...", status="ok") else: - print_step(f"Checking {permission_reader_label}...", status="error", logs=logs) + print_step( + f"Checking {permission_reader_label}...", status="error", logs=logs + ) if permission_cost: print_step(f"Checking {permission_cost_label}...", status="ok") else: - print_step(f"Checking {permission_cost_label}...", status="error", logs=logs) + print_step( + f"Checking {permission_cost_label}...", status="error", logs=logs + ) # Exit if permissions are invalid if not permission_valid: @@ -359,7 +435,9 @@ def run_assessment(config, provider_name): console.print("Stage #3 - Build Resource Inventory", style="bold") # Use a spinner to indicate progress - with console.status(f"Building resource inventory for {provider_name}...", spinner="dots"): + with console.status( + f"Building resource inventory for {provider_name}...", spinner="dots" + ): result = create_resource_inventory( config["cloudServiceProvider"], config["providerDetails"], @@ -368,9 +446,15 @@ def run_assessment(config, provider_name): ) if result["success"]: - print_step(f"Building resource inventory for {provider_name}...", status="ok") + print_step( + f"Building resource inventory for {provider_name}...", status="ok" + ) else: - print_step(f"Building resource inventory for {provider_name}...", status="error", logs=result["logs"]) + print_step( + f"Building resource inventory for {provider_name}...", + status="error", + logs=result["logs"], + ) return console.print("-------------------------------------------") @@ -379,7 +463,9 @@ def run_assessment(config, provider_name): console.print("Stage #4 - Build Cost Inventory", style="bold") # Use a spinner to indicate progress - with console.status(f"Building cost inventory for {provider_name}...", spinner="dots"): + with console.status( + f"Building cost inventory for {provider_name}...", spinner="dots" + ): cost_result = create_cost_inventory( config["cloudServiceProvider"], config["providerDetails"], @@ -391,12 +477,19 @@ def run_assessment(config, provider_name): if cost_result["success"]: print_step(f"Building cost inventory for {provider_name}...", status="ok") else: - print_step(f"Building cost inventory for {provider_name}...", status="error", logs=cost_result["logs"]) + print_step( + f"Building cost inventory for {provider_name}...", + status="error", + logs=cost_result["logs"], + ) return console.print("-------------------------------------------") - name = config.get("name") or f"Exit Assessment {datetime.now().strftime('%Y%m%d_%H%M%S')}" + name = ( + config.get("name") + or f"Exit Assessment {datetime.now().strftime('%Y%m%d_%H%M%S')}" + ) # Stage 5 – Online / Offline Risk Assessment if mode == "online": @@ -408,8 +501,8 @@ def run_assessment(config, provider_name): report_path=report_path, metadata={ "cloud_service_provider": config["cloudServiceProvider"], - "exit_strategy": config["exitStrategy"], - "assessment_type": config["assessmentType"], + "exit_strategy": config["exitStrategy"], + "assessment_type": config["assessmentType"], }, mode=mode, token=jwt, @@ -429,7 +522,9 @@ def run_assessment(config, provider_name): ) status = "ok" if risk_result["success"] else "error" - print_step("Performing risk assessment...", status=status, logs=risk_result["logs"]) + print_step( + "Performing risk assessment...", status=status, logs=risk_result["logs"] + ) console.print("-------------------------------------------") @@ -445,14 +540,16 @@ def run_assessment(config, provider_name): config["assessmentType"], name, report_path, - raw_data_path + raw_data_path, ) # Handle the result if report_status["success"]: print_step("Generating report...", status="ok") else: - print_step("Generating report...", status="error", logs=report_status["logs"]) + print_step( + "Generating report...", status="error", logs=report_status["logs"] + ) return # Output the report path after the separator @@ -472,6 +569,7 @@ def run_assessment(config, provider_name): except Exception as e: console.print(f"[red]Unexpected error: {e}[/red]") + def parse_arguments(): parser = argparse.ArgumentParser( description="EscapeCloud - Community Edition", @@ -485,30 +583,47 @@ def parse_arguments(): " python3 main.py azure --config config.json # Use a configuration file for Azure\n" " python3 main.py azure --cli # Use Azure CLI credentials\n" " python3 main.py azure --name 'DMS System' # Use a pre-defined assessment name\n" - ), formatter_class=argparse.RawDescriptionHelpFormatter, ) - subparsers = parser.add_subparsers(dest="cloud_provider", help="Specify the cloud provider (aws or azure).") + subparsers = parser.add_subparsers( + dest="cloud_provider", help="Specify the cloud provider (aws or azure)." + ) # Subparser for AWS aws_parser = subparsers.add_parser("aws", help="Perform an AWS assessment.") aws_group = aws_parser.add_mutually_exclusive_group(required=False) - aws_group.add_argument("--config", type=str, help="Path to the configuration file (JSON format).") - aws_group.add_argument("--profile", type=str, help="AWS profile name to use credentials from ~/.aws/credentials.") - aws_parser.add_argument("--name", type=str, help="Assessment Name (Optional / Max. 50 characters).") - + aws_group.add_argument( + "--config", type=str, help="Path to the configuration file (JSON format)." + ) + aws_group.add_argument( + "--profile", + type=str, + help="AWS profile name to use credentials from ~/.aws/credentials.", + ) + aws_parser.add_argument( + "--name", type=str, help="Assessment Name (Optional / Max. 50 characters)." + ) # Subparser for Azure azure_parser = subparsers.add_parser("azure", help="Perform an Azure assessment.") azure_group = azure_parser.add_mutually_exclusive_group(required=False) - azure_group.add_argument("--config", type=str, help="Path to the configuration file (JSON format).") - azure_group.add_argument("--cli", action="store_true", help="Use Azure CLI credentials for authentication.") - azure_parser.add_argument("--name", type=str, help="Assessment Name (Optional / Max. 50 characters).") + azure_group.add_argument( + "--config", type=str, help="Path to the configuration file (JSON format)." + ) + azure_group.add_argument( + "--cli", + action="store_true", + help="Use Azure CLI credentials for authentication.", + ) + azure_parser.add_argument( + "--name", type=str, help="Assessment Name (Optional / Max. 50 characters)." + ) return parser.parse_args() + def main(): # Print ASCII art console.print(ascii_art, style="bold cyan") @@ -529,17 +644,24 @@ def main(): elif args.cloud_provider == "azure": handle_azure(args) else: - console.print("[red]Invalid command. Use 'aws' or 'azure' as the first argument.[/red]") - console.print("[green]Run 'python3 main.py --help' for usage instructions.[/green]") + console.print( + "[red]Invalid command. Use 'aws' or 'azure' as the first argument.[/red]" + ) + console.print( + "[green]Run 'python3 main.py --help' for usage instructions.[/green]" + ) + if __name__ == "__main__": try: main() except KeyboardInterrupt: - console.print("\n[bold yellow]Operation cancelled by user (Ctrl+C). Exiting gracefully.[/bold yellow]") - #logger.warning("Process interrupted by user via KeyboardInterrupt.") + console.print( + "\n[bold yellow]Operation cancelled by user (Ctrl+C). Exiting gracefully.[/bold yellow]" + ) + # logger.warning("Process interrupted by user via KeyboardInterrupt.") sys.exit(0) except Exception as e: - #logger.error(f"An unexpected error occurred: {e}", exc_info=True) + # logger.error(f"An unexpected error occurred: {e}", exc_info=True) console.print(f"[red]Unexpected error: {e}[/red]") sys.exit(1) diff --git a/tests/test_utils_and_main.py b/tests/test_utils_and_main.py index d07bb59..bdd5309 100644 --- a/tests/test_utils_and_main.py +++ b/tests/test_utils_and_main.py @@ -49,7 +49,9 @@ def test_invalid_config_stops_before_pipeline_side_effects(self): } with ( - patch("main.validate_config", side_effect=ValueError("bad config")) as mock_validate, + patch( + "main.validate_config", side_effect=ValueError("bad config") + ) as mock_validate, patch("main.resolve_mode") as mock_resolve_mode, patch("main.create_directory") as mock_create_directory, patch("main.verify_credentials") as mock_verify_credentials, diff --git a/tests/test_validate.py b/tests/test_validate.py index a88a962..eb6cde2 100644 --- a/tests/test_validate.py +++ b/tests/test_validate.py @@ -65,7 +65,9 @@ def test_rejects_azure_config_without_client_credentials(self): del config["providerDetails"]["clientId"] del config["providerDetails"]["clientSecret"] - with self.assertRaisesRegex(ValueError, "Missing required fields in providerDetails"): + with self.assertRaisesRegex( + ValueError, "Missing required fields in providerDetails" + ): validate_config(config) def test_rejects_invalid_assessment_type(self): @@ -86,7 +88,9 @@ def test_rejects_invalid_name_characters(self): config = build_aws_config() config["name"] = "Bad/Name" - with self.assertRaisesRegex(ValueError, "Assessment name contains invalid characters"): + with self.assertRaisesRegex( + ValueError, "Assessment name contains invalid characters" + ): validate_config(config) def test_rejects_too_long_name(self): diff --git a/utils/aws.py b/utils/aws.py index 7e62122..1e9fc10 100644 --- a/utils/aws.py +++ b/utils/aws.py @@ -5,14 +5,19 @@ logger = logging.getLogger("main.utils.aws") + def is_aws_cli_installed() -> bool: return shutil.which("aws") is not None def is_aws_profile_valid(profile: str) -> bool: try: - subprocess.run(["aws", "configure", "list", "--profile", profile], - check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + subprocess.run( + ["aws", "configure", "list", "--profile", profile], + check=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) return True except subprocess.CalledProcessError: return False diff --git a/utils/azure.py b/utils/azure.py index a7bee65..d179394 100644 --- a/utils/azure.py +++ b/utils/azure.py @@ -10,17 +10,25 @@ logger = logging.getLogger("main.utils.azure") console = Console() + def is_azure_cli_installed() -> bool: return shutil.which("az") is not None + def is_azure_cli_logged_in() -> bool: try: # Run the 'az account show' command to check if the user is logged in - subprocess.run(["az", "account", "show"], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + subprocess.run( + ["az", "account", "show"], + check=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) return True except subprocess.CalledProcessError: return False + def is_azure_cli_token_expired() -> bool: credential = AzureCliCredential() try: @@ -31,8 +39,9 @@ def is_azure_cli_token_expired() -> bool: return True # Token expired return False + def select_subscription(subscriptions: List[Any]) -> Any: - #logger.info("Listing available subscriptions for selection.") + # logger.info("Listing available subscriptions for selection.") console.print("Available Subscriptions:") for idx, sub in enumerate(subscriptions, start=1): console.print(f"{idx}. {sub.display_name} ({sub.subscription_id})") @@ -42,14 +51,15 @@ def select_subscription(subscriptions: List[Any]) -> Any: if not (1 <= selection <= len(subscriptions)): raise ValueError("Invalid subscription selection.") selected_subscription = subscriptions[selection - 1] - #logger.info(f"Subscription selected: {selected_subscription.display_name} ({selected_subscription.subscription_id})") + # logger.info(f"Subscription selected: {selected_subscription.display_name} ({selected_subscription.subscription_id})") return selected_subscription except ValueError as e: logger.warning(f"Invalid subscription selection: {e}") console.print(f"[red]{e} Please select a valid number.[/red]") + def select_resource_group(resource_groups: List[Any]) -> str: - #logger.info("Listing available resource groups for selection.") + # logger.info("Listing available resource groups for selection.") console.print("Available Resource Groups:") for idx, rg in enumerate(resource_groups, start=1): console.print(f"{idx}. {rg.name}") @@ -59,7 +69,7 @@ def select_resource_group(resource_groups: List[Any]) -> str: if not (1 <= selection <= len(resource_groups)): raise ValueError("Invalid resource group selection.") selected_resource_group = resource_groups[selection - 1].name - #logger.info(f"Resource Group selected: {selected_resource_group}") + # logger.info(f"Resource Group selected: {selected_resource_group}") return selected_resource_group except ValueError as e: logger.warning(f"Invalid resource group selection: {e}") diff --git a/utils/connection.py b/utils/connection.py index bea1eeb..0b71ef6 100644 --- a/utils/connection.py +++ b/utils/connection.py @@ -14,6 +14,7 @@ _AUTH_PATH = "/api/v1/auth/token/" + def _build_url(host: str) -> str: host = host.strip().rstrip("/") if not host.startswith("http"): @@ -21,7 +22,9 @@ def _build_url(host: str) -> str: return f"{host}{_AUTH_PATH}" -def get_jwt_token(host: str | None = None, key: str | None = None, *, timeout: int = 10) -> Optional[str]: +def get_jwt_token( + host: str | None = None, key: str | None = None, *, timeout: int = 10 +) -> Optional[str]: host = host or getattr(config, "HOST", "") if config else "" key = key or getattr(config, "KEY", "") if config else "" @@ -40,11 +43,18 @@ def get_jwt_token(host: str | None = None, key: str | None = None, *, timeout: i resp.raise_for_status() data = resp.json() - token = data.get("access_token") or data.get("token") or data.get("access") or data.get("jwt") + token = ( + data.get("access_token") + or data.get("token") + or data.get("access") + or data.get("jwt") + ) if token: return token - logger.error("Authentication succeeded but token field missing in response: %s", data) + logger.error( + "Authentication succeeded but token field missing in response: %s", data + ) except requests.RequestException as exc: logger.error("EscapeCloud authentication request failed: %s", exc) except ValueError: @@ -52,6 +62,7 @@ def get_jwt_token(host: str | None = None, key: str | None = None, *, timeout: i return None + def resolve_mode() -> Tuple[str, Optional[str]]: host = getattr(config, "HOST", "") if config else "" key = getattr(config, "KEY", "") if config else "" diff --git a/utils/constants.py b/utils/constants.py index ef67a8d..69a6fb5 100644 --- a/utils/constants.py +++ b/utils/constants.py @@ -1,27 +1,33 @@ # utils/constants.py REGION_CHOICES = [ - ('us-east-1', 'us-east-1 (N. Virginia)'), - ('us-east-2', 'us-east-2 (Ohio)'), - ('us-west-1', 'us-west-1 (N. California)'), - ('us-west-2', 'us-west-2 (Oregon)'), - ('af-south-1', 'af-south-1 (Cape Town)'), - ('ap-east-1', 'ap-east-1 (Hong Kong)'), - ('ap-south-1', 'ap-south-1 (Mumbai)'), - ('ap-northeast-1', 'ap-northeast-1 (Tokyo)'), - ('ap-northeast-2', 'ap-northeast-2 (Seoul)'), - ('ap-northeast-3', 'ap-northeast-3 (Osaka)'), - ('ap-southeast-1', 'ap-southeast-1 (Singapore)'), - ('ap-southeast-2', 'ap-southeast-2 (Sydney)'), - ('ca-central-1', 'ca-central-1 (Central)'), - ('eu-central-1', 'eu-central-1 (Frankfurt)'), - ('eu-west-1', 'eu-west-1 (Ireland)'), - ('eu-west-2', 'eu-west-2 (London)'), - ('eu-west-3', 'eu-west-3 (Paris)'), - ('eu-south-1', 'eu-south-1 (Milan)'), - ('eu-north-1', 'eu-north-1 (Stockholm)'), - ('me-south-1', 'me-south-1 (Bahrain)'), - ('sa-east-1', 'sa-east-1 (São Paulo)'), + ("us-east-1", "us-east-1 (N. Virginia)"), + ("us-east-2", "us-east-2 (Ohio)"), + ("us-west-1", "us-west-1 (N. California)"), + ("us-west-2", "us-west-2 (Oregon)"), + ("af-south-1", "af-south-1 (Cape Town)"), + ("ap-east-1", "ap-east-1 (Hong Kong)"), + ("ap-south-1", "ap-south-1 (Mumbai)"), + ("ap-northeast-1", "ap-northeast-1 (Tokyo)"), + ("ap-northeast-2", "ap-northeast-2 (Seoul)"), + ("ap-northeast-3", "ap-northeast-3 (Osaka)"), + ("ap-southeast-1", "ap-southeast-1 (Singapore)"), + ("ap-southeast-2", "ap-southeast-2 (Sydney)"), + ("ca-central-1", "ca-central-1 (Central)"), + ("eu-central-1", "eu-central-1 (Frankfurt)"), + ("eu-west-1", "eu-west-1 (Ireland)"), + ("eu-west-2", "eu-west-2 (London)"), + ("eu-west-3", "eu-west-3 (Paris)"), + ("eu-south-1", "eu-south-1 (Milan)"), + ("eu-north-1", "eu-north-1 (Stockholm)"), + ("me-south-1", "me-south-1 (Bahrain)"), + ("sa-east-1", "sa-east-1 (São Paulo)"), ] -REQUIRED_FIELDS_AZURE = ["clientId", "clientSecret", "tenantId", "subscriptionId", "resourceGroupName"] +REQUIRED_FIELDS_AZURE = [ + "clientId", + "clientSecret", + "tenantId", + "subscriptionId", + "resourceGroupName", +] REQUIRED_FIELDS_AWS = ["accessKey", "secretKey", "region"] diff --git a/utils/data.py b/utils/data.py index a330d57..238a914 100644 --- a/utils/data.py +++ b/utils/data.py @@ -14,6 +14,7 @@ DATASET_FOLDER = Path("datasets") REMOTE_STORAGE_URL = "https://cloudexit-oss-data-eu.fsn1.your-objectstorage.com" + def get_monday_date() -> str: now = datetime.utcnow() monday = now - timedelta(days=now.weekday()) @@ -24,6 +25,7 @@ def get_monday_date() -> str: else: return monday.strftime("cloudexit-%Y-%m-%d.db.gz") + def compute_file_hash(filepath: str) -> str: hash_sha256 = hashlib.sha256() with open(filepath, "rb") as f: @@ -31,6 +33,7 @@ def compute_file_hash(filepath: str) -> str: hash_sha256.update(chunk) return hash_sha256.hexdigest() + def download_file(url: str, destination: str, retries: int = 3, delay: int = 5) -> bool: for attempt in range(retries): try: @@ -44,9 +47,13 @@ def download_file(url: str, destination: str, retries: int = 3, delay: int = 5) return True except ConnectionError: - print(f"[ERROR] Connection failed while downloading {url}. Retrying ({attempt + 1}/{retries})...") + print( + f"[ERROR] Connection failed while downloading {url}. Retrying ({attempt + 1}/{retries})..." + ) except Timeout: - print(f"[ERROR] Request timed out while downloading {url}. Retrying ({attempt + 1}/{retries})...") + print( + f"[ERROR] Request timed out while downloading {url}. Retrying ({attempt + 1}/{retries})..." + ) except RequestException as e: print(f"[ERROR] Failed to download {url}: {e}") break @@ -56,7 +63,10 @@ def download_file(url: str, destination: str, retries: int = 3, delay: int = 5) print(f"[ERROR] Unable to download file after {retries} attempts: {url}") return False -def fetch_remote_checksum(checksum_url: str, retries: int = 3, delay: int = 5) -> Optional[str]: + +def fetch_remote_checksum( + checksum_url: str, retries: int = 3, delay: int = 5 +) -> Optional[str]: for attempt in range(retries): try: response = requests.get(checksum_url, timeout=10) @@ -64,9 +74,13 @@ def fetch_remote_checksum(checksum_url: str, retries: int = 3, delay: int = 5) - return response.text.strip().split()[0] except ConnectionError: - print(f"[ERROR] Connection failed when fetching {checksum_url}. Retrying ({attempt + 1}/{retries})...") + print( + f"[ERROR] Connection failed when fetching {checksum_url}. Retrying ({attempt + 1}/{retries})..." + ) except Timeout: - print(f"[ERROR] Request timed out when fetching {checksum_url}. Retrying ({attempt + 1}/{retries})...") + print( + f"[ERROR] Request timed out when fetching {checksum_url}. Retrying ({attempt + 1}/{retries})..." + ) except RequestException as e: print(f"[ERROR] Failed to fetch {checksum_url}: {e}") break @@ -76,6 +90,7 @@ def fetch_remote_checksum(checksum_url: str, retries: int = 3, delay: int = 5) - print(f"[ERROR] Unable to fetch remote checksum after {retries} attempts.") return None + def initialize_dataset() -> None: DATASET_FOLDER.mkdir(exist_ok=True) @@ -109,7 +124,9 @@ def initialize_dataset() -> None: print("[INFO] Local dataset is up-to-date. No download needed.") return else: - print("[INFO] Local dataset is outdated. Removing old files and downloading new dataset...") + print( + "[INFO] Local dataset is outdated. Removing old files and downloading new dataset..." + ) # Remove all old compressed and extracted files for file in DATASET_FOLDER.glob("cloudexit-*.db.gz"): @@ -119,9 +136,13 @@ def initialize_dataset() -> None: # Download and extract dataset if download_file(latest_file_url, local_compressed_path): - print(f"[INFO] Download successful. Extracting dataset from {latest_file}...") + print( + f"[INFO] Download successful. Extracting dataset from {latest_file}..." + ) - with gzip.open(local_compressed_path, "rb") as f_in, open(local_db_path, "wb") as f_out: + with gzip.open(local_compressed_path, "rb") as f_in, open( + local_db_path, "wb" + ) as f_out: shutil.copyfileobj(f_in, f_out) print("[INFO] Dataset updated successfully.") diff --git a/utils/sync.py b/utils/sync.py index d8ab7d1..ec19a61 100644 --- a/utils/sync.py +++ b/utils/sync.py @@ -19,7 +19,13 @@ def _build_url(host: str) -> str: return f"{host}{_ASSESS_PATH}" -def submit_assessment(payload: Dict[str, Any], *, host: str | None = None, key: str | None = None, timeout: int = 10) -> Optional[requests.Response]: +def submit_assessment( + payload: Dict[str, Any], + *, + host: str | None = None, + key: str | None = None, + timeout: int = 10, +) -> Optional[requests.Response]: host = host or getattr(config, "HOST", "") if config else "" if not host: logger.warning("HOST not configured – skipping assessment sync.") diff --git a/utils/utils.py b/utils/utils.py index b9da302..70ea0e9 100644 --- a/utils/utils.py +++ b/utils/utils.py @@ -11,18 +11,20 @@ logger = logging.getLogger("main.utils") console = Console() + def load_config(file_path: str) -> Optional[Dict[str, Any]]: try: - #logger.info(f"Attempting to load config file from {file_path}") + # logger.info(f"Attempting to load config file from {file_path}") with open(file_path, "r") as f: config = json.load(f) - #logger.info("Config file loaded successfully.") + # logger.info("Config file loaded successfully.") return config except Exception as e: logger.error(f"Error loading config file: {e}", exc_info=True) console.print(f"[red]Error loading config file: {e}[/red]") return None + def prompt_required_inputs() -> Tuple[int, int]: while True: try: @@ -33,7 +35,7 @@ def prompt_required_inputs() -> Tuple[int, int]: ) if exit_strategy not in [1, 3]: raise ValueError("Invalid exit strategy.") - #logger.info(f"Exit Strategy selected: {exit_strategy}") + # logger.info(f"Exit Strategy selected: {exit_strategy}") break except ValueError as e: logger.warning(f"Invalid exit strategy input: {e}") @@ -48,7 +50,7 @@ def prompt_required_inputs() -> Tuple[int, int]: ) if assessment_type not in [1, 2]: raise ValueError("Invalid assessment type.") - #logger.info(f"Assessment Type selected: {assessment_type}") + # logger.info(f"Assessment Type selected: {assessment_type}") break except ValueError as e: logger.warning(f"Invalid assessment type input: {e}") @@ -56,13 +58,14 @@ def prompt_required_inputs() -> Tuple[int, int]: return exit_strategy, assessment_type -def print_step(description: str, status: str = "pending", logs: Optional[str] = None) -> None: + +def print_step( + description: str, status: str = "pending", logs: Optional[str] = None +) -> None: # Define styles for statuses ok_style = Style(color="green", bold=True) error_style = Style(color="red", bold=True) warning_style = Style(color="yellow", bold=True) - pending_style = Style(color="yellow", bold=True) - # Map statuses to their visual representation status_map = { "ok": "[ ok ]", @@ -73,7 +76,10 @@ def print_step(description: str, status: str = "pending", logs: Optional[str] = # Handle the pending status with a spinner if status == "pending": - with console.status(f"{description:<50} [yellow]{status_map['pending']}[/yellow]", spinner="dots"): + with console.status( + f"{description:<50} [yellow]{status_map['pending']}[/yellow]", + spinner="dots", + ): sleep(2) print_step(description, status="ok") elif status == "ok": @@ -99,6 +105,7 @@ def print_step(description: str, status: str = "pending", logs: Optional[str] = """ + def create_directory(base_path="reports"): # Generate the main directory with a timestamp timestamp = datetime.now().strftime("%Y%m%d%H%M%S") @@ -113,6 +120,7 @@ def create_directory(base_path="reports"): return directory_path, raw_data_path + def print_help_message(): console.print("EscapeCloud - Community Edition", style="bold cyan") console.print("[green]Run the script with one of the following options:[/green]\n") diff --git a/utils/validate.py b/utils/validate.py index fc72afa..baa62b2 100644 --- a/utils/validate.py +++ b/utils/validate.py @@ -2,11 +2,13 @@ from typing import Dict, Any from .constants import REGION_CHOICES, REQUIRED_FIELDS_AZURE, REQUIRED_FIELDS_AWS + def validate_region(region: str) -> None: valid_regions = [choice[0] for choice in REGION_CHOICES] if region not in valid_regions: raise ValueError(f"Invalid AWS region. Choose from: {', '.join(valid_regions)}") + def validate_config(config: Dict[str, Any]) -> bool: try: # Cast key values to integers to handle string input gracefully @@ -14,7 +16,9 @@ def validate_config(config: Dict[str, Any]) -> bool: cloud_service_provider = int(config.get("cloudServiceProvider", 0)) exit_strategy = int(config.get("exitStrategy", 0)) except ValueError: - raise ValueError("Invalid input: assessmentType, cloudServiceProvider, and exitStrategy must be integers.") + raise ValueError( + "Invalid input: assessmentType, cloudServiceProvider, and exitStrategy must be integers." + ) # Validate assessmentType if assessment_type not in [1, 2]: @@ -26,14 +30,18 @@ def validate_config(config: Dict[str, Any]) -> bool: # Validate exitStrategy if exit_strategy not in [1, 2, 3]: - raise ValueError("Invalid exitStrategy. Must be 1 (Repatriation to On-Premises), 2 (Hybrid Cloud Adoption) or 3 (Migration to Alternate Cloud).") + raise ValueError( + "Invalid exitStrategy. Must be 1 (Repatriation to On-Premises), 2 (Hybrid Cloud Adoption) or 3 (Migration to Alternate Cloud)." + ) # Validate name name = config.get("name", "").strip() if len(name) > 50: raise ValueError("Assessment name cannot exceed 50 characters.") if not all(c.isalnum() or c in " ._-()" for c in name): - raise ValueError("Assessment name contains invalid characters. Only letters, numbers, spaces, . _ - ( ) are allowed.") + raise ValueError( + "Assessment name contains invalid characters. Only letters, numbers, spaces, . _ - ( ) are allowed." + ) # Validate providerDetails based on cloudServiceProvider provider_details = config.get("providerDetails", {}) @@ -43,15 +51,23 @@ def validate_config(config: Dict[str, Any]) -> bool: required_fields = ["tenantId", "subscriptionId", "resourceGroupName"] else: required_fields = REQUIRED_FIELDS_AZURE - missing_fields = [field for field in required_fields if field not in provider_details] + missing_fields = [ + field for field in required_fields if field not in provider_details + ] elif cloud_service_provider == 2: # AWS - missing_fields = [field for field in REQUIRED_FIELDS_AWS if field not in provider_details] + missing_fields = [ + field for field in REQUIRED_FIELDS_AWS if field not in provider_details + ] if "region" in provider_details: validate_region(provider_details["region"]) else: - raise ValueError(f"Invalid cloudServiceProvider: {cloud_service_provider}. Supported values: 1 (Azure), 2 (AWS).") + raise ValueError( + f"Invalid cloudServiceProvider: {cloud_service_provider}. Supported values: 1 (Azure), 2 (AWS)." + ) if missing_fields: - raise ValueError(f"Missing required fields in providerDetails: {', '.join(missing_fields)}") + raise ValueError( + f"Missing required fields in providerDetails: {', '.join(missing_fields)}" + ) return True