# ISO/IEC 27002:2022 CONTROL ASSESSMENT

Analyses 3: Comprehensive assessment of ISO 27002 controls for identified CPSS devices.

Input:
- cpss_all_services_enhanced.csv (from analyses_2)

Output:
- cpss_iso27002_assessment.csv
- cpss_iso27002_assessment.png
- cpss_iso27002_report.txt

ISO 27002 Controls Assessed:
- A.5.15: Access control
- A.5.37: Documented operating procedures (listed for reference; not scored by the assessment code in this notebook)
- A.8.2: Privileged access rights
- A.8.5: Secure authentication
- A.8.9: Configuration management
- A.8.21: Security of network services
- A.8.22: Segregation of networks
- A.8.23: Web filtering
- A.8.24: Use of cryptography
- A.8.26: Application security requirements


## Configuration

In [6]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
from datetime import datetime
import re

# ============================================================================
# CONFIGURATION
# ============================================================================

# Input CSV read by the main cell, and the directory every artefact is written to.
# The output directory is created eagerly so later writes cannot fail on a
# missing folder.
INPUT_FILE = Path('./output/2_cpss_identification/cpss_all_services_enhanced.csv')
OUTPUT_DIR = Path('./output/3_iso27002_assessment')
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# ISO 27002:2022 control catalogue: control id -> {'name', 'description'}.
# Built from a flat table so each control reads as a single row; insertion
# order of the rows is the iteration order of the dict.
ISO_CONTROLS = {
    control_id: {'name': control_name, 'description': control_description}
    for control_id, control_name, control_description in (
        ('A.5.15', 'Access control',
         'Rules to control physical and logical access'),
        ('A.8.2', 'Privileged access rights',
         'Allocation and use of privileged access rights'),
        ('A.8.5', 'Secure authentication',
         'Secure authentication technologies and procedures'),
        ('A.8.9', 'Configuration management',
         'Configurations including security configurations'),
        ('A.8.21', 'Security of network services',
         'Security mechanisms, service levels, requirements'),
        ('A.8.22', 'Segregation of networks',
         'Groups of information services, users and systems'),
        ('A.8.23', 'Web filtering',
         'Access to external websites managed'),
        ('A.8.24', 'Use of cryptography',
         'Rules for effective use of cryptography'),
        ('A.8.26', 'Application security requirements',
         'Information security requirements for applications'),
    )
}

# Lower bounds of each risk band on the 0-10 score scale. The assessment
# functions treat a score below the 'medium' bound as compliant.
RISK_THRESHOLDS = dict(
    critical=8.0,
    high=6.0,
    medium=4.0,
    low=0.0,
)

# Hex colours per risk band.
COLORS = dict(
    critical='#DC2626',   # Red
    high='#EA580C',       # Orange
    medium='#F59E0B',     # Amber
    low='#10B981',        # Green
    compliant='#059669',  # Dark green
)

# ============================================================================
# HELPER FUNCTIONS
# ============================================================================

def safe_get(row, column, default=''):
    """Return the value stored under *column*, or *default* if missing or NaN.

    Args:
        row: Record exposing a ``.get(key, default)`` method (a pandas Series
            row or a plain dict both work).
        column: Key to look up.
        default: Value returned when the key is absent, when the stored value
            is NA according to ``pd.isna``, or when the lookup itself fails.

    Returns:
        The stored value, or *default*.
    """
    try:
        val = row.get(column, default)
        if pd.isna(val):
            return default
    except (AttributeError, KeyError, TypeError, ValueError):
        # Only lookup/NA-check failures are absorbed:
        #   AttributeError - row has no .get (e.g. None)
        #   KeyError/TypeError - the container rejected the key
        #   ValueError - pd.isna returned an array for a list-like value,
        #                whose truth value is ambiguous
        # A bare ``except:`` here would also swallow KeyboardInterrupt and
        # SystemExit, making a long assessment loop impossible to interrupt.
        return default
    return val

def _flag_is_set(row, column):
    """Return True only when the boolean flag *column* is genuinely set on *row*.

    NaN/None/NA count as "not set" (``bool(float('nan'))`` is True in Python, so
    a plain truthiness test misreads missing values), and string values are
    interpreted by content so that ``'False'`` is not treated as set.
    """
    value = row.get(column, False)
    if pd.isna(value):
        return False
    if isinstance(value, str):
        return value.strip().lower() in {'true', '1', 'yes', 'y', 't'}
    return bool(value)


