# Notebook 11: RDLS Vulnerability & Loss Block Extractor

**Purpose**: Extract and populate RDLS v0.3 Vulnerability and Loss component blocks from HDX metadata.

**Note**: Vulnerability and Loss data are less common in HDX. This notebook focuses on:
1. Detecting V/L signals where they exist
2. Building partial blocks with available information
3. Flagging records that may benefit from manual enrichment

**RDLS Vulnerability Block Structure**:
```json
"vulnerability": {
  "functions": {
    "vulnerability": [...],
    "fragility": [...],
    "damage_to_loss": [...]
  },
  "socio_economic": [...]
}
```

**RDLS Loss Block Structure**:
```json
"loss": {
  "losses": [{
    "id": "...",
    "hazard_type": "...",
    "asset_category": "...",
    "asset_dimension": "...",
    "impact_and_losses": {...}
  }]
}
```

**Author**: Benny Istanto/Risk Data Librarian/GFDRR  
**Version**: 2026.1

---

## 1. Setup

In [None]:
"""
1.1 Import Dependencies
"""

import json
import re
import yaml
from pathlib import Path
from collections import Counter
from datetime import datetime
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, field, asdict

import pandas as pd
import numpy as np

try:
    from tqdm.notebook import tqdm
    HAS_TQDM = True
except ImportError:
    HAS_TQDM = False

# Display settings for DataFrames: no column limit, cell text up to 120 chars.
for _option_name, _option_value in (
    ('display.max_columns', None),
    ('display.max_colwidth', 120),
):
    pd.set_option(_option_name, _option_value)

# Print the start timestamp of this run.
_started_at = datetime.now().isoformat()
print(f"Notebook started: {_started_at}")

In [None]:
"""
1.2 Paths and Configuration
"""

# Project root: one level up when the notebook runs from a 'notebook' folder,
# otherwise the current working directory itself.
NOTEBOOK_DIR = Path.cwd()
if NOTEBOOK_DIR.name == 'notebook':
    BASE_DIR = NOTEBOOK_DIR.parent
else:
    BASE_DIR = NOTEBOOK_DIR

# Every input and output path hangs off the metadata dump directory.
_DUMP_DIR = BASE_DIR / 'hdx_dataset_metadata_dump'

DATASET_METADATA_DIR = _DUMP_DIR / 'dataset_metadata'
SIGNAL_DICT_PATH = _DUMP_DIR / 'config' / 'signal_dictionary.yaml'
RDLS_SCHEMA_PATH = _DUMP_DIR / 'rdls' / 'schema' / 'rdls_schema_v0.3.json'

# Extraction results are written here; create the folder on first use.
OUTPUT_DIR = _DUMP_DIR / 'rdls' / 'extracted'
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Signal dictionary (YAML) and RDLS schema (JSON), both read as UTF-8.
SIGNAL_DICT = yaml.safe_load(SIGNAL_DICT_PATH.read_text(encoding='utf-8'))
RDLS_SCHEMA = json.loads(RDLS_SCHEMA_PATH.read_text(encoding='utf-8'))

print("Configuration loaded.")

## 2. Vulnerability & Loss Detection Patterns

In [None]:
"""
2.1 Define Detection Patterns

Vulnerability and Loss data are specialized. These patterns detect
the presence of such data rather than extracting detailed parameters.
"""

# All patterns below are applied case-insensitively by the extractor.
# `.?` between words tolerates one separator character (space, hyphen,
# underscore) or none. Because each pattern ends in `\b`, plural and inflected
# forms must be listed explicitly: a bare `death` would NOT match "deaths",
# and `economic.?loss` would NOT match "economic losses".

