diff --git a/backend/app/cli/scap_to_openwatch_converter_enhanced.py b/backend/app/cli/scap_to_openwatch_converter_enhanced.py index 29b510d2..958c1288 100644 --- a/backend/app/cli/scap_to_openwatch_converter_enhanced.py +++ b/backend/app/cli/scap_to_openwatch_converter_enhanced.py @@ -72,13 +72,27 @@ class ComparisonResult: TemplateProcessor ) +# Import SCAP YAML parser for variable/remediation extraction +import sys +sys.path.insert(0, str(Path(__file__).parent.parent)) +from services.scap_yaml_parser_service import extract_scap_metadata + class EnhancedSCAPConverter: """Enhanced converter with BSON and dry-run support""" - def __init__(self, scap_content_path: str, output_path: str, dry_run: bool = False): + def __init__( + self, + scap_content_path: str, + output_path: str, + dry_run: bool = False, + extract_variables: bool = False, + extract_remediation: bool = False + ): self.scap_content_path = Path(scap_content_path) self.output_path = Path(output_path) self.dry_run = dry_run + self.extract_variables = extract_variables + self.extract_remediation = extract_remediation self.framework_mapper = FrameworkMapper() self.template_processor = TemplateProcessor() self.stats = ConversionStats() @@ -171,7 +185,8 @@ def _convert_single_rule(self, rule_file: Path, output_format: str) -> None: def _build_openwatch_rule(self, rule_data: Dict[str, Any], rule_file: Path, rule_id: str) -> Dict[str, Any]: """Build OpenWatch rule structure (from original converter)""" - return { + # Base rule structure + rule = { "_id": f"ow-{rule_id}", "rule_id": f"ow-{rule_id}", "scap_rule_id": f"xccdf_org.ssgproject.content_rule_{rule_id}", @@ -212,6 +227,33 @@ def _build_openwatch_rule(self, rule_data: Dict[str, Any], rule_file: Path, rule "identifiers": self._extract_identifiers(rule_data) } + # Extract Phase 1 metadata (variables, remediation, scanner type) + if self.extract_variables or self.extract_remediation: + try: + extracted_metadata = extract_scap_metadata(rule_data, rule_file) + 
+ # Add XCCDF variables + if self.extract_variables and extracted_metadata.get('xccdf_variables'): + rule['xccdf_variables'] = extracted_metadata['xccdf_variables'] + + # Add remediation content + if self.extract_remediation and extracted_metadata.get('remediation'): + rule['remediation'] = extracted_metadata['remediation'] + + # Always add scanner type (defaults to 'oscap' if not detected) + rule['scanner_type'] = extracted_metadata.get('scanner_type', 'oscap') + + except Exception as e: + logger.warning(f"Failed to extract metadata for {rule_id}: {e}") + # Set defaults on error + rule['scanner_type'] = 'oscap' + + else: + # If not extracting, set default scanner type + rule['scanner_type'] = 'oscap' + + return rule + def _write_json_rule(self, rule: Dict[str, Any], rule_id: str) -> None: """Write rule as JSON file""" output_file = self.output_path / f"ow-{rule_id}.json" @@ -493,6 +535,8 @@ def main(): convert_parser.add_argument('--output-path', default='/home/rracine/hanalyx/openwatch/data/compliance_rules_converted') convert_parser.add_argument('--format', choices=['json', 'bson'], default='json') convert_parser.add_argument('--dry-run', action='store_true', help='Show what would be converted') + convert_parser.add_argument('--extract-variables', action='store_true', help='Extract XCCDF variables (Phase 1)') + convert_parser.add_argument('--extract-remediation', action='store_true', help='Extract remediation content (Phase 1)') convert_parser.add_argument('--create-bundle', action='store_true', help='Create tar.gz bundle after conversion') convert_parser.add_argument('--bundle-version', default='0.0.1', help='Bundle version') @@ -511,7 +555,13 @@ def main(): args = parser.parse_args() if args.command == 'convert': - converter = EnhancedSCAPConverter(args.scap_path, args.output_path, args.dry_run) + converter = EnhancedSCAPConverter( + args.scap_path, + args.output_path, + args.dry_run, + args.extract_variables, + args.extract_remediation + ) stats = 
class XCCDFVariable(BaseModel):
    """
    XCCDF variable definition for scan-time customization

    XCCDF variables allow users to customize compliance checks at scan time.
    Examples: session timeout values, login banner text, password policies.

    Supports Solution A (XCCDF Variables) for hybrid scanning architecture.
    See: /docs/REMEDIATION_WITH_XCCDF_VARIABLES.md
    """

    model_config = {
        "exclude_none": True,
        "exclude_unset": True
    }

    id: str = Field(
        description="Variable identifier (e.g., 'var_accounts_tmout', 'login_banner_text')"
    )
    title: str = Field(
        description="Human-readable variable title"
    )
    description: Optional[str] = Field(
        default=None,
        description="Detailed description of what this variable controls"
    )
    type: str = Field(
        pattern="^(string|number|boolean)$",
        description="Variable data type: string, number, or boolean"
    )
    default_value: str = Field(
        description="Default value if user doesn't provide custom value"
    )
    interactive: bool = Field(
        default=True,
        description="Whether this variable can be customized via UI/API (set to False for system variables)"
    )
    sensitive: bool = Field(
        default=False,
        description="Whether this variable contains sensitive data (passwords, keys, etc.). Encrypted in storage, masked in UI."
    )
    constraints: Optional[Dict[str, Any]] = Field(
        default=None,
        description="""
        Validation constraints for variable values:
        - min_value/max_value: For numeric types
        - min_length/max_length: For string types
        - choices: List of allowed values (enum-like)
        - pattern: Regex pattern for validation (string types)

        Examples:
        - {"min_value": 60, "max_value": 3600}  # Session timeout 1-60 mins
        - {"choices": ["300", "600", "900"]}  # Predefined timeout options
        - {"pattern": "^grub\\.pbkdf2\\.sha512\\."}  # GRUB password hash format
        """
    )

    @validator('type')
    def validate_type(cls, v):
        """Reject any type other than 'string', 'number' or 'boolean'."""
        valid_types = ['string', 'number', 'boolean']
        if v not in valid_types:
            raise ValueError(f"Invalid type '{v}'. Must be one of: {', '.join(valid_types)}")
        return v

    @validator('constraints')
    def validate_constraints(cls, v, values):
        """
        Validate that constraints are consistent with the variable type.

        Args:
            v: The constraints dict (or None / empty, which is accepted as-is).
            values: Previously validated fields; 'type' is read from here and
                is absent when its own validation failed.

        Returns:
            The constraints dict, unchanged.

        Raises:
            ValueError: If a lower bound exceeds its upper bound, if a bound
                pair is not numeric, or if 'pattern' is not a valid regex.
        """
        if not v:
            return v

        var_type = values.get('type')

        # Bound pair that applies to each type; other types have no range check.
        bound_keys = {
            'number': ('min_value', 'max_value'),
            'string': ('min_length', 'max_length'),
        }.get(var_type)

        if bound_keys and all(key in v for key in bound_keys):
            lower_key, upper_key = bound_keys
            # Compare numerically: a raw ">" would order "60" after "3600"
            # (string comparison) or raise TypeError on mixed types, and only
            # ValueError is reported here as a validation failure.
            try:
                lower, upper = float(v[lower_key]), float(v[upper_key])
            except (TypeError, ValueError):
                raise ValueError(f"{lower_key} and {upper_key} must be numeric")
            if lower > upper:
                raise ValueError(f"{lower_key} cannot be greater than {upper_key}")

        # Validate pattern if provided (only meaningful for string variables)
        if var_type == 'string' and 'pattern' in v:
            import re
            try:
                re.compile(v['pattern'])
            except (re.error, TypeError) as e:
                # TypeError covers a non-string pattern such as a number.
                raise ValueError(f"Invalid regex pattern: {e}")

        return v