def detect_category(row):
    """Detect the CPSS category of one record.

    Resolution order:
      1. The first non-null value among the columns ``category``,
         ``cpss_category``, ``device_category`` and ``type``.
      2. The boolean flags ``is_eacs``, ``is_vss``, ``is_ihas`` (in that order).
      3. A substring match of ``detected_brand`` against known vendor names.

    Args:
        row: pandas Series for one service (``row.index`` is consulted).

    Returns:
        The explicit category as a string, or one of ``'EACS'``, ``'VSS'``,
        ``'I&HAS'``; ``'Unknown'`` when nothing matches.
    """
    # Check for category column variations
    for col in ['category', 'cpss_category', 'device_category', 'type']:
        if col in row.index and pd.notna(row.get(col)):
            return str(row[col])

    # Infer from boolean flags; missing (NaN) flags must not count as True.
    for flag_column, label in (('is_eacs', 'EACS'), ('is_vss', 'VSS'), ('is_ihas', 'I&HAS')):
        if _flag_is_set(row, flag_column):
            return label

    # Infer from brand if available
    brand = str(safe_get(row, 'detected_brand', '')).lower()
    if brand:
        vss_brands = ['hikvision', 'dahua', 'axis', 'hanwha', 'mobotix', 'geovision']
        eacs_brands = ['nedap', 'paxton', 'genetec', 'salto', 'assa']
        ihas_brands = ['ajax', 'vanderbilt', 'honeywell', 'bosch']

        if any(b in brand for b in vss_brands):
            return 'VSS'
        elif any(b in brand for b in eacs_brands):
            return 'EACS'
        elif any(b in brand for b in ihas_brands):
            return 'I&HAS'

    return 'Unknown'

## Assessment

### Assessment functions

In [7]:
# ============================================================================
# ASSESSMENT FUNCTIONS
# ============================================================================

def assess_access_control(row):
    """Assess A.5.15 (Access control) for one service record.

    Findings are derived from the lower-cased ``http.html_title``,
    ``http.path`` and ``service.banner`` fields.

    Args:
        row: Record passed through to ``safe_get``.

    Returns:
        dict with keys ``control`` ('A.5.15'), ``score`` (float, higher means
        riskier), ``issues`` ('; '-joined findings) and ``compliant`` (True
        when the score is below ``RISK_THRESHOLDS['medium']``).
    """
    title = str(safe_get(row, 'http.html_title', '')).lower()
    path = str(safe_get(row, 'http.path', '')).lower()
    banner = str(safe_get(row, 'service.banner', '')).lower()

    issues = []
    # Start at zero and only raise the score per finding. Starting at 10.0 made
    # ``max(score, 7.0)`` return 10.0 whenever the path matched without a title
    # match, so a single finding outscored both findings together.
    score = 0.0

    if any(word in title for word in ('login', 'admin', 'management', 'config')):
        issues.append('Exposed authentication interface')
        score = max(score, 7.0)

    if any(p in path for p in ('/admin', '/login', '/config', '/setup')):
        issues.append('Sensitive path exposed')
        score = max(score, 7.0)

    if 'no authentication' in banner or 'anonymous' in banner:
        issues.append('No authentication required')
        score = 10.0

    if not issues:
        score = 2.0
        issues.append('No obvious access control issues')

    return {
        'control': 'A.5.15',
        'score': score,
        'issues': '; '.join(issues),
        'compliant': score < RISK_THRESHOLDS['medium']
    }