# Vulnerability function indicators
VULNERABILITY_FUNCTION_PATTERNS = {
    'vulnerability_curve': [
        r'\b(vulnerability.?curves?|vulnerability.?functions?|damage.?curves?)\b',
        r'\b(mean.?damage.?ratios?|mdr)\b',
        r'\b(damage.?states?|ds\d)\b',
    ],
    'fragility_curve': [
        r'\b(fragility.?curves?|fragility.?functions?)\b',
        r'\b(probability.?of.?damage|failure.?probabilit(?:y|ies))\b',
        r'\b(capacity.?spectrum|pushover)\b',
    ],
    'damage_to_loss': [
        r'\b(damage.?to.?loss|loss.?functions?|consequence.?functions?)\b',
        r'\b(repair.?costs?|replacement.?costs?)\b',
    ],
}

# Socio-economic vulnerability indicators
SOCIOECONOMIC_PATTERNS = [
    r'\b(social.?vulnerability|sovi|svi)\b',
    # The longer alternative comes first so the reported hint keeps the full
    # phrase; `socio.?economic` also covers the hyphenated spelling.
    r'\b(socio.?economic.?vulnerability|socio.?economic)\b',
    r'\b(coping.?capacit(?:y|ies)|adaptive.?capacit(?:y|ies))\b',
    r'\b(resilience.?ind(?:ex|ices|exes)|vulnerability.?ind(?:ex|ices|exes))\b',
    r'\b(livelihoods?|poverty|deprivation)\b',
]

# Loss data indicators
LOSS_PATTERNS = {
    'economic_loss': [
        r'\b(economic.?loss(?:es)?|financial.?loss(?:es)?|monetary.?loss(?:es)?)\b',
        r'\b(damage.?costs?|loss.?costs?|loss.?estimates?)\b',
        r'\b(aal|average.?annual.?loss(?:es)?)\b',
        r'\b(probable.?maximum.?loss(?:es)?|pml)\b',
    ],
    'human_loss': [
        r'\b(casualt(?:y|ies)|fatalit(?:y|ies)|mortalit(?:y|ies)|deaths?)\b',
        r'\b(injur(?:y|ies)|injured|wounded)\b',
        r'\b(displaced|homeless|evacuated)\b',
        r'\b(affected.?populations?|people.?affected)\b',
    ],
    'physical_damage': [
        r'\b(building.?damages?|structural.?damages?)\b',
        r'\b(damage.?assessments?|damage.?surveys?)\b',
        r'\b(destroyed|collapsed|damaged)\b',
    ],
}

# Impact type patterns
IMPACT_TYPE_PATTERNS = {
    # The leading `\b` keeps 'direct ...' from matching inside 'indirect ...'.
    'direct': [r'\b(direct.?loss(?:es)?|direct.?damages?|direct.?impacts?)\b'],
    'indirect': [r'\b(indirect.?loss(?:es)?|business.?interruptions?|downtime)\b'],
    'total': [r'\b(total.?loss(?:es)?|combined.?loss(?:es)?|aggregate)\b'],
}

print("Vulnerability/Loss patterns defined.")

In [None]:
"""
2.2 Data Classes
"""

@dataclass
class VulnerabilityExtraction:
    """Vulnerability signals detected in one record's metadata text.

    Filled in by ``VulnLossExtractor._extract_vulnerability``; the defaults
    describe a record in which nothing was detected.
    """
    # True when at least one vulnerability-function pattern group matched.
    has_vulnerability_functions: bool = False
    # Keys of VULNERABILITY_FUNCTION_PATTERNS whose patterns matched.
    function_types: List[str] = field(default_factory=list)
    # True when at least one socio-economic pattern matched.
    has_socioeconomic: bool = False
    # Matched socio-economic phrases (the extractor keeps at most five).
    socioeconomic_hints: List[str] = field(default_factory=list)
    # Heuristic score set by the extractor: 0.8 when function patterns matched,
    # otherwise 0.6 when only socio-economic patterns matched, otherwise 0.0.
    confidence: float = 0.0