"version", # Version queries + "is_latest", # Current version queries + [("rule_id", 1), ("version", -1)], # Compound: rule + version + [("scanner_type", 1), ("is_latest", 1)], # Phase 1: Latest rules by scanner type + ] # Core Identifiers rule_id: str = Field( @@ -414,7 +518,107 @@ class Settings: default=None, description="Rule ID that replaces this deprecated rule" ) - + + # ============================================================================ + # Phase 1: Hybrid Scanning Architecture (XCCDF Variables + Native Scanners) + # ============================================================================ + + # XCCDF Variables for Scan-Time Customization (Solution A) + xccdf_variables: Optional[Dict[str, XCCDFVariable]] = Field( + default=None, + description=""" + XCCDF variables that can be customized at scan time. + + Enables user customization of compliance checks without modifying rules: + - Session timeouts (var_accounts_tmout) + - Login banners (login_banner_text) + - Password policies (var_password_pam_minlen) + - GRUB credentials (grub2_bootloader_password) + - etc. + + Maps variable IDs to XCCDFVariable definitions. + + Example: + { + "var_accounts_tmout": XCCDFVariable( + id="var_accounts_tmout", + title="Account Inactivity Timeout", + type="number", + default_value="600", + constraints={"min_value": 60, "max_value": 3600} + ) + } + + See: /docs/REMEDIATION_WITH_XCCDF_VARIABLES.md + """ + ) + + # Scanner Type Routing (Polyglot Scanner Architecture) + scanner_type: str = Field( + default="oscap", + pattern="^(oscap|inspec|python|bash|aws_api|azure_api|gcp_api|kubernetes|docker|sql|mongodb|elasticsearch|opa_rego|custom)$", + description=""" + Scanner engine to use for this rule. 
+ + OpenWatch Native Scanning Engine uses domain-specific scanners: + - oscap: Traditional OSCAP/OVAL checks (Linux/Unix) + - inspec: Chef Inspec DSL checks + - python: Custom Python scripts (sandboxed) + - bash: Simple shell checks + - aws_api: AWS cloud resources (S3, IAM, VPC, etc.) + - azure_api: Azure cloud resources + - gcp_api: GCP cloud resources + - kubernetes: K8s resource compliance (kube-bench, OPA) + - docker: Container image scanning (Trivy, Falco) + - sql: Database configuration (PostgreSQL, MySQL, etc.) + - mongodb: MongoDB configuration checks + - elasticsearch: Elasticsearch settings + - opa_rego: Open Policy Agent / Rego policies + - custom: Organization-specific custom scanner + + See: /docs/ADVANCED_SCANNING_ARCHITECTURE.md + """ + ) + + # Remediation Content for ORSA (Open Remediation Standard Adapter) + remediation: Optional[Dict[str, Any]] = Field( + default=None, + description=""" + Remediation content for ORSA (Open Remediation Standard Adapter) plugins. + + Supports multiple remediation formats extracted from XCCDF or custom-defined: + - ansible: Ansible tasks with variable bindings + - bash: Bash scripts with variable substitution + - puppet: Puppet manifests + - chef: Chef recipes + - powershell: PowerShell scripts (Windows) + - terraform: Terraform configuration changes (cloud) + - kubectl: Kubernetes manifest updates + + Example: + { + "ansible": { + "tasks": "- name: Set timeout\\n lineinfile:\\n path: /etc/profile\\n line: 'TMOUT={{ var_accounts_tmout }}'", + "variables": ["var_accounts_tmout"], + "complexity": "low", + "disruption": "low" + }, + "bash": { + "script": "echo 'TMOUT=$XCCDF_VALUE_VAR_ACCOUNTS_TMOUT' >> /etc/profile", + "variables": ["var_accounts_tmout"] + } + } + + ORSA plugins extract remediation from this field and execute via appropriate tool. 
#!/usr/bin/env python3
"""
SCAP YAML Parser Service - Extract XCCDF Variables and Remediation Content

This service parses SCAP YAML rules (ComplianceAsCode format) to extract:
- XCCDF variable definitions (for Solution A scan-time customization)
- Remediation content (Ansible, Bash scripts)
- Scanner type detection (oscap vs custom scanners)

Part of Phase 1, Issue #2: Enhanced SCAP Converter with Variable Extraction
"""