def assess_privileged_access(row):
    """Assess A.8.2 (Privileged access rights) for one service record.

    Looks for 'admin'/'root' in the page title and 'default' in the service
    banner. When both match, the banner finding's score (10.0) wins because it
    is applied last.

    Returns:
        dict with keys ``control``, ``score``, ``issues`` and ``compliant``.
    """
    page_title = str(safe_get(row, 'http.html_title', '')).lower()
    service_banner = str(safe_get(row, 'service.banner', '')).lower()

    # (message, score) pairs in the order the checks fire; the last one that
    # fires determines the final score.
    hits = []
    if any(keyword in page_title for keyword in ('admin', 'root')):
        hits.append(('Exposed admin interface', 8.0))
    if 'default' in service_banner:
        hits.append(('Default credentials possible', 10.0))
    if not hits:
        hits.append(('No obvious privileged access issues', 3.0))

    risk = hits[-1][1]
    return {
        'control': 'A.8.2',
        'score': risk,
        'issues': '; '.join(message for message, _ in hits),
        'compliant': risk < RISK_THRESHOLDS['medium'],
    }

def assess_secure_authentication(row):
    """Assess A.8.5 (Secure authentication) for one service record.

    Checks, in order: service on port 80, an HTTP Basic challenge in
    ``http.headers``, and a ``detected_brand`` of exactly 'hikvision' or
    'dahua'. The final score is the highest score among the findings that fired.

    Args:
        row: Record passed through to ``safe_get``.

    Returns:
        dict with keys ``control`` ('A.8.5'), ``score``, ``issues`` and
        ``compliant`` (score below ``RISK_THRESHOLDS['medium']``).
    """
    issues = []
    # Start at zero so ``max`` reflects only findings that actually fired. With
    # a 10.0 start, a Basic-auth-only or vendor-only finding was reported as
    # 10.0 instead of its intended 8.0 / 6.0.
    score = 0.0

    port = safe_get(row, 'service.port', 0)
    if port == 80:
        issues.append('Authentication over unencrypted HTTP')
        score = max(score, 9.0)

    headers = str(safe_get(row, 'http.headers', '')).lower()
    if 'www-authenticate: basic' in headers:
        issues.append('Basic authentication (weak)')
        score = max(score, 8.0)

    brand = str(safe_get(row, 'detected_brand', '')).lower()
    if brand in ('hikvision', 'dahua'):
        issues.append(f'Vendor {brand} - check default credentials')
        score = max(score, 6.0)

    if not issues:
        score = 3.0
        issues.append('No obvious authentication weaknesses')

    return {
        'control': 'A.8.5',
        'score': score,
        'issues': '; '.join(issues),
        'compliant': score < RISK_THRESHOLDS['medium']
    }

def assess_configuration_management(row):
    """Assess A.8.9 (Configuration management) for one service record.

    Flags services on a known default port and records whose ``http.path``
    contains '/config' or '/setup'. The final score is the highest score among
    the findings that fired.

    Args:
        row: Record passed through to ``safe_get``.

    Returns:
        dict with keys ``control`` ('A.8.9'), ``score``, ``issues`` and
        ``compliant`` (score below ``RISK_THRESHOLDS['medium']``).
    """
    issues = []
    # Start at zero: with a 10.0 start, ``max(score, 7.0)`` reported 10.0 for a
    # record whose only finding was the exposed configuration path.
    score = 0.0

    default_ports = {80, 554, 8080, 8000, 37777, 34567, 9999}
    port = safe_get(row, 'service.port', 0)

    if port in default_ports:
        issues.append(f'Default port {port}')
        score = max(score, 6.0)

    path = str(safe_get(row, 'http.path', '')).lower()
    if '/config' in path or '/setup' in path:
        issues.append('Configuration interface exposed')
        score = max(score, 7.0)

    if not issues:
        score = 3.0
        issues.append('No obvious configuration issues')

    return {
        'control': 'A.8.9',
        'score': score,
        'issues': '; '.join(issues),
        'compliant': score < RISK_THRESHOLDS['medium']
    }

def assess_network_security(row):
    """Assess A.8.21 (Security of network services) for one service record.

    Every record starts with the finding 'Service internet-exposed' (score
    7.0) and is always reported as non-compliant. The score rises to 8.0 when
    ``service.protocol`` names a cleartext protocol (http, ftp, telnet, rtsp).

    Args:
        row: Record passed through to ``safe_get``.

    Returns:
        dict with keys ``control`` ('A.8.21'), ``score``, ``issues`` and
        ``compliant`` (always False).
    """
    issues = ['Service internet-exposed']
    score = 7.0

    protocol = str(safe_get(row, 'service.protocol', '')).lower()

    # Remove the secure variants before the substring search; otherwise
    # 'https', 'ftps', 'sftp' and 'rtsps' match their cleartext counterparts
    # and get reported as insecure.
    cleartext_view = protocol
    for secure_name in ('https', 'ftps', 'sftp', 'rtsps'):
        cleartext_view = cleartext_view.replace(secure_name, '')

    if any(p in cleartext_view for p in ('http', 'ftp', 'telnet', 'rtsp')):
        issues.append(f'Insecure protocol: {protocol}')
        score = 8.0

    return {
        'control': 'A.8.21',
        'score': score,
        'issues': '; '.join(issues),
        'compliant': False
    }