@dataclass
class LossExtraction:
    """Loss signals detected in one record's metadata text.

    Filled in by ``VulnLossExtractor._extract_loss``; the defaults describe a
    record in which nothing was detected.
    """
    # True when at least one LOSS_PATTERNS group matched.
    has_loss_data: bool = False
    # Keys of LOSS_PATTERNS whose patterns matched.
    loss_types: List[str] = field(default_factory=list)
    # Key of IMPACT_TYPE_PATTERNS that matched; the extractor substitutes
    # 'direct' when no impact pattern matched.
    impact_type: Optional[str] = None
    # First hazard label from the signal dictionary with a matching pattern.
    hazard_type: Optional[str] = None
    # First exposure category from the signal dictionary with a matching pattern.
    asset_category: Optional[str] = None
    # Heuristic score set by the extractor: 0.7 when loss data was detected,
    # 0.85 when a hazard context was found as well, otherwise 0.0.
    confidence: float = 0.0

@dataclass
class VulnLossExtraction:
    """Vulnerability and loss results for a single record, bundled together."""
    vulnerability: VulnerabilityExtraction = field(default_factory=VulnerabilityExtraction)
    loss: LossExtraction = field(default_factory=LossExtraction)
    overall_confidence: float = 0.0

    def has_any_signal(self) -> bool:
        """Return True when any vulnerability or loss flag is set."""
        detected_flags = (
            self.vulnerability.has_vulnerability_functions,
            self.vulnerability.has_socioeconomic,
            self.loss.has_loss_data,
        )
        return any(detected_flags)

print("Data classes defined.")

In [None]:
"""
2.3 Vulnerability/Loss Extractor Class
"""