import logging
import re
from pathlib import Path
from typing import Any, Dict, List, Optional, Set

logger = logging.getLogger(__name__)


class XCCDFVariableExtractor:
    """
    Extracts XCCDF variable definitions from SCAP YAML rules

    XCCDF variables enable scan-time customization of compliance checks.
    These are referenced in rule templates and can be overridden by users.

    Examples:
    - var_accounts_tmout: Session timeout (300-3600 seconds)
    - login_banner_text: Custom login banner
    - var_password_pam_minlen: Minimum password length
    """

    # Common XCCDF variable patterns in YAML
    VARIABLE_PATTERNS = [
        r'{{% set\s+(\w+)\s*=\s*(.+?)\s*%}}',  # Jinja2 set
        r'\$(\w+)',  # Shell variable
        r'{{{?\s*(\w+)\s*}}}?',  # Jinja2 variable reference
    ]

    # Variable type inference from naming conventions
    TYPE_INFERENCE = {
        'timeout': 'number',
        'minlen': 'number',
        'maxlen': 'number',
        'min_': 'number',
        'max_': 'number',
        'count': 'number',
        'banner': 'string',
        'text': 'string',
        'password': 'string',
        'enabled': 'boolean',
        'disabled': 'boolean',
        'enforce': 'boolean',
    }

    # Template vars that are never exposed as user-tunable variables
    _INTERNAL_TEMPLATE_VARS = frozenset(['name', 'ocp_data', 'filepath', 'yamlpath'])

    def extract_variables(self, rule_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Extract XCCDF variables from SCAP rule

        Args:
            rule_data: Parsed YAML rule data

        Returns:
            Dict mapping variable IDs to XCCDFVariable-shaped dicts, or None
            when the rule defines and references no variables.
        """
        variables: Dict[str, Any] = {}

        # Extract from template vars (most reliable source).  YAML may give
        # `template:` or `vars:` as null, so check the shape before iterating.
        template = rule_data.get('template')
        template_vars = template.get('vars') if isinstance(template, dict) else None
        if isinstance(template_vars, dict):
            for var_id, var_value in template_vars.items():
                if not isinstance(var_id, str):
                    continue
                # Skip internal template variables
                if var_id.startswith('_') or var_id in self._INTERNAL_TEMPLATE_VARS:
                    continue

                var_def = self._create_variable_definition(var_id, var_value, rule_data)
                if var_def:
                    variables[var_id] = var_def

        # Extract from description text (look for variable references).
        # Sorted so the resulting dict order is stable between runs; plain set
        # iteration order varies with string hash randomisation.
        var_refs = self._find_variable_references(rule_data.get('description', ''))
        for var_ref in sorted(var_refs):
            if var_ref in variables:
                continue
            var_def = self._create_placeholder_variable(var_ref, rule_data)
            if var_def:
                variables[var_ref] = var_def

        return variables if variables else None

    def _create_variable_definition(
        self,
        var_id: str,
        var_value: Any,
        rule_data: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """
        Create XCCDFVariable definition from template variable

        Args:
            var_id: Variable identifier
            var_value: Variable default value
            rule_data: Full rule data for context

        Returns:
            XCCDFVariable dict
        """
        var_type = self._infer_type(var_id, var_value)

        var_def = {
            "id": var_id,
            "title": self._generate_title(var_id),
            "description": self._extract_variable_description(var_id, rule_data),
            "type": var_type,
            "default_value": str(var_value),
            "interactive": True,  # Most template vars are user-customizable
            "sensitive": self._is_sensitive(var_id),
        }

        # Add constraints if we can infer them
        constraints = self._infer_constraints(var_id, var_value, var_type)
        if constraints:
            var_def["constraints"] = constraints

        return var_def

    def _create_placeholder_variable(
        self,
        var_id: str,
        rule_data: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """
        Create placeholder for variable referenced but not defined

        Args:
            var_id: Variable identifier found in the rule text
            rule_data: Full rule data (currently unused)

        Returns:
            XCCDFVariable dict typed as 'string' with an empty default
        """
        return {
            "id": var_id,
            "title": self._generate_title(var_id),
            "description": "Referenced in rule but not explicitly defined. May be inherited from profile.",
            "type": "string",  # Conservative default
            "default_value": "",
            "interactive": True,
            "sensitive": self._is_sensitive(var_id),
        }

    def _infer_type(self, var_id: str, var_value: Any) -> str:
        """
        Infer XCCDF variable type ('boolean', 'number' or 'string').

        The value's Python type wins; string values are then tested for
        boolean/number literals, and finally the ID is matched against
        TYPE_INFERENCE naming hints.
        """
        # bool must be tested before int: bool is a subclass of int.
        if isinstance(var_value, bool):
            return "boolean"
        if isinstance(var_value, (int, float)):
            return "number"

        if isinstance(var_value, str):
            if var_value.lower() in ('true', 'false', 'yes', 'no', '0', '1'):
                return "boolean"
            try:
                float(var_value)
                return "number"
            except ValueError:
                pass

        var_id_lower = var_id.lower()
        for pattern, var_type in self.TYPE_INFERENCE.items():
            if pattern in var_id_lower:
                return var_type

        return "string"

    def _generate_title(self, var_id: str) -> str:
        """
        Generate human-readable title from variable ID

        Only a leading 'var_' prefix is dropped, so IDs that merely contain
        'var' (e.g. 'sysctlvar_name') keep all of their words.
        """
        if var_id.startswith('var_'):
            var_id = var_id[len('var_'):]
        return var_id.replace('_', ' ').title()

    def _extract_variable_description(self, var_id: str, rule_data: Dict[str, Any]) -> Optional[str]:
        """
        Extract variable description from rule metadata

        Returns the first '.'-delimited sentence mentioning var_id, looking in
        'description' first and then 'rationale'; None when neither does.
        """
        for field_name in ('description', 'rationale'):
            text = rule_data.get(field_name, '')
            # Missing or null YAML fields are not searchable text.
            if not isinstance(text, str) or var_id not in text:
                continue
            for sentence in text.split('.'):
                if var_id in sentence:
                    return sentence.strip()

        return None

    def _is_sensitive(self, var_id: str) -> bool:
        """Determine if variable contains sensitive data (by substring of its ID)"""
        sensitive_patterns = [
            'password', 'passwd', 'secret', 'key', 'private',
            'credential', 'token', 'api_key', 'auth',
        ]
        var_id_lower = var_id.lower()
        return any(pattern in var_id_lower for pattern in sensitive_patterns)

    def _infer_constraints(
        self,
        var_id: str,
        var_value: Any,
        var_type: str
    ) -> Optional[Dict[str, Any]]:
        """
        Infer validation constraints from the variable's type and ID

        Returns:
            Constraints dict, or None when nothing can be inferred
        """
        constraints: Dict[str, Any] = {}
        lowered = var_id.lower()

        if var_type == "number":
            if 'timeout' in lowered or 'tmout' in lowered:
                constraints['min_value'] = 60
                constraints['max_value'] = 3600
            elif 'minlen' in lowered:
                constraints['min_value'] = 6
                constraints['max_value'] = 128
            elif 'maxlen' in lowered:
                constraints['min_value'] = 8
                constraints['max_value'] = 256

        elif var_type == "string":
            if 'banner' in lowered or 'text' in lowered:
                constraints['min_length'] = 1
                constraints['max_length'] = 1024

            if 'password' in lowered and 'grub' in lowered:
                constraints['pattern'] = r'^grub\.pbkdf2\.sha512\.'

        return constraints if constraints else None

    def _find_variable_references(self, text: str) -> Set[str]:
        """
        Find all variable references in text

        Args:
            text: Free text to scan; non-string or empty input yields no refs

        Returns:
            Names matched by VARIABLE_PATTERNS that start with 'var_' or are
            longer than three characters, excluding tokens that begin with a
            digit (e.g. the '1000' in '$1000').
        """
        if not isinstance(text, str) or not text:
            return set()

        var_refs: Set[str] = set()
        for pattern in self.VARIABLE_PATTERNS:
            for match in re.findall(pattern, text):
                # Multi-group patterns yield tuples; the name is the first group.
                var_refs.add(match[0] if isinstance(match, tuple) else match)

        return {
            v for v in var_refs
            if not v[0].isdigit() and (v.startswith('var_') or len(v) > 3)
        }


class RemediationExtractor:
    """Extract remediation content from SCAP rules"""

    # Template to Ansible module mappings
    TEMPLATE_MAPPINGS = {
        'sysctl': 'ansible.posix.sysctl',
        'file_permissions': 'ansible.builtin.file',
        'file_owner': 'ansible.builtin.file',
        'file_groupowner': 'ansible.builtin.file',
        'service_enabled': 'ansible.builtin.systemd',
        'service_disabled': 'ansible.builtin.systemd',
        'package_installed': 'ansible.builtin.package',
        'package_removed': 'ansible.builtin.package',
        'mount_option': 'ansible.posix.mount',
        'kernel_module_disabled': 'community.general.kernel_blacklist',
        'audit_rules': 'ansible.builtin.lineinfile',
        'grub2_argument': 'ansible.builtin.lineinfile',
        'yamlfile_value': None,  # K8s/OpenShift - no Ansible needed
    }

    # Remediation type -> path of its shared file, relative to the rule directory
    _REMEDIATION_FILES = (
        ('ansible', ('ansible', 'shared.yml')),
        ('bash', ('bash', 'shared.sh')),
        ('puppet', ('puppet', 'shared.pp')),
    )

    def extract_remediations(self, rule_data: Dict[str, Any], rule_file: Path) -> Optional[Dict[str, Any]]:
        """
        Extract all remediation content from SCAP rule

        Template-derived remediations are generated first; remediation files
        found next to the rule then replace the generated entry of the same
        type.

        Args:
            rule_data: Parsed YAML rule data
            rule_file: Path to the rule file; its directory is searched

        Returns:
            Dict keyed by remediation type, or None when nothing was found
        """
        remediations: Dict[str, Any] = {}

        # Extract from template
        template = rule_data.get('template', {})
        if template and isinstance(template, dict):
            template_name = template.get('name') or ''
            template_vars = template.get('vars')
            if not isinstance(template_vars, dict):
                template_vars = {}  # `vars:` may be null or absent in YAML

            remediation_content = self._extract_from_template(template_name, template_vars)
            if remediation_content:
                remediations.update(remediation_content)

        # Check for separate remediation files
        remediation_files = self._find_remediation_files(rule_file.parent)

        for rem_type, rem_file in remediation_files.items():
            try:
                with open(rem_file, 'r', encoding='utf-8') as f:
                    remediations[rem_type] = f.read()
            except (OSError, UnicodeError) as e:
                # Best effort: an unreadable file must not abort the rule.
                logger.warning("Could not read remediation file %s: %s", rem_file, e)

        return remediations if remediations else None

    def _extract_from_template(self, template_name: str, template_vars: Dict[str, Any]) -> Dict[str, Any]:
        """
        Extract remediation from template definition

        Returns:
            Dict with an 'ansible' and/or 'bash' entry; empty for templates
            that are not in TEMPLATE_MAPPINGS.
        """
        remediations: Dict[str, Any] = {}

        if template_name in self.TEMPLATE_MAPPINGS:
            ansible_module = self.TEMPLATE_MAPPINGS[template_name]

            if ansible_module:
                # The template name carries the intent (installed vs removed,
                # enabled vs disabled) that the module name alone does not.
                remediations['ansible'] = self._generate_ansible_task(
                    ansible_module, template_vars, template_name
                )

            bash_script = self._generate_bash_script(template_name, template_vars)
            if bash_script:
                remediations['bash'] = bash_script

        return remediations

    def _generate_ansible_task(
        self,
        module_name: str,
        vars_dict: Dict[str, Any],
        template_name: Optional[str] = None
    ) -> str:
        """
        Generate Ansible task YAML

        Args:
            module_name: Fully qualified Ansible module name
            vars_dict: Template variables
            template_name: Name of the originating template, when known

        Returns:
            YAML text for a one-task list
        """
        import yaml
        task = {
            'name': f'Apply {module_name} configuration',
            module_name: self._map_vars_to_module_params(module_name, vars_dict, template_name)
        }
        return yaml.dump([task], default_flow_style=False, sort_keys=False)

    def _map_vars_to_module_params(
        self,
        module_name: str,
        vars_dict: Dict[str, Any],
        template_name: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Map template variables to Ansible module parameters

        Args:
            module_name: Ansible module name (fully qualified or short)
            vars_dict: Template variables
            template_name: Originating template; decides package state and
                service enablement.  When omitted, packages default to
                'present' and services to enabled unless
                vars_dict['servicestate'] == 'disabled'.

        Returns:
            Parameter dict for the module; a copy of vars_dict for modules
            without a dedicated mapping.
        """
        # Dispatch on the last dotted component so that e.g. 'lineinfile' is
        # not mistaken for the 'file' module by a substring test.
        short_name = module_name.rsplit('.', 1)[-1]
        intent = template_name or module_name
        params: Dict[str, Any] = {}

        if short_name == 'sysctl':
            params['name'] = vars_dict.get('sysctlvar', vars_dict.get('name'))
            params['value'] = vars_dict.get('sysctlval', vars_dict.get('value'))
            params['state'] = 'present'
            params['reload'] = True

        elif short_name == 'file':
            params['path'] = vars_dict.get('filepath', vars_dict.get('path'))
            if 'filemode' in vars_dict:
                params['mode'] = vars_dict['filemode']
            if 'fileuid' in vars_dict:
                params['owner'] = vars_dict['fileuid']
            if 'filegid' in vars_dict:
                params['group'] = vars_dict['filegid']

        elif short_name in ('systemd', 'service'):
            disabled = 'disabled' in intent or vars_dict.get('servicestate') == 'disabled'
            params['name'] = vars_dict.get('servicename', vars_dict.get('name'))
            params['enabled'] = not disabled
            params['state'] = 'started' if params['enabled'] else 'stopped'

        elif short_name == 'package':
            params['name'] = vars_dict.get(
                'packagename', vars_dict.get('pkgname', vars_dict.get('name'))
            )
            # Only an explicit "removed" intent may uninstall a package.
            params['state'] = 'absent' if 'removed' in intent else 'present'

        else:
            params = vars_dict.copy()

        return params

    def _generate_bash_script(self, template_name: str, vars_dict: Dict[str, Any]) -> Optional[str]:
        """
        Generate Bash script for simple remediation

        Returns:
            Script text, or None when the template is unsupported or its
            variables yield no command.

        NOTE(review): values are interpolated into the script verbatim, so
        rule content is assumed to be trusted.
        """
        header = ['#!/bin/bash', f'# Apply {template_name} configuration', '']
        commands: List[str] = []

        if template_name == 'sysctl':
            sysctl_var = vars_dict.get('sysctlvar')
            sysctl_val = vars_dict.get('sysctlval')
            if sysctl_var and sysctl_val is not None:
                commands.append(f'sysctl -w {sysctl_var}={sysctl_val}')
                commands.append(f'echo "{sysctl_var} = {sysctl_val}" >> /etc/sysctl.conf')

        elif template_name in ['file_permissions', 'file_owner', 'file_groupowner']:
            filepath = vars_dict.get('filepath')
            if filepath:
                if 'filemode' in vars_dict:
                    commands.append(f'chmod {vars_dict["filemode"]} {filepath}')
                if 'fileuid' in vars_dict:
                    commands.append(f'chown {vars_dict["fileuid"]} {filepath}')
                if 'filegid' in vars_dict:
                    commands.append(f'chgrp {vars_dict["filegid"]} {filepath}')

        elif template_name in ['service_enabled', 'service_disabled']:
            service = vars_dict.get('servicename')
            if service:
                action = 'enable' if 'enabled' in template_name else 'disable'
                commands.append(f'systemctl {action} {service}')
                commands.append(f'systemctl {"start" if action == "enable" else "stop"} {service}')

        # A script holding nothing but the header remediates nothing.
        if not commands:
            return None
        return '\n'.join(header + commands)

    def _find_remediation_files(self, rule_dir: Path) -> Dict[str, Path]:
        """Find separate remediation files in rule directory"""
        remediation_files: Dict[str, Path] = {}

        for rem_type, relative_parts in self._REMEDIATION_FILES:
            candidate = rule_dir.joinpath(*relative_parts)
            if candidate.exists():
                remediation_files[rem_type] = candidate

        return remediation_files


class ScannerTypeDetector:
    """Detects scanner type from SCAP rule metadata"""

    PLATFORM_SCANNER_MAPPINGS = {
        'kubernetes': 'kubernetes',
        'openshift': 'kubernetes',
        'docker': 'docker',
        'podman': 'docker',
        'aws': 'aws_api',
        'azure': 'azure_api',
        'gcp': 'gcp_api',
    }

    # Substring of a template name -> scanner, checked in this order
    _TEMPLATE_NAME_SCANNERS = (
        ('aws', 'aws_api'),
        ('azure', 'azure_api'),
        ('gcp', 'gcp_api'),
    )

    def detect_scanner(self, rule_data: Dict[str, Any], rule_file: Path) -> str:
        """
        Detect scanner type from rule metadata

        Args:
            rule_data: Parsed YAML rule data
            rule_file: Path to the rule file

        Returns:
            Scanner type; 'oscap' when neither the template nor the file path
            points at another scanner.
        """
        # Method 1: Detect from template name
        template = rule_data.get('template', {})
        if template and isinstance(template, dict):
            template_name = str(template.get('name') or '')

            if template_name == 'yamlfile_value':
                template_vars = template.get('vars')
                if isinstance(template_vars, dict):
                    # YAML yields True for an unquoted `true` and 'true' for a
                    # quoted one; accept both spellings.
                    if str(template_vars.get('ocp_data')).lower() == 'true':
                        return 'kubernetes'

            lowered_name = template_name.lower()
            for marker, scanner in self._TEMPLATE_NAME_SCANNERS:
                if marker in lowered_name:
                    return scanner

        # Method 2: Detect from file path
        path_str = str(rule_file).lower()
        for platform, scanner in self.PLATFORM_SCANNER_MAPPINGS.items():
            if platform in path_str:
                return scanner

        # Default: OSCAP for traditional Linux/Unix compliance
        return 'oscap'


def extract_scap_metadata(rule_data: Dict[str, Any], rule_file: Path) -> Dict[str, Any]:
    """
    Extract all SCAP metadata (variables, remediation, scanner type)

    Args:
        rule_data: Parsed YAML rule data
        rule_file: Path to rule.yml file

    Returns:
        Dict with keys: xccdf_variables, remediation, scanner_type
    """
    var_extractor = XCCDFVariableExtractor()
    rem_extractor = RemediationExtractor()
    scanner_detector = ScannerTypeDetector()

    return {
        'xccdf_variables': var_extractor.extract_variables(rule_data),
        'remediation': rem_extractor.extract_remediations(rule_data, rule_file),
        'scanner_type': scanner_detector.detect_scanner(rule_data, rule_file),
    }