def assess_network_segregation(row):
    """Assess A.8.22 (Segregation of networks) for one service record.

    Every record carries the baseline finding of direct internet exposure
    (score 8.0) and is always non-compliant; a management-style page title
    raises the score to 10.0.

    Returns:
        dict with keys ``control``, ``score``, ``issues`` and ``compliant``.
    """
    page_title = str(safe_get(row, 'http.html_title', '')).lower()
    is_management_ui = any(
        keyword in page_title for keyword in ('admin', 'management', 'config')
    )

    findings = ['Direct internet exposure - no segregation']
    if is_management_ui:
        findings.append('Management interface on public internet')

    return {
        'control': 'A.8.22',
        'score': 10.0 if is_management_ui else 8.0,
        'issues': '; '.join(findings),
        'compliant': False,
    }

def assess_web_filtering(row):
    """Assess A.8.23 (Web filtering) for one service record.

    A service on port 80, 443, 8080 or 8000 is reported as a publicly
    accessible web interface (7.0); anything else gets the generic
    'insufficient filtering' finding (6.0).

    Returns:
        dict with keys ``control``, ``score``, ``issues`` and ``compliant``.
    """
    service_port = safe_get(row, 'service.port', 0)

    if service_port in (80, 443, 8080, 8000):
        finding, risk = 'Web interface publicly accessible', 7.0
    else:
        finding, risk = 'Service accessible - insufficient filtering', 6.0

    return {
        'control': 'A.8.23',
        'score': risk,
        'issues': finding,  # exactly one finding is ever produced
        'compliant': risk < RISK_THRESHOLDS['medium'],
    }

def assess_cryptography(row):
    """Assess A.8.24 (Use of cryptography) for one service record.

    Port 80 is reported as unencrypted HTTP (8.0) and port 443 as HTTPS (2.0).
    Independently of the port check, a cleartext protocol name in
    ``service.protocol`` (http, ftp, telnet, rtsp) on any port other than 443
    sets the score to 8.0.

    Args:
        row: Record passed through to ``safe_get``.

    Returns:
        dict with keys ``control`` ('A.8.24'), ``score``, ``issues`` and
        ``compliant`` (score below ``RISK_THRESHOLDS['medium']``).
    """
    issues = []
    score = 10.0

    port = safe_get(row, 'service.port', 0)
    if port == 80:
        issues.append('Unencrypted HTTP in use')
        score = 8.0
    elif port == 443:
        score = 2.0
        issues.append('HTTPS in use')

    protocol = str(safe_get(row, 'service.protocol', '')).lower()

    # Remove secure variants before the substring search so that TLS-wrapped
    # protocols on non-standard ports (e.g. 'https' on 8443, 'sftp', 'ftps',
    # 'rtsps') are not reported as unencrypted.
    cleartext_view = protocol
    for secure_name in ('https', 'ftps', 'sftp', 'rtsps'):
        cleartext_view = cleartext_view.replace(secure_name, '')

    if any(p in cleartext_view for p in ('http', 'ftp', 'telnet', 'rtsp')) and port != 443:
        issues.append(f'Unencrypted protocol: {protocol}')
        score = 8.0

    if not issues:
        score = 3.0
        # The previous text, 'No cryptography in use', contradicted the
        # low-risk (compliant) score assigned on this branch; reaching it only
        # means no cryptographic weakness was detected.
        issues.append('No obvious cryptography issues')

    return {
        'control': 'A.8.24',
        'score': score,
        'issues': '; '.join(issues),
        'compliant': score < RISK_THRESHOLDS['medium']
    }