class VulnLossExtractor:
    """
    Detect RDLS Vulnerability and Loss signals in HDX dataset metadata.

    V/L data is specialised, so this extractor does not parse detailed
    parameters. It only:
    - detects whether V/L information appears to be present,
    - identifies which kind of V/L data is mentioned, and
    - links it to a hazard / exposure category when the signal dictionary
      supplies patterns for them.

    Args:
        signal_dict: Parsed signal dictionary. Only its optional
            ``hazard_type`` and ``exposure_category`` sections are read;
            each maps a label to a config holding a ``patterns`` list.
    """

    # Upper bound on the socio-economic phrases kept per record.
    MAX_SOCIOECONOMIC_HINTS = 5

    def __init__(self, signal_dict: Dict[str, Any]):
        self.signal_dict = signal_dict if signal_dict is not None else {}
        self._compile_patterns()

    @staticmethod
    def _compile_group(patterns: Any) -> List[Any]:
        """Compile an iterable of regex strings case-insensitively."""
        return [re.compile(p, re.IGNORECASE) for p in (patterns or [])]

    def _compile_signal_section(self, section_name: str) -> Dict[str, List[Any]]:
        """Compile the ``patterns`` lists of one signal-dictionary section.

        A missing or empty section, or an entry without ``patterns``, yields
        an empty result instead of raising.
        """
        compiled: Dict[str, List[Any]] = {}
        section = self.signal_dict.get(section_name) or {}
        for label, config in section.items():
            compiled[label] = self._compile_group((config or {}).get('patterns'))
        return compiled

    def _compile_patterns(self) -> None:
        """Compile every pattern table once so ``extract`` only searches."""
        self.vuln_func_patterns = {
            func_type: self._compile_group(patterns)
            for func_type, patterns in VULNERABILITY_FUNCTION_PATTERNS.items()
        }
        self.socioeconomic_patterns = self._compile_group(SOCIOECONOMIC_PATTERNS)
        self.loss_patterns = {
            loss_type: self._compile_group(patterns)
            for loss_type, patterns in LOSS_PATTERNS.items()
        }
        self.impact_patterns = {
            impact_type: self._compile_group(patterns)
            for impact_type, patterns in IMPACT_TYPE_PATTERNS.items()
        }

        # Hazard and exposure patterns come from the signal dictionary and
        # are used only to attach context to detected losses.
        self.hazard_patterns = self._compile_signal_section('hazard_type')
        self.exposure_patterns = self._compile_signal_section('exposure_category')

    @staticmethod
    def _matching_keys(text: str, pattern_map: Dict[str, List[Any]]) -> List[str]:
        """Return every key with at least one matching pattern.

        Keys come back in the mapping's own order, so results are
        reproducible from run to run (a ``set`` round-trip is not).
        """
        return [
            key for key, patterns in pattern_map.items()
            if any(p.search(text) for p in patterns)
        ]

    @staticmethod
    def _first_match(text: str, pattern_map: Dict[str, List[Any]]) -> Optional[str]:
        """Return the first key with a matching pattern, or None."""
        for key, patterns in pattern_map.items():
            if any(p.search(text) for p in patterns):
                return key
        return None

    @staticmethod
    def _tag_names(tags: Any) -> List[str]:
        """Normalise the ``tags`` field to a list of tag strings.

        Accepts a list of strings or a list of dicts carrying the tag text
        under ``name`` (``display_name`` as fallback); anything else is
        ignored.
        NOTE(review): the dict form is the CKAN convention - confirm it
        matches the layout of the metadata dump.
        """
        if isinstance(tags, str):
            return [tags]
        if not isinstance(tags, (list, tuple)):
            return []
        names = []
        for tag in tags:
            if isinstance(tag, dict):
                tag = tag.get('name') or tag.get('display_name')
            if isinstance(tag, str) and tag:
                names.append(tag)
        return names

    def _get_all_text(self, record: Dict[str, Any]) -> str:
        """Join the searchable text fields of a record into one lowercase string.

        Uses title, name, notes, tags, methodology_other and each resource's
        name and description. Missing, null or non-string values are skipped
        rather than raising.
        """
        parts: List[Any] = [
            record.get('title'),
            record.get('name'),
            record.get('notes'),
            ' '.join(self._tag_names(record.get('tags'))),
            record.get('methodology_other'),
        ]

        resources = record.get('resources')
        if isinstance(resources, (list, tuple)):
            for resource in resources:
                if isinstance(resource, dict):
                    parts.append(resource.get('name'))
                    parts.append(resource.get('description'))

        return ' '.join(p for p in parts if isinstance(p, str) and p).lower()

    def _extract_vulnerability(self, text: str) -> "VulnerabilityExtraction":
        """Detect vulnerability-function and socio-economic signals in ``text``."""
        function_types = self._matching_keys(text, self.vuln_func_patterns)

        # One hint per socio-economic pattern: the first phrase it matched.
        hints = []
        for pattern in self.socioeconomic_patterns:
            match = pattern.search(text)
            if match:
                hints.append(match.group(0))
        # Drop duplicates while keeping pattern order.
        hints = list(dict.fromkeys(hints))[:self.MAX_SOCIOECONOMIC_HINTS]

        has_functions = bool(function_types)
        has_socio = bool(hints)

        # Explicit function terminology is a stronger signal than the
        # broader socio-economic vocabulary.
        if has_functions:
            confidence = 0.8
        elif has_socio:
            confidence = 0.6
        else:
            confidence = 0.0

        return VulnerabilityExtraction(
            has_vulnerability_functions=has_functions,
            function_types=function_types,
            has_socioeconomic=has_socio,
            socioeconomic_hints=hints,
            confidence=confidence
        )

    def _extract_loss(self, text: str) -> "LossExtraction":
        """Detect loss signals in ``text`` plus impact/hazard/asset context."""
        loss_types = self._matching_keys(text, self.loss_patterns)
        impact_type = self._first_match(text, self.impact_patterns)
        hazard_type = self._first_match(text, self.hazard_patterns)
        asset_category = self._first_match(text, self.exposure_patterns)

        has_loss = bool(loss_types)
        confidence = 0.7 if has_loss else 0.0
        # A loss tied to a recognised hazard is more likely real loss data.
        if has_loss and hazard_type:
            confidence = 0.85

        return LossExtraction(
            has_loss_data=has_loss,
            loss_types=loss_types,
            # 'direct' is the fallback when no impact wording is found.
            impact_type=impact_type or 'direct',
            hazard_type=hazard_type,
            asset_category=asset_category,
            confidence=confidence
        )

    def extract(self, record: Dict[str, Any]) -> "VulnLossExtraction":
        """Extract V/L information from one HDX metadata record.

        Args:
            record: Dataset metadata dict (title, notes, tags, resources, ...).

        Returns:
            Combined result whose ``overall_confidence`` is the higher of
            the vulnerability and loss confidences.
        """
        text = self._get_all_text(record)

        vuln = self._extract_vulnerability(text)
        loss = self._extract_loss(text)

        return VulnLossExtraction(
            vulnerability=vuln,
            loss=loss,
            overall_confidence=max(vuln.confidence, loss.confidence)
        )

