Objective
Implement MongoDB-based scan service that routes compliance checks to appropriate scanners (OSCAP, Kubernetes, cloud APIs) and applies XCCDF variable customization.
Context
This implements Phase 1, Task 1.4 of the 7-phase hybrid scanning architecture plan. Builds on:
Requirements
1. Scan Orchestrator
Central service that coordinates multi-scanner execution:
- Query MongoDB for rules to scan
- Group rules by
scanner_type (oscap, kubernetes, aws_api, etc.)
- Route rule groups to appropriate scanner implementations
- Aggregate results from all scanners
- Store results in MongoDB
2. Scanner Interface
Abstract base class for scanner implementations:
class BaseScanner(ABC):
@abstractmethod
async def scan(
self,
rules: List[ComplianceRule],
target: ScanTarget,
variables: Dict[str, str]
) -> ScanResult:
pass
3. OSCAP Scanner Implementation
Execute traditional OVAL-based checks:
- Generate XCCDF benchmark from rules
- Apply variable overrides via tailoring file
- Execute
oscap xccdf eval on target host
- Parse XCCDF results XML
- Return structured results
4. Kubernetes Scanner Implementation
Execute YAML-based checks for OpenShift/K8s:
- Extract yamlpath queries from rule metadata
- Query Kubernetes API for resources
- Evaluate conditions against actual values
- Return pass/fail per rule
5. Cloud Scanner Stubs
Placeholder implementations for future cloud scanners:
AWSScanner: AWS API checks (S3, IAM, VPC)
AzureScanner: Azure Resource Manager checks
GCPScanner: Google Cloud API checks
6. Scan Configuration
class ScanConfiguration:
target: ScanTarget # Host, cluster, cloud account
framework: str # nist, cis, stig
framework_version: str # 800-53r5, v2.0.0
variable_overrides: Dict[str, str] # Custom variable values
rule_filter: Optional[Dict] # Additional MongoDB query
7. Scan Results Storage
class ScanResult(Document):
scan_id: str
scan_config: ScanConfiguration
started_at: datetime
completed_at: datetime
status: str # running, completed, failed
total_rules: int
passed: int
failed: int
error: int
not_applicable: int
results_by_rule: List[RuleResult]
scanner_metadata: Dict[str, Any]
Implementation Plan
Files to Create
backend/app/services/scan_orchestrator_service.py - Central coordinator
backend/app/services/scanners/base_scanner.py - Abstract base class
backend/app/services/scanners/oscap_scanner.py - OSCAP implementation
backend/app/services/scanners/kubernetes_scanner.py - K8s implementation
backend/app/services/scanners/__init__.py - Scanner factory
backend/app/models/scan_models.py - MongoDB models for scans
backend/app/api/v1/endpoints/scans_api.py - API endpoints
Code Structure
Scan Orchestrator
class ScanOrchestrator:
def __init__(self, db: AsyncIOMotorDatabase):
self.db = db
self.scanners = {
'oscap': OSCAPScanner(),
'kubernetes': KubernetesScanner(),
'aws_api': AWSScanner(), # Stub
'azure_api': AzureScanner(), # Stub
'gcp_api': GCPScanner(), # Stub
}
async def execute_scan(
self,
scan_config: ScanConfiguration
) -> ScanResult:
# 1. Query rules from MongoDB
rules = await self._get_rules(scan_config)
# 2. Group by scanner_type
rules_by_scanner = self._group_by_scanner(rules)
# 3. Execute each scanner in parallel
scanner_results = await asyncio.gather(*[
self.scanners[scanner_type].scan(
rules=scanner_rules,
target=scan_config.target,
variables=scan_config.variable_overrides
)
for scanner_type, scanner_rules in rules_by_scanner.items()
])
# 4. Aggregate results
aggregated = self._aggregate_results(scanner_results)
# 5. Store in MongoDB
await self._store_results(aggregated)
return aggregated
OSCAP Scanner
class OSCAPScanner(BaseScanner):
async def scan(
self,
rules: List[ComplianceRule],
target: ScanTarget,
variables: Dict[str, str]
) -> ScanResult:
# 1. Generate XCCDF benchmark
generator = XCCDFGeneratorService(self.db)
benchmark_xml = await generator.generate_benchmark(...)
# 2. Generate tailoring file with variable overrides
if variables:
tailoring_xml = await generator.generate_tailoring(
variable_overrides=variables
)
# 3. Execute oscap via SSH or locally
if target.is_remote:
result = await self._scan_remote(
host=target.host,
benchmark=benchmark_xml,
tailoring=tailoring_xml
)
else:
result = await self._scan_local(...)
# 4. Parse XCCDF results XML
parsed = self._parse_oscap_results(result.stdout)
return ScanResult(**parsed)
async def _scan_remote(self, host, benchmark, tailoring):
# Use oscap-ssh for remote scanning
cmd = [
'oscap-ssh',
f'{host.username}@{host.address}',
'xccdf', 'eval',
'--profile', profile_id,
'--tailoring-file', '/tmp/tailoring.xml',
'--results', '/tmp/results.xml',
'/tmp/benchmark.xml'
]
return await asyncio.create_subprocess_exec(*cmd, ...)
Kubernetes Scanner
class KubernetesScanner(BaseScanner):
async def scan(
self,
rules: List[ComplianceRule],
target: ScanTarget,
variables: Dict[str, str]
) -> ScanResult:
# 1. Initialize K8s client
config.load_kube_config(target.kubeconfig)
api = client.ApiClient()
results = []
for rule in rules:
# 2. Extract yamlpath query
check_content = rule.check_content
yamlpath = check_content['yamlpath']
expected = check_content['expected_value']
# 3. Query K8s API
resource = await self._query_resource(
api,
check_content['resource_type'],
yamlpath
)
# 4. Evaluate condition
result = self._evaluate(resource, expected)
results.append(result)
return ScanResult(results=results)
API Endpoints
@router.post('/scans/execute')
async def execute_scan(
config: ScanConfiguration,
current_user: Dict = Depends(get_current_user)
) -> ScanResult:
orchestrator = ScanOrchestrator(db)
result = await orchestrator.execute_scan(config)
return result
@router.get('/scans/{scan_id}')
async def get_scan_result(
scan_id: str,
current_user: Dict = Depends(get_current_user)
) -> ScanResult:
return await ScanResult.get(scan_id)
@router.get('/scans')
async def list_scans(
skip: int = 0,
limit: int = 50,
current_user: Dict = Depends(get_current_user)
) -> List[ScanResult]:
return await ScanResult.find().skip(skip).limit(limit).to_list()
Testing Requirements
Unit Tests
- Test scanner routing logic (rules grouped by scanner_type)
- Test OSCAP command generation with variable overrides
- Test Kubernetes API query construction
- Test result aggregation from multiple scanners
Integration Tests
- Execute OSCAP scan on test host with NIST rules
- Execute Kubernetes scan on test cluster
- Test variable override application
- Verify results stored correctly in MongoDB
End-to-End Test
# 1. Generate benchmark
curl -X POST /api/v1/xccdf/generate-benchmark -d '{...}'
# 2. Execute scan with variable overrides
curl -X POST /api/v1/scans/execute -d '{
"target": {"type": "ssh", "host": "test.example.com"},
"framework": "nist",
"framework_version": "800-53r5",
"variable_overrides": {
"xccdf_com.hanalyx.openwatch_value_var_accounts_tmout": "300"
}
}'
# 3. Check scan status
curl /api/v1/scans/{scan_id}
# 4. Verify results
# - Check pass/fail counts
# - Verify variable override applied
# - Validate results stored in MongoDB
Acceptance Criteria
Dependencies
Estimated Time
5-7 days
Branch Name
feature/scan-service
Related Documentation
Objective
Implement MongoDB-based scan service that routes compliance checks to appropriate scanners (OSCAP, Kubernetes, cloud APIs) and applies XCCDF variable customization.
Context
This implements Phase 1, Task 1.4 of the 7-phase hybrid scanning architecture plan. Builds on:
Requirements
1. Scan Orchestrator
Central service that coordinates multi-scanner execution:
scanner_type(oscap, kubernetes, aws_api, etc.)2. Scanner Interface
Abstract base class for scanner implementations:
3. OSCAP Scanner Implementation
Execute traditional OVAL-based checks:
oscap xccdf evalon target host4. Kubernetes Scanner Implementation
Execute YAML-based checks for OpenShift/K8s:
5. Cloud Scanner Stubs
Placeholder implementations for future cloud scanners:
AWSScanner: AWS API checks (S3, IAM, VPC)AzureScanner: Azure Resource Manager checksGCPScanner: Google Cloud API checks6. Scan Configuration
7. Scan Results Storage
Implementation Plan
Files to Create
backend/app/services/scan_orchestrator_service.py- Central coordinatorbackend/app/services/scanners/base_scanner.py- Abstract base classbackend/app/services/scanners/oscap_scanner.py- OSCAP implementationbackend/app/services/scanners/kubernetes_scanner.py- K8s implementationbackend/app/services/scanners/__init__.py- Scanner factorybackend/app/models/scan_models.py- MongoDB models for scansbackend/app/api/v1/endpoints/scans_api.py- API endpointsCode Structure
Scan Orchestrator
OSCAP Scanner
Kubernetes Scanner
API Endpoints
Testing Requirements
Unit Tests
Integration Tests
End-to-End Test
Acceptance Criteria
/api/v1/scans/*)Dependencies
Estimated Time
5-7 days
Branch Name
feature/scan-serviceRelated Documentation
/docs/IMPLEMENTATION_PLAN_7_PHASES.md(Phase 1, Task 1.4)