def assess_application_security(row):
    """Assess A.8.26 (Application security requirements) for one service record.

    A non-empty ``service.product_version`` is reported as version disclosure
    (6.0); otherwise the fallback finding is scored 4.0.

    Returns:
        dict with keys ``control``, ``score``, ``issues`` and ``compliant``.
    """
    disclosed_version = str(safe_get(row, 'service.product_version', ''))

    if disclosed_version:
        finding, risk = f'Version disclosed: {disclosed_version}', 6.0
    else:
        # NOTE(review): 4.0 is not below RISK_THRESHOLDS['medium'] (4.0), so
        # this "no issues" branch is still reported as non-compliant - confirm
        # whether that is intended.
        finding, risk = 'No obvious application security issues', 4.0

    return {
        'control': 'A.8.26',
        'score': risk,
        'issues': finding,
        'compliant': risk < RISK_THRESHOLDS['medium'],
    }

### Main Assessment

In [8]:
# ============================================================================
# MAIN ASSESSMENT
# ============================================================================

def run_assessment(df):
    """Run every ISO 27002 control assessment over each row of *df*.

    For each row the nine ``assess_*`` functions are called and their results
    flattened into ``<control>_score``, ``<control>_issues`` and
    ``<control>_compliant`` columns, next to the identifying fields ``ip``,
    ``port``, ``category`` and ``brand``. Three aggregate columns are added:
    ``overall_risk_score`` (mean of the control scores), ``overall_compliant``
    (all controls compliant) and ``compliance_rate`` (fraction compliant).

    Args:
        df: DataFrame with one service per row.

    Returns:
        DataFrame with one row per input row; it has no columns when *df* is
        empty.
    """
    print("\n" + "="*70)
    print("ISO/IEC 27002:2022 CONTROL ASSESSMENT")
    print("="*70)

    assessors = (
        assess_access_control,
        assess_privileged_access,
        assess_secure_authentication,
        assess_configuration_management,
        assess_network_security,
        assess_network_segregation,
        assess_web_filtering,
        assess_cryptography,
        assess_application_security,
    )

    results = []
    total = len(df)

    print(f"\nAssessing {total:,} CPSS devices...")

    # Count rows by position rather than by index label: the label is only a
    # running 0..n-1 counter for a default RangeIndex, and is not even numeric
    # for a filtered or re-indexed frame.
    for position, (_, row) in enumerate(df.iterrows(), start=1):
        device_results = {
            'ip': safe_get(row, 'ip.ip', 'unknown'),
            'port': safe_get(row, 'service.port', 0),
            'category': detect_category(row),
            'brand': safe_get(row, 'detected_brand', 'Unknown'),
        }

        # Run assessments
        assessments = [assess(row) for assess in assessors]

        # Add results
        for assessment in assessments:
            control = assessment['control']
            device_results[f'{control}_score'] = assessment['score']
            device_results[f'{control}_issues'] = assessment['issues']
            device_results[f'{control}_compliant'] = assessment['compliant']

        # Overall metrics
        compliant_flags = [a['compliant'] for a in assessments]
        device_results['overall_risk_score'] = np.mean([a['score'] for a in assessments])
        device_results['overall_compliant'] = all(compliant_flags)
        device_results['compliance_rate'] = sum(compliant_flags) / len(assessments)

        results.append(device_results)

        if position % 500 == 0:
            print(f"  Processed {position:,}/{total:,} devices...")

    results_df = pd.DataFrame(results)

    print("\nAssessment complete")
    print(f"  Devices assessed: {len(results_df):,}")
    if results_df.empty:
        # An empty frame has no 'compliance_rate' column; avoid the KeyError.
        print("  Average compliance: n/a (no devices)")
    else:
        print(f"  Average compliance: {results_df['compliance_rate'].mean()*100:.1f}%")

    return results_df

## Analyses & Reporting

In [9]:
# ============================================================================
# ANALYSIS & REPORTING
# ============================================================================