# Shared extractor instance used by the test and batch cells below.
vl_extractor = VulnLossExtractor(signal_dict=SIGNAL_DICT)
print("VulnLossExtractor initialized.")

## 3. RDLS Block Builders

In [None]:
"""
3.1 Build RDLS Vulnerability Block
"""

def build_vulnerability_block(
    extraction: VulnerabilityExtraction,
    dataset_id: str
) -> Optional[Dict[str, Any]]:
    """
    Assemble a minimal RDLS ``vulnerability`` block from detected signals.

    HDX metadata rarely carries the parameters a full vulnerability function
    needs, so each detected function type becomes an empty placeholder list
    and socio-economic hints are recorded as found.

    Args:
        extraction: Vulnerability signals detected for the dataset.
        dataset_id: Dataset identifier; its first 8 characters form the
            socio-economic entry id.

    Returns:
        The block, or None when there is nothing to report.
    """
    if not (extraction.has_vulnerability_functions or extraction.has_socioeconomic):
        return None

    # (detected function type, key under 'functions'); the tuple order fixes
    # the key order of the output.
    placeholder_keys = (
        ('vulnerability_curve', 'vulnerability'),
        ('fragility_curve', 'fragility'),
        ('damage_to_loss', 'damage_to_loss'),
    )

    functions: Dict[str, Any] = {}
    if extraction.has_vulnerability_functions:
        for detected_type, block_key in placeholder_keys:
            if detected_type in extraction.function_types:
                functions[block_key] = []

    block: Dict[str, Any] = {'functions': functions}

    if extraction.has_socioeconomic:
        socio_entry = {
            'id': f"socio_{dataset_id[:8]}",
            'indicators_detected': extraction.socioeconomic_hints
        }
        block['socio_economic'] = [socio_entry]

    # A block with no functions and no socio-economic entries says nothing.
    if functions or block.get('socio_economic'):
        return block
    return None

print("Vulnerability block builder defined.")

In [None]:
"""
3.2 Build RDLS Loss Block
"""

def build_loss_block(
    extraction: LossExtraction,
    dataset_id: str
) -> Optional[Dict[str, Any]]:
    """
    Assemble an RDLS ``loss`` block with one entry per detected loss type.

    Args:
        extraction: Loss signals detected for the dataset.
        dataset_id: Dataset identifier; its first 8 characters are embedded
            in each loss entry id.

    Returns:
        ``{'losses': [...]}``, or None when no loss data was detected.
    """
    if not extraction.has_loss_data:
        return None

    # Asset category used when the metadata named none; loss types absent
    # from this map get no 'asset_category' key at all.
    fallback_category = {
        'human_loss': 'population',
        'physical_damage': 'buildings',
    }

    losses = []
    for position, loss_type in enumerate(extraction.loss_types, start=1):
        entry: Dict[str, Any] = {'id': f"loss_{dataset_id[:8]}_{position}"}

        if extraction.hazard_type:
            entry['hazard_type'] = extraction.hazard_type

        category = extraction.asset_category or fallback_category.get(loss_type)
        if category:
            entry['asset_category'] = category

        # NOTE(review): these dimension labels are hard-coded; confirm they
        # match the RDLS asset_dimension codelist.
        entry['asset_dimension'] = 'Other' if loss_type == 'human_loss' else 'Structure'

        entry['impact_and_losses'] = {
            'impact_type': extraction.impact_type or 'direct',
            'loss_type_detected': loss_type
        }

        losses.append(entry)

    if not losses:
        return None
    return {'losses': losses}