def generate_summary_statistics(results_df):
    """Print headline compliance figures, overall and per category.

    Reads the ``compliance_rate`` and ``overall_compliant`` columns, and the
    ``category`` column when present. Nothing is returned.
    """
    rule = "=" * 70
    print("\n" + rule)
    print("SUMMARY STATISTICS")
    print(rule)

    device_total = len(results_df)
    mean_rate_pct = results_df['compliance_rate'].mean() * 100
    fully_compliant = results_df['overall_compliant'].sum()

    print(f"\nTotal devices: {device_total:,}")
    print(f"Average compliance: {mean_rate_pct:.1f}%")
    print(f"Fully compliant: {fully_compliant:,}")

    if 'category' not in results_df.columns:
        return

    # Per-category breakdown, in order of first appearance.
    print("\nBy Category:")
    category_column = results_df['category']
    for label in category_column.unique():
        subset = results_df.loc[category_column == label]
        subset_rate_pct = subset['compliance_rate'].mean() * 100
        print(f"  {label}: {len(subset):,} devices, {subset_rate_pct:.1f}% compliant")

def create_visualizations(results_df):
    """Render the 2x2 compliance dashboard and save it as a PNG in OUTPUT_DIR.

    Panels: mean compliance rate per category (bar), compliance rate per
    control (horizontal bar), histogram of overall risk scores with the mean
    marked, and a pie chart of fully compliant vs. non-compliant devices.
    """
    banner = "=" * 70
    print("\n" + banner)
    print("CREATING VISUALIZATIONS")
    print(banner)

    fig, grid = plt.subplots(2, 2, figsize=(16, 12))
    fig.suptitle('ISO/IEC 27002:2022 CPSS Compliance Assessment', fontsize=16, fontweight='bold')
    (category_ax, control_ax), (risk_ax, summary_ax) = grid

    # 1. Compliance by category
    has_categories = 'category' in results_df.columns and results_df['category'].nunique() > 0
    if not has_categories:
        category_ax.text(0.5, 0.5, 'No category data available', ha='center', va='center',
                         transform=category_ax.transAxes)
        category_ax.set_title('Compliance by Category (N/A)')
    else:
        per_category_pct = results_df.groupby('category')['compliance_rate'].mean() * 100
        per_category_pct.plot(kind='bar', ax=category_ax, color='#3B82F6')
        category_ax.set_title('Compliance Rate by Category')
        category_ax.set_ylabel('Compliance Rate (%)')
        category_ax.grid(axis='y', alpha=0.3)

    # 2. Control compliance (only controls whose column exists are plotted)
    control_ids = ['A.5.15', 'A.8.2', 'A.8.5', 'A.8.9', 'A.8.21', 'A.8.22', 'A.8.23', 'A.8.24', 'A.8.26']
    plotted_ids = [cid for cid in control_ids if f'{cid}_compliant' in results_df.columns]
    plotted_pct = [
        results_df[f'{cid}_compliant'].sum() / len(results_df) * 100
        for cid in plotted_ids
    ]

    if plotted_pct:
        control_ax.barh(plotted_ids, plotted_pct, color='#10B981')
        control_ax.set_xlabel('Compliance Rate (%)')
        control_ax.set_title('Compliance by Control')
        control_ax.grid(axis='x', alpha=0.3)

    # 3. Risk distribution
    risk_scores = results_df['overall_risk_score']
    mean_risk = risk_scores.mean()
    risk_scores.hist(bins=20, ax=risk_ax, color='#F59E0B', edgecolor='black')
    risk_ax.set_xlabel('Risk Score')
    risk_ax.set_ylabel('Count')
    risk_ax.set_title('Risk Score Distribution')
    risk_ax.axvline(x=mean_risk, color='red', linestyle='--',
                    label=f'Mean: {mean_risk:.2f}')
    risk_ax.legend()
    risk_ax.grid(axis='y', alpha=0.3)

    # 4. Compliance summary
    n_compliant = results_df['overall_compliant'].sum()
    n_non_compliant = len(results_df) - n_compliant

    summary_ax.pie([n_compliant, n_non_compliant],
                   labels=['Compliant', 'Non-Compliant'],
                   autopct='%1.1f%%',
                   colors=['#10B981', '#EF4444'])
    summary_ax.set_title('Overall Compliance')

    plt.tight_layout()

    target = OUTPUT_DIR / 'cpss_iso27002_assessment.png'
    plt.savefig(target, dpi=300, bbox_inches='tight')
    print(f"Saved: {target.name}")
    plt.close()

def generate_report(results_df):
    """Write the plain-text compliance report to OUTPUT_DIR.

    The report holds a header with the generation timestamp and device count,
    the overall compliance figures, and one line per control whose
    ``<control>_compliant`` column is present in *results_df*.
    """
    report_path = OUTPUT_DIR / 'cpss_iso27002_report.txt'
    heavy_rule = "=" * 70
    light_rule = "-" * 70
    control_ids = ['A.5.15', 'A.8.2', 'A.8.5', 'A.8.9', 'A.8.21', 'A.8.22', 'A.8.23', 'A.8.24', 'A.8.26']

    with open(report_path, 'w', encoding='utf-8') as out:
        # Header
        print(heavy_rule, file=out)
        print("ISO/IEC 27002:2022 CPSS COMPLIANCE ASSESSMENT", file=out)
        print(heavy_rule, file=out)
        print(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", file=out)
        print(f"Total devices: {len(results_df):,}", file=out)
        print(heavy_rule, end="\n\n", file=out)

        # Overall figures
        print(f"Overall compliance: {results_df['compliance_rate'].mean()*100:.1f}%", file=out)
        print(f"Fully compliant: {results_df['overall_compliant'].sum():,}", file=out)
        print(f"Average risk score: {results_df['overall_risk_score'].mean():.2f}/10", end="\n\n", file=out)

        # Control details
        print("COMPLIANCE BY CONTROL:", file=out)
        print(light_rule, file=out)

        for control_id in control_ids:
            flag_column = f'{control_id}_compliant'
            if flag_column not in results_df.columns:
                continue
            pct = results_df[flag_column].sum() / len(results_df) * 100
            print(f"{control_id} ({ISO_CONTROLS[control_id]['name']}): {pct:.1f}%", file=out)

        print("\n" + heavy_rule, file=out)

    print(f"Saved: {report_path.name}")

## Run

In [10]:
# ============================================================================
# MAIN
# ============================================================================

if __name__ == '__main__':
    # Pipeline entry point: load the input CSV, assess every row, persist the
    # per-device results, then print statistics and write the chart and report.
    # Kept as flat statements so df / results_df remain available afterwards.
    print(f"\n{'=' * 70}")
    print("ANALYSES 3: ISO 27002 ASSESSMENT")
    print('=' * 70)

    # Load data
    print(f"\nLoading: {INPUT_FILE}")
    df = pd.read_csv(INPUT_FILE)
    print(f"Loaded {len(df):,} devices")

    # Run assessment
    results_df = run_assessment(df)

    # Save results
    output_file = OUTPUT_DIR / 'cpss_iso27002_assessment.csv'
    results_df.to_csv(output_file, index=False)
    print(f"\nSaved: {output_file.name}")

    # Analytics
    generate_summary_statistics(results_df)
    create_visualizations(results_df)
    generate_report(results_df)

    print(f"\n{'=' * 70}")
    print("COMPLETE")
    print('=' * 70)
    print(f"\nOutputs in: {OUTPUT_DIR}")
    print(f"{'=' * 70}\n")


ANALYSES 3: ISO 27002 ASSESSMENT

Loading: output\2_cpss_identification\cpss_all_services_enhanced.csv
Loaded 3,289 devices

ISO/IEC 27002:2022 CONTROL ASSESSMENT

Assessing 3,289 CPSS devices...
  Processed 500/3,289 devices...
  Processed 1,000/3,289 devices...
  Processed 1,500/3,289 devices...
  Processed 2,000/3,289 devices...
  Processed 2,500/3,289 devices...
  Processed 3,000/3,289 devices...

Assessment complete
  Devices assessed: 3,289
  Average compliance: 44.6%

Saved: cpss_iso27002_assessment.csv

SUMMARY STATISTICS

Total devices: 3,289
Average compliance: 44.6%
Fully compliant: 0

By Category:
  Unknown: 435 devices, 42.7% compliant
  EACS: 18 devices, 55.6% compliant
  VSS: 2,491 devices, 44.9% compliant
  I&HAS: 345 devices, 44.3% compliant

CREATING VISUALIZATIONS
Saved: cpss_iso27002_assessment.png
Saved: cpss_iso27002_report.txt

COMPLETE

Outputs in: output\3_iso27002_assessment