print("Loss block builder defined.")

## 4. Test Extraction

In [None]:
"""
4.1 Find and Load V/L Candidate Records
"""

# Filename keywords that suggest vulnerability/loss content.
vl_keywords = ['vulnerability', 'loss', 'damage', 'impact', 'casualty', 'fatality']

# Up to five files per keyword. Glob results are sorted because their native
# order depends on the filesystem, which would make the sample differ
# between runs and machines.
vl_files = []
for kw in vl_keywords:
    vl_files.extend(sorted(DATASET_METADATA_DIR.glob(f'*{kw}*.json'))[:5])

# Also check for risk assessment files
vl_files.extend(sorted(DATASET_METADATA_DIR.glob('*risk*.json'))[:10])

# Deduplicate while keeping first-seen order (a set would shuffle the list
# and change which 30 files are loaded below).
vl_files = list(dict.fromkeys(vl_files))

sample_records = []
for filepath in vl_files[:30]:
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            loaded = json.load(f)
    except (OSError, ValueError) as exc:
        # ValueError covers malformed JSON and undecodable bytes. Report the
        # file instead of dropping it silently, then keep going.
        print(f"  Skipped {filepath.name}: {exc}")
        continue

    # The extractor expects one metadata dict per file.
    if isinstance(loaded, dict):
        sample_records.append(loaded)
    else:
        print(f"  Skipped {filepath.name}: top-level JSON value is not an object")

print(f"Loaded {len(sample_records)} potential V/L records.")

In [None]:
"""
4.2 Run Extraction
"""

print("=" * 80)
print("VULNERABILITY/LOSS EXTRACTION RESULTS")
print("=" * 80)

# U+2500 (box-drawing horizontal line), written as an escape so the rule
# cannot be corrupted by an encoding round-trip of this file.
record_separator = '\u2500' * 80

vl_found = 0

for record in sample_records:
    extraction = vl_extractor.extract(record)

    # Only records with at least one V/L signal are reported.
    if not extraction.has_any_signal():
        continue

    vl_found += 1
    print(f"\n{record_separator}")
    # 'title' can be missing or null in the metadata.
    title = record.get('title') or ''
    print(f"Title: {title[:70]}")
    print(f"Overall Confidence: {extraction.overall_confidence:.2f}")

    if extraction.vulnerability.has_vulnerability_functions:
        print(f"Vulnerability Functions: {extraction.vulnerability.function_types}")

    if extraction.vulnerability.has_socioeconomic:
        print(f"Socioeconomic: {extraction.vulnerability.socioeconomic_hints}")

    if extraction.loss.has_loss_data:
        print(f"Loss Types: {extraction.loss.loss_types}")
        print(f"  Hazard context: {extraction.loss.hazard_type}")
        print(f"  Asset category: {extraction.loss.asset_category}")

print(f"\n\nSummary: {vl_found}/{len(sample_records)} records have V/L signals")

## 5. Batch Processing

In [None]:
"""
5.1 Process Full Corpus
"""

def process_vl_extraction(
    metadata_dir: Path,
    extractor: VulnLossExtractor,
    limit: Optional[int] = None
) -> pd.DataFrame:
    """
    Run V/L extraction over the JSON metadata files in a directory.

    Args:
        metadata_dir: Directory containing one ``*.json`` file per dataset.
        extractor: Extractor whose ``extract(record)`` is applied to each file.
        limit: Maximum number of files to process; a falsy value processes all.

    Returns:
        One row per file, always with the same columns. A file that cannot be
        read or processed still gets a row: its ``error`` column holds the
        message, its flags are False and its ``id`` is the file stem. That
        keeps the flag columns free of NaN, so they can be used directly as
        boolean masks.
    """
    columns = [
        'id', 'title', 'organization', 'has_vulnerability', 'has_socioeconomic',
        'vuln_function_types', 'has_loss', 'loss_types', 'loss_hazard',
        'loss_asset', 'overall_confidence', 'has_any_vl', 'extraction', 'error',
    ]

    # Sorted so that a limited run always covers the same files.
    json_files = sorted(metadata_dir.glob('*.json'))
    if limit:
        json_files = json_files[:limit]

    results = []
    iterator = tqdm(json_files, desc="Extracting V/L") if HAS_TQDM else json_files

    for filepath in iterator:
        # Start from the "nothing detected" row; replaced only on success.
        row = {
            'id': filepath.stem,
            'title': None,
            'organization': None,
            'has_vulnerability': False,
            'has_socioeconomic': False,
            'vuln_function_types': [],
            'has_loss': False,
            'loss_types': [],
            'loss_hazard': None,
            'loss_asset': None,
            'overall_confidence': 0.0,
            'has_any_vl': False,
            'extraction': None,
            'error': None,
        }

        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                record = json.load(f)

            extraction = extractor.extract(record)

            row.update({
                'id': record.get('id'),
                'title': record.get('title'),
                'organization': record.get('organization'),
                'has_vulnerability': extraction.vulnerability.has_vulnerability_functions,
                'has_socioeconomic': extraction.vulnerability.has_socioeconomic,
                'vuln_function_types': extraction.vulnerability.function_types,
                'has_loss': extraction.loss.has_loss_data,
                'loss_types': extraction.loss.loss_types,
                'loss_hazard': extraction.loss.hazard_type,
                'loss_asset': extraction.loss.asset_category,
                'overall_confidence': extraction.overall_confidence,
                'has_any_vl': extraction.has_any_signal(),
                'extraction': extraction
            })
        except Exception as e:
            # Per-file boundary: one bad file must not abort the batch, so
            # the failure is recorded on the row rather than raised.
            row['error'] = str(e)

        results.append(row)

    # Explicit columns keep the schema stable even when no files were found.
    return pd.DataFrame(results, columns=columns)

# Number of metadata files to scan. V/L signals are rare, so a larger sample
# is used; a falsy value (None or 0) scans every file.
PROCESS_LIMIT = 2000

_scope_label = PROCESS_LIMIT if PROCESS_LIMIT else 'all'
print(f"Processing {_scope_label} records...")
df_vl = process_vl_extraction(
    metadata_dir=DATASET_METADATA_DIR,
    extractor=vl_extractor,
    limit=PROCESS_LIMIT,
)

In [None]:
"""
5.2 Statistics
"""

print("=" * 60)
print("VULNERABILITY/LOSS EXTRACTION STATISTICS")
print("=" * 60)

total = len(df_vl)


def _flag_stats(column):
    """Format 'count (percent%)' for one flag column of ``df_vl``.

    Rows where the flag is missing (files that failed to load leave NaN
    there) count as False. A missing column or an empty frame reports zero
    instead of raising.
    """
    if total == 0 or column not in df_vl.columns:
        return "0 (0.00%)"
    flags = df_vl[column].eq(True)
    return f"{flags.sum()} ({flags.mean()*100:.2f}%)"


print(f"\nTotal records: {total:,}")

# The 'error' column exists only when some file could not be processed.
if 'error' in df_vl.columns:
    failed = int(df_vl['error'].notna().sum())
    if failed:
        print(f"  Failed to process: {failed:,}")

print("\nVulnerability:")
print(f"  With vulnerability functions: {_flag_stats('has_vulnerability')}")
print(f"  With socioeconomic indicators: {_flag_stats('has_socioeconomic')}")

print("\nLoss:")
print(f"  With loss data: {_flag_stats('has_loss')}")

print("\nCombined:")
print(f"  Any V/L signal: {_flag_stats('has_any_vl')}")

# Loss type distribution
loss_types = Counter()
if 'loss_types' in df_vl.columns:
    for types in df_vl['loss_types'].dropna():
        # Only list cells carry loss types; skip anything else.
        if isinstance(types, list):
            loss_types.update(types)

print("\nLoss Type Distribution:")
for lt, count in loss_types.most_common():
    print(f"  {lt}: {count}")

## 6. Export Results

In [None]:
"""
6.1 Export Results
"""

# Columns written to CSV; the in-memory 'extraction' objects are left out.
export_columns = [
    'id', 'title', 'organization', 'has_vulnerability', 'has_socioeconomic',
    'vuln_function_types', 'has_loss', 'loss_types', 'loss_hazard',
    'loss_asset', 'overall_confidence', 'has_any_vl'
]

# reindex (rather than [[...]]) so a frame that lacks a column - for example
# when every file failed to load - still exports instead of raising KeyError.
export_df = df_vl.reindex(columns=export_columns).copy()

# Flatten list columns to pipe-separated strings for CSV.
for col in ['vuln_function_types', 'loss_types']:
    export_df[col] = export_df[col].apply(
        lambda x: '|'.join(x) if isinstance(x, list) else ''
    )

output_file = OUTPUT_DIR / 'vuln_loss_extraction_results.csv'
export_df.to_csv(output_file, index=False)
print(f"Saved: {output_file}")

# Records with V/L signals. eq(True) treats a missing flag (NaN on rows for
# failed files) as False; indexing with a NaN-containing mask would raise.
vl_records = export_df[export_df['has_any_vl'].eq(True)]
vl_file = OUTPUT_DIR / 'vuln_loss_detected_records.csv'
vl_records.to_csv(vl_file, index=False)
print(f"Saved: {vl_file} ({len(vl_records)} records)")

In [None]:
"""
6.2 Generate Sample RDLS Records
"""

# Up to ten records with the highest-confidence V/L signals. eq(True) treats
# a missing flag (rows for files that failed to load) as False.
confident_mask = df_vl['has_any_vl'].eq(True) & (df_vl['overall_confidence'] >= 0.6)
top_vl = df_vl[confident_mask].nlargest(10, 'overall_confidence')

print(f"\nGenerating {len(top_vl)} sample RDLS V/L records...")

for _, row in top_vl.iterrows():
    extraction = row['extraction']
    dataset_id = row['id']

    # The output id and filename are built from the dataset id; a record
    # without a string id cannot be named, so it is skipped.
    if not isinstance(dataset_id, str) or not dataset_id:
        print(f"  Skipped (no dataset id): {row['title']}")
        continue

    short_id = dataset_id[:8]

    rdls_dataset = {
        'id': f"rdls_vln-hdx_{short_id}",
        'title': row['title'],
        'risk_data_type': [],
        'links': [{
            'href': 'https://docs.riskdatalibrary.org/en/0__3__0/rdls_schema.json',
            'rel': 'describedby'
        }]
    }

    # Each builder returns None when its component was not detected, so no
    # extra guard is needed before calling it.
    vuln_block = build_vulnerability_block(extraction.vulnerability, dataset_id)
    if vuln_block:
        rdls_dataset['vulnerability'] = vuln_block
        rdls_dataset['risk_data_type'].append('vulnerability')

    loss_block = build_loss_block(extraction.loss, dataset_id)
    if loss_block:
        rdls_dataset['loss'] = loss_block
        rdls_dataset['risk_data_type'].append('loss')

    # Write a file only when at least one component block was built.
    if rdls_dataset['risk_data_type']:
        rdls_record = {'datasets': [rdls_dataset]}

        output_path = OUTPUT_DIR / f"rdls_vln-hdx_{short_id}.json"
        with open(output_path, 'w', encoding='utf-8') as f:
            # The file is UTF-8, so non-ASCII titles stay readable rather
            # than being written as \uXXXX escapes.
            json.dump(rdls_record, f, indent=2, ensure_ascii=False)

        print(f"  Created: {output_path.name}")

print("\nDone.")

In [None]:
# Print the completion timestamp of this run.
_finished_at = datetime.now().isoformat()
print(f"\nNotebook completed: {_finished_at}")