# Municipal Mergers Analysis (Gemeindefusionen)

This notebook analyzes historical changes in Swiss municipalities (Gemeinden) and creates a mapping system to align historical voting data with the current municipal structure. This allows voting patterns to be compared across time despite structural changes such as mergers.

In [None]:
import pandas as pd
import json
from pathlib import Path
from datetime import datetime
import numpy as np
from collections import defaultdict
import warnings
warnings.filterwarnings('ignore')

## 1. Load Municipal Changes Data

In [None]:
# Workbook that lists the municipal changes.
xlsx_path = Path('data/Mutierte_Gemeinden.xlsx')

# Open the workbook handle first so the available sheet names can be listed.
excel_file = pd.ExcelFile(xlsx_path)
print("Excel file sheets:", excel_file.sheet_names)

# Only the first sheet (index 0) is loaded into the working DataFrame.
mutations_df = pd.read_excel(xlsx_path, sheet_name=0)
print(
    f"\nShape: {mutations_df.shape}",
    f"\nColumns: {list(mutations_df.columns)}",
    "\nFirst 5 rows:",
    sep="\n",
)
mutations_df.head()

In [None]:
# Examine data types and null values
print("Data types and null values:")
# DataFrame.info() writes its report to stdout itself and returns None, so
# wrapping it in print() would append a stray "None" line to the output.
mutations_df.info()
print("\nSample data:")
# Sample at most 10 rows; min() keeps sample() valid for frames with < 10 rows.
mutations_df.sample(min(10, len(mutations_df)))

# Extract all municipalities from recent voting
def extract_gemeinden(voting_data):
    """Extract all municipalities (Gemeinden) from voting data.

    Looks under ``voting_data['schweiz']['vorlagen']`` and uses the first
    vorlage that carries a ``'gemeinden'`` list; later vorlagen are ignored.

    For each municipality the district and canton names are resolved from the
    municipality's ``geoLevelParentnummer``:

    * the district name comes from the vorlage's flat ``'bezirke'`` list;
    * if a canton in the vorlage's ``'kantone'`` list nests a district with
      that number, the canton name is taken from that canton and the district
      name from the nested entry (which takes precedence over the flat list).

    When several entries share a district number, the first one encountered
    wins.

    Args:
        voting_data: Parsed voting JSON (a dict).

    Returns:
        Dict keyed by ``geoLevelnummer`` (``'Unknown'`` when absent). Each
        value has the keys ``'name'``, ``'kanton'``, ``'bezirk'``,
        ``'parent_id'`` and ``'data'`` (the raw municipality entry). Empty
        when the expected structure is missing.
    """
    gemeinden = {}

    if 'schweiz' not in voting_data or 'vorlagen' not in voting_data['schweiz']:
        return gemeinden

    for vorlage in voting_data['schweiz']['vorlagen']:
        if 'gemeinden' not in vorlage:
            continue

        # Index the districts once per vorlage instead of rescanning the
        # district and canton lists for every single municipality.
        # `or []` also tolerates a key that is present but null.
        bezirk_names = {}
        for bezirk in vorlage.get('bezirke') or []:
            bezirk_names.setdefault(
                bezirk.get('geoLevelnummer'),
                bezirk.get('geoLevelname', 'Unknown'),
            )

        # district number -> (canton name, district name as nested in canton)
        kanton_by_bezirk = {}
        for kanton in vorlage.get('kantone') or []:
            kanton_name = kanton.get('geoLevelname', 'Unknown')
            for bezirk in kanton.get('bezirke') or []:
                kanton_by_bezirk.setdefault(
                    bezirk.get('geoLevelnummer'),
                    (kanton_name, bezirk.get('geoLevelname', 'Unknown')),
                )

        for gemeinde in vorlage['gemeinden']:
            gemeinde_id = gemeinde.get('geoLevelnummer', 'Unknown')
            gemeinde_name = gemeinde.get('geoLevelname', 'Unknown')
            parent_id = gemeinde.get('geoLevelParentnummer', 'Unknown')

            bezirk_name = bezirk_names.get(parent_id, 'Unknown')
            kanton_name = 'Unknown'
            if parent_id in kanton_by_bezirk:
                kanton_name, bezirk_name = kanton_by_bezirk[parent_id]

            gemeinden[gemeinde_id] = {
                'name': gemeinde_name,
                'kanton': kanton_name,
                'bezirk': bezirk_name,
                'parent_id': parent_id,
                'data': gemeinde
            }

        # Only the first vorlage with municipalities is processed.
        break

    return gemeinden

# Extract municipalities from recent voting
# NOTE(review): this cell reads `recent_file` and `recent_voting`, which are
# assigned by the "Load a recent voting file" cell that appears further down
# in this notebook - run that cell first.
if recent_file:
    current_gemeinden = extract_gemeinden(recent_voting)
    print(f"Found {len(current_gemeinden)} municipalities in recent voting")

    if not current_gemeinden:
        # "\n" (not "\\n"): emit a blank line rather than a literal backslash-n.
        print("\nTrying alternative structure...")
        # Alternative: gemeinden might be at top level
        if 'gemeinden' in recent_voting:
            for gemeinde in recent_voting['gemeinden']:
                gemeinde_id = gemeinde.get('geoLevelnummer', 'Unknown')
                gemeinde_name = gemeinde.get('geoLevelname', 'Unknown')
                # Canton/district are not resolved for flat entries.
                current_gemeinden[gemeinde_id] = {
                    'name': gemeinde_name,
                    'kanton': 'Unknown',
                    'bezirk': 'Unknown',
                    'data': gemeinde
                }
            print(f"Found {len(current_gemeinden)} municipalities at top level")

    # Show sample (first five entries); 'parent_id' is absent on fallback entries
    for gid, gdata in list(current_gemeinden.items())[:5]:
        print(f"  {gid}: {gdata['name']} (Parent: {gdata.get('parent_id', 'N/A')})")

In [None]:
# Collect the voting result files; sorted() orders them by path.
votes_dir = Path('data/votes')
voting_files = sorted(votes_dir.glob('*.json'))

# The last file in sorted order is treated as the most recent vote.
recent_file = voting_files[-1] if voting_files else None
print("Loading recent voting file:", recent_file.name if recent_file else 'None')

if recent_file:
    with open(recent_file, 'r', encoding='utf-8') as f:
        recent_voting = json.load(f)

    # Echo a couple of top-level fields as a quick sanity check.
    print("Voting date:", recent_voting.get('abstimmtag', 'N/A'))
    print("Spatial references:", recent_voting.get('spatial_reference', []))

In [None]:
# Extract all municipalities from recent voting
def extract_gemeinden(voting_data):
    """Collect every municipality nested under the cantons of ``voting_data``.

    Walks ``voting_data['kantone']`` -> ``'bezirke'`` -> ``'gemeinden'``.
    Any level whose key is missing is skipped.

    Returns a dict keyed by each municipality's ``geoLevelnummer``
    (``'Unknown'`` when absent); every value holds ``'name'``, ``'kanton'``,
    ``'bezirk'`` and ``'data'`` (the raw municipality entry).
    """
    found = {}

    if 'kantone' not in voting_data:
        return found

    for canton in voting_data['kantone']:
        canton_label = canton.get('geoLevelname', 'Unknown')
        if 'bezirke' not in canton:
            continue

        for district in canton['bezirke']:
            district_label = district.get('geoLevelname', 'Unknown')
            if 'gemeinden' not in district:
                continue

            for entry in district['gemeinden']:
                key = entry.get('geoLevelnummer', 'Unknown')
                found[key] = dict(
                    name=entry.get('geoLevelname', 'Unknown'),
                    kanton=canton_label,
                    bezirk=district_label,
                    data=entry,
                )

    return found

# Build the lookup of municipalities present in the most recent vote.
if recent_file:
    current_gemeinden = extract_gemeinden(recent_voting)
    print("Found", len(current_gemeinden), "municipalities in recent voting")

    # Preview the first five entries with their canton and district.
    for gid, gdata in list(current_gemeinden.items())[:5]:
        print("  {}: {} ({} / {})".format(
            gid, gdata['name'], gdata['kanton'], gdata['bezirk']
        ))

## 3. Analyze Municipal Changes Pattern

In [None]:
# Survey the low-cardinality columns of the mutations table.
# Which columns show up depends on the actual Excel layout.
print("Unique values analysis:")

for col in mutations_df.columns:
    unique_count = mutations_df[col].nunique()
    if unique_count >= 20:
        # Too many distinct values to be worth listing.
        continue
    print(
        f"\n{col}: {unique_count} unique values",
        f"  Sample: {mutations_df[col].dropna().unique()[:10]}",
        sep="\n",
    )

In [None]:
# Try to identify date columns and merger patterns by matching substrings
# of the (lower-cased) column labels.
date_columns = []
id_columns = []
name_columns = []

for col in mutations_df.columns:
    # Column labels read from a spreadsheet may not be strings (e.g. numeric
    # headers), so normalise through str() before lower-casing.
    col_lower = str(col).lower()

    # Identify different column types; the first matching category wins.
    if 'datum' in col_lower or 'date' in col_lower or 'jahr' in col_lower:
        date_columns.append(col)
    elif 'id' in col_lower or 'nummer' in col_lower or 'bfs' in col_lower:
        id_columns.append(col)
    elif 'name' in col_lower or 'gemeinde' in col_lower:
        name_columns.append(col)

print(f"Date columns: {date_columns}")
print(f"ID columns: {id_columns}")
print(f"Name columns: {name_columns}")

# Display sample of these columns
relevant_cols = date_columns + id_columns + name_columns
if relevant_cols:
    print("\nSample of relevant columns:")
    # A bare expression nested inside an `if` block is not auto-displayed by
    # the notebook, so the sample has to be printed explicitly.
    print(mutations_df[relevant_cols].head(10))

## 4. Build Municipal Mapping System

In [None]:
# Create a mapping system for municipal changes
# This structure maps old municipality IDs to current ones

class MunicipalMapper:
    """Track municipal merger events and map historical ids to later ones.

    Attributes are plain containers filled through :meth:`add_merger`:
    ``mappings`` is ``{date: {old_id: new_id}}``, ``mergers`` is the list of
    recorded merger events, and ``current_municipalities`` is a set reserved
    for the currently valid ids (nothing in this class populates it).

    Dates are only ever compared with ``>`` and sorted, so any mutually
    comparable values work as long as they are used consistently.
    """

    def __init__(self):
        self.mappings = {}  # {date: {old_id: new_id}}
        self.mergers = []   # List of merger events
        self.current_municipalities = set()  # Current valid municipality IDs

    def add_merger(self, date, old_ids, new_id, description=""):
        """Record a municipal merger event.

        Args:
            date: Effective date of the merger.
            old_ids: Iterable of municipality ids that were merged.
            new_id: Id of the resulting municipality.
            description: Optional free-text note stored with the event.
        """
        # Snapshot the ids so that later mutation of the caller's list (or a
        # one-shot iterator) cannot corrupt the recorded event.
        old_ids = list(old_ids)

        self.mergers.append({
            'date': date,
            'old_ids': old_ids,
            'new_id': new_id,
            'description': description
        })

        # Update mappings
        date_mapping = self.mappings.setdefault(date, {})
        for old_id in old_ids:
            date_mapping[old_id] = new_id

    def map_to_current(self, municipality_id, reference_date):
        """Map a historical municipality to current structure.

        Follows, in chronological order, every recorded mapping whose date is
        strictly later than ``reference_date``; chains such as A -> B -> C are
        resolved. Ids without an applicable mapping are returned unchanged.
        """
        current_id = municipality_id

        for date in sorted(self.mappings):
            if date > reference_date and current_id in self.mappings[date]:
                current_id = self.mappings[date][current_id]

        return current_id

    def get_mapping_for_date(self, reference_date):
        """Get the mapping of merged ids for a specific date.

        Returns ``{old_id: current_id}`` for every id that appears as a merger
        source in an event dated strictly after ``reference_date``, resolved
        through :meth:`map_to_current`. Ids never touched by such a merger are
        not listed (they map to themselves). Returns an empty dict when no
        merger applies.
        """
        mapping = {}

        for date in sorted(self.mappings):
            if date > reference_date:
                for old_id in self.mappings[date]:
                    if old_id not in mapping:
                        mapping[old_id] = self.map_to_current(old_id, reference_date)

        return mapping

mapper = MunicipalMapper()
print("Municipal mapper initialized")

In [None]:
# Create a comprehensive mapping table
def create_mapping_table(mutations_df, voting_files, max_files=5):
    """Create a mapping table of the municipalities found in each voting file.

    Args:
        mutations_df: Municipal mutations table. Currently unused; kept so the
            call signature stays stable.
        voting_files: Sequence of paths to voting JSON files.
        max_files: How many leading entries of ``voting_files`` to process.
            Defaults to 5 (the original testing limit); pass ``None`` to
            process every file.

    Returns:
        DataFrame with one row per (file, municipality) and the columns
        ``date``, ``municipality_id``, ``municipality_name``, ``canton``,
        ``district`` and ``parent_id``. Empty (no columns) when nothing could
        be read. Files that fail to load or parse are reported via ``print``
        and skipped.
    """
    mapping_records = []

    selected_files = voting_files if max_files is None else voting_files[:max_files]

    # Process each voting file to extract municipality lists
    for voting_file in selected_files:
        try:
            with open(voting_file, 'r', encoding='utf-8') as f:
                voting_data = json.load(f)

            date = voting_data.get('abstimmtag', '')
            gemeinden = extract_gemeinden(voting_data)

            # If the extractor found nothing, fall back to a flat top-level list
            if not gemeinden and 'gemeinden' in voting_data:
                for gemeinde in voting_data['gemeinden']:
                    gemeinde_id = gemeinde.get('geoLevelnummer', 'Unknown')
                    gemeinde_name = gemeinde.get('geoLevelname', 'Unknown')
                    gemeinden[gemeinde_id] = {
                        'name': gemeinde_name,
                        'kanton': 'Unknown',
                        'bezirk': 'Unknown',
                        'data': gemeinde
                    }

            for gid, gdata in gemeinden.items():
                mapping_records.append({
                    'date': date,
                    'municipality_id': gid,
                    'municipality_name': gdata['name'],
                    'canton': gdata.get('kanton', 'Unknown'),
                    'district': gdata.get('bezirk', 'Unknown'),
                    'parent_id': gdata.get('parent_id', 'Unknown')
                })

        except Exception as e:
            # Best-effort: report and continue with the remaining files.
            # getattr() keeps the handler itself from failing when a plain
            # string path (no .name attribute) was supplied.
            print(f"Error processing {getattr(voting_file, 'name', voting_file)}: {e}")

    # Create DataFrame
    mapping_df = pd.DataFrame(mapping_records)

    return mapping_df

# Create the mapping table
print("Creating mapping table (this may take a moment)...")
mapping_df = create_mapping_table(mutations_df, voting_files)

if not mapping_df.empty:
    # "\n" (not "\\n"): emit a blank line rather than a literal backslash-n.
    print("\nMapping table created:")
    print(f"  Shape: {mapping_df.shape}")
    print(f"  Date range: {mapping_df['date'].min()} to {mapping_df['date'].max()}")
    print(f"  Unique municipalities: {mapping_df['municipality_id'].nunique()}")

    # Show sample
    print("\nSample entries:")
    # A bare expression inside an `if` block is not auto-displayed by the
    # notebook, so print the preview explicitly.
    print(mapping_df.head())

## 5. Load Historical Voting Data for Validation

In [None]:
# The first file in sorted order serves as the historical baseline.
oldest_file = voting_files[0] if voting_files else None

if oldest_file:
    print("Loading oldest voting file:", oldest_file.name)

    with open(oldest_file, 'r', encoding='utf-8') as f:
        oldest_voting = json.load(f)

    oldest_date = oldest_voting.get('abstimmtag', 'N/A')
    print("Oldest voting date:", oldest_date)

    # Municipalities as they existed in the oldest vote.
    old_gemeinden = extract_gemeinden(oldest_voting)
    print("Found", len(old_gemeinden), "municipalities in oldest voting")

    # Only compare when an earlier cell has produced current_gemeinden.
    if 'current_gemeinden' in locals():
        old_ids = set(old_gemeinden)
        current_ids = set(current_gemeinden)

        disappeared = old_ids.difference(current_ids)
        appeared = current_ids.difference(old_ids)

        print(f"\nChanges from {oldest_date} to {recent_voting.get('abstimmtag', 'N/A')}:")
        print("  Disappeared municipalities:", len(disappeared))
        print("  New municipalities:", len(appeared))
        print("  Unchanged:", len(old_ids.intersection(current_ids)))

## 6. Create Comprehensive Mapping Table

In [None]:
def validate_mapping(voting_file_path, mapping):
    """Report how many municipalities of one voting file the mapping covers.

    Loads the JSON file, extracts its municipalities (falling back to a flat
    top-level ``'gemeinden'`` list when the extractor finds none) and checks
    each id against ``mapping[<abstimmtag of the file>]``. An id counts as
    mapped when it is present there with a value other than ``None``.

    Returns a dict with ``'date'``, ``'file'``, ``'total_municipalities'``,
    ``'mapped'``, ``'unmapped'`` and ``'unmapped_list'`` (entries with
    ``'id'``, ``'name'`` and ``'canton'``).
    """
    with open(voting_file_path, 'r', encoding='utf-8') as handle:
        payload = json.load(handle)

    vote_date = payload.get('abstimmtag', '')
    municipalities = extract_gemeinden(payload)

    # Flat top-level list is only consulted when nothing was extracted.
    if len(municipalities) == 0 and 'gemeinden' in payload:
        for entry in payload['gemeinden']:
            entry_id = entry.get('geoLevelnummer', 'Unknown')
            municipalities[entry_id] = {
                'name': entry.get('geoLevelname', 'Unknown'),
                'kanton': 'Unknown',
                'bezirk': 'Unknown',
                'data': entry
            }

    report = {
        'date': vote_date,
        'file': voting_file_path.name,
        'total_municipalities': len(municipalities),
        'mapped': 0,
        'unmapped': 0,
        'unmapped_list': []
    }

    lookup = mapping.get(vote_date, {})

    # Missing ids and ids mapped to None both count as unmapped.
    report['unmapped_list'].extend(
        {
            'id': gid,
            'name': info['name'],
            'canton': info.get('kanton', 'Unknown')
        }
        for gid, info in municipalities.items()
        if lookup.get(gid) is None
    )
    report['unmapped'] = len(report['unmapped_list'])
    report['mapped'] = report['total_municipalities'] - report['unmapped']

    return report

# Validate a few files
# NOTE(review): `historical_mapping` is assigned by the "Build the mapping"
# cell that appears further down in this notebook - run that cell first.
# "\n" (not "\\n"): emit a blank line rather than a literal backslash-n.
print("Validating mapping for sample files...\n")

for voting_file in voting_files[:3]:  # Validate first 3 files
    results = validate_mapping(voting_file, historical_mapping)

    print(f"File: {results['file']}")
    print(f"  Date: {results['date']}")
    print(f"  Total municipalities: {results['total_municipalities']}")

    # Guard the percentage against files without any municipalities.
    if results['total_municipalities'] > 0:
        print(f"  Mapped: {results['mapped']} ({results['mapped']/results['total_municipalities']*100:.1f}%)")
        print(f"  Unmapped: {results['unmapped']}")

        if results['unmapped_list']:
            print(f"  First 5 unmapped:")
            for unmapped in results['unmapped_list'][:5]:
                print(f"    - {unmapped['id']}: {unmapped['name']} ({unmapped['canton']})")
    else:
        print("  No municipalities found in this file")
    print()

## 7. Build Historical-to-Current Mapping

In [None]:
# Build the actual mapping from historical to current municipalities
def build_historical_mapping(mapping_df):
    """Derive, per date, which municipality ids still exist at the latest date.

    ``mapping_df`` needs the columns ``'date'`` and ``'municipality_id'``.
    The ids present at the maximum ``'date'`` are treated as current.

    Returns ``{date: {municipality_id: target}}`` where ``target`` is the id
    itself when it is still current, or ``None`` when it is not (merger
    information would be needed to resolve it). An empty frame gives ``{}``.
    """
    if mapping_df.empty:
        return {}

    dates = mapping_df['date']
    ids = mapping_df['municipality_id']

    # Ids observed on the latest date define the current structure.
    present_ids = set(ids[dates == dates.max()].unique())

    result = {}
    for day in dates.unique():
        ids_on_day = set(ids[dates == day].unique())
        # Identity mapping for surviving ids; None marks ids that vanished.
        result[day] = {
            mid: (mid if mid in present_ids else None)
            for mid in ids_on_day
        }

    return result

# Derive the per-date mapping from the mapping table.
historical_mapping = build_historical_mapping(mapping_df)

print("Built historical mapping for", len(historical_mapping), "dates")

# For the first three dates, report how many ids still lack a target.
for date, mapping in list(historical_mapping.items())[:3]:
    unmapped = len([v for v in mapping.values() if v is None])
    total = len(mapping)
    print("  {}: {}/{} municipalities need mapping".format(date, unmapped, total))

def aggregate_voting_results(voting_data, mapping):
    """Aggregate historical voting results to current municipal structure.

    Args:
        voting_data: Parsed voting JSON (a dict).
        mapping: Dict with a ``'mappings'`` entry of the form
            ``{date: {old_id: current_id}}``; the sub-mapping for the vote's
            ``'abstimmtag'`` is applied.

    Returns:
        Dict keyed by current municipality id. Each value holds the summed
        ``'ja_stimmen'``, ``'nein_stimmen'``, ``'stimmberechtigte'`` and
        ``'gueltige_stimmen'``, the contributing ``'source_municipalities'``
        and the derived ``'ja_prozent'`` and ``'stimmbeteiligung'``
        percentages (0 when the denominator is 0).

    Ids missing from the date's mapping are kept under their own id; ids
    mapped to ``None`` (or another falsy target) are left out of the result.
    """
    date = voting_data.get('abstimmtag', '')
    date_mapping = mapping.get('mappings', {}).get(date, {})

    # Aggregate results
    aggregated = defaultdict(lambda: {
        'ja_stimmen': 0,
        'nein_stimmen': 0,
        'stimmberechtigte': 0,
        'gueltige_stimmen': 0,
        'source_municipalities': []
    })

    gemeinden = extract_gemeinden(voting_data)

    # If the extractor found nothing, fall back to a flat top-level list
    if not gemeinden and 'gemeinden' in voting_data:
        for gemeinde in voting_data['gemeinden']:
            gemeinde_id = gemeinde.get('geoLevelnummer', 'Unknown')
            gemeinde_name = gemeinde.get('geoLevelname', 'Unknown')
            gemeinden[gemeinde_id] = {
                'name': gemeinde_name,
                'data': gemeinde
            }

    # (aggregate key, key in the municipality's 'resultat' entry)
    count_fields = (
        ('ja_stimmen', 'jaStimmenAbsolut'),
        ('nein_stimmen', 'neinStimmenAbsolut'),
        ('stimmberechtigte', 'anzahlStimmberechtigte'),
        ('gueltige_stimmen', 'gueltigeStimmen'),
    )

    for old_id, gemeinde_data in gemeinden.items():
        # Get current municipality ID; default to same ID if no mapping
        current_id = date_mapping.get(old_id, old_id)
        if not current_id:
            continue

        # `or {}` / `or 0`: a key that is present but null yields None from
        # .get(), which would otherwise break .get() chaining and `+=`.
        results = (gemeinde_data.get('data') or {}).get('resultat') or {}

        target = aggregated[current_id]
        for total_key, source_key in count_fields:
            target[total_key] += results.get(source_key) or 0
        target['source_municipalities'].append({
            'id': old_id,
            'name': gemeinde_data.get('name', 'Unknown')
        })

    # Calculate percentages
    for data in aggregated.values():
        if data['gueltige_stimmen'] > 0:
            data['ja_prozent'] = (data['ja_stimmen'] / data['gueltige_stimmen']) * 100
        else:
            data['ja_prozent'] = 0

        if data['stimmberechtigte'] > 0:
            data['stimmbeteiligung'] = (data['gueltige_stimmen'] / data['stimmberechtigte']) * 100
        else:
            data['stimmbeteiligung'] = 0

    return dict(aggregated)

print("Aggregation function ready for use")
# "\n" (not "\\n"): emit a blank line rather than a literal backslash-n.
print("\nExample usage:")
print("  aggregated = aggregate_voting_results(voting_data, final_mapping)")
print("\nThis function will:")
print("  - Map historical municipalities to current ones")
print("  - Aggregate voting results for merged municipalities")
print("  - Calculate percentages for the aggregated data")

In [None]:
def validate_mapping(voting_file_path, mapping):
    """Validate that all municipalities in a voting file can be mapped.

    Args:
        voting_file_path: ``Path`` of a voting JSON file (its ``.name`` is
            reported in the result).
        mapping: ``{date: {municipality_id: target_id_or_None}}``.

    Returns:
        Dict with ``'date'``, ``'file'``, ``'total_municipalities'``,
        ``'mapped'``, ``'unmapped'`` and ``'unmapped_list'`` (entries with
        ``'id'``, ``'name'`` and ``'canton'``). A municipality counts as
        mapped when its id is present in the mapping for the file's
        ``'abstimmtag'`` with a value other than ``None``.
    """
    with open(voting_file_path, 'r', encoding='utf-8') as f:
        voting_data = json.load(f)

    date = voting_data.get('abstimmtag', '')
    gemeinden = extract_gemeinden(voting_data)

    # Same fallback as the other helpers in this notebook: if the extractor
    # found nothing, use a flat top-level 'gemeinden' list when present.
    if not gemeinden and 'gemeinden' in voting_data:
        for gemeinde in voting_data['gemeinden']:
            gemeinde_id = gemeinde.get('geoLevelnummer', 'Unknown')
            gemeinde_name = gemeinde.get('geoLevelname', 'Unknown')
            gemeinden[gemeinde_id] = {
                'name': gemeinde_name,
                'kanton': 'Unknown',
                'bezirk': 'Unknown',
                'data': gemeinde
            }

    validation_results = {
        'date': date,
        'file': voting_file_path.name,
        'total_municipalities': len(gemeinden),
        'mapped': 0,
        'unmapped': 0,
        'unmapped_list': []
    }

    # Check mapping for each municipality
    date_mapping = mapping.get(date, {})

    for gid, gdata in gemeinden.items():
        if date_mapping.get(gid) is not None:
            validation_results['mapped'] += 1
        else:
            validation_results['unmapped'] += 1
            validation_results['unmapped_list'].append({
                'id': gid,
                'name': gdata['name'],
                # .get(): tolerate entries that carry no canton information
                'canton': gdata.get('kanton', 'Unknown')
            })

    return validation_results

# Validate a few files
print("Validating mapping for sample files...\n")

for voting_file in voting_files[:3]:  # Validate first 3 files
    results = validate_mapping(voting_file, historical_mapping)

    print(f"File: {results['file']}")
    print(f"  Date: {results['date']}")
    print(f"  Total municipalities: {results['total_municipalities']}")

    # A file without municipalities would otherwise divide by zero when the
    # mapped percentage is computed.
    if results['total_municipalities'] > 0:
        print(f"  Mapped: {results['mapped']} ({results['mapped']/results['total_municipalities']*100:.1f}%)")
        print(f"  Unmapped: {results['unmapped']}")

        if results['unmapped_list']:
            print(f"  First 5 unmapped:")
            for unmapped in results['unmapped_list'][:5]:
                print(f"    - {unmapped['id']}: {unmapped['name']} ({unmapped['canton']})")
    else:
        print("  No municipalities found in this file")
    print()

## 9. Export Mapping for Future Use

In [None]:
# Assemble the export payload: metadata, the per-date mapping, and summary stats.
final_mapping = dict(
    metadata=dict(
        created_at=datetime.now().isoformat(),
        description='Mapping of historical Swiss municipalities to current structure',
        note='Maps municipality IDs from historical voting data to current (most recent) IDs',
    ),
    mappings=historical_mapping,
    statistics=dict(
        dates_covered=len(historical_mapping),
        date_range=dict(
            # min()/max() over the date keys; None when nothing was mapped
            start=min(historical_mapping) if historical_mapping else None,
            end=max(historical_mapping) if historical_mapping else None,
        ),
    ),
)

# Destination of the JSON export.
output_path = Path('data/municipality_mapping.json')

try:
    with output_path.open('w', encoding='utf-8') as f:
        json.dump(final_mapping, f, ensure_ascii=False, indent=2)

    # Report the size of the file that was just written.
    file_size_kb = output_path.stat().st_size / 1024
    print("✓ Mapping saved to", output_path)
    print(f"  File size: {file_size_kb:.2f} KB")

except Exception as e:
    print("✗ Error saving mapping:", e)

In [None]:
# Print a summary banner followed by the municipality counts per date.
print()
print("=" * 60)
print("MUNICIPAL MAPPING SUMMARY")
print("=" * 60)

if not mapping_df.empty:
    # Distinct municipality ids per date, ordered by date.
    municipality_counts = mapping_df.groupby('date')['municipality_id'].nunique().sort_index()

    print("\nMunicipality count evolution:")
    for date, count in municipality_counts.items():
        print("  {}: {} municipalities".format(date, count))

    # Compare the first and last date of the series.
    print(
        f"\nTotal change: {municipality_counts.iloc[0]} → {municipality_counts.iloc[-1]}",
        f"Net reduction: {municipality_counts.iloc[0] - municipality_counts.iloc[-1]} municipalities",
        sep="\n",
    )

print(
    "\n" + "=" * 60,
    "NEXT STEPS:",
    "=" * 60,
    "1. Review unmapped municipalities in validation results",
    "2. Enhance mapping with actual merger information from Excel file",
    "3. Use municipality_mapping.json to aggregate historical voting data",
    "4. Test with full dataset to ensure all municipalities are properly mapped",
    sep="\n",
)

## 10. Helper Functions for Data Aggregation

In [None]:
def aggregate_voting_results(voting_data, mapping):
    """Aggregate historical voting results to current municipal structure.

    Args:
        voting_data: Parsed voting JSON (a dict).
        mapping: Dict with a ``'mappings'`` entry of the form
            ``{date: {old_id: current_id}}``; the sub-mapping for the vote's
            ``'abstimmtag'`` is applied.

    Returns:
        Dict keyed by current municipality id. Each value holds the summed
        ``'ja_stimmen'``, ``'nein_stimmen'``, ``'stimmberechtigte'`` and
        ``'gueltige_stimmen'``, the contributing ``'source_municipalities'``
        and the derived ``'ja_prozent'`` and ``'stimmbeteiligung'``
        percentages (0 when the denominator is 0).

    Ids missing from the date's mapping are kept under their own id; ids
    mapped to ``None`` (or another falsy target) are left out of the result.
    """
    date = voting_data.get('abstimmtag', '')
    date_mapping = mapping.get('mappings', {}).get(date, {})

    # Aggregate results
    aggregated = defaultdict(lambda: {
        'ja_stimmen': 0,
        'nein_stimmen': 0,
        'stimmberechtigte': 0,
        'gueltige_stimmen': 0,
        'source_municipalities': []
    })

    gemeinden = extract_gemeinden(voting_data)

    # Same fallback as the other helpers in this notebook: if the extractor
    # found nothing, use a flat top-level 'gemeinden' list when present.
    if not gemeinden and 'gemeinden' in voting_data:
        for gemeinde in voting_data['gemeinden']:
            gemeinde_id = gemeinde.get('geoLevelnummer', 'Unknown')
            gemeinde_name = gemeinde.get('geoLevelname', 'Unknown')
            gemeinden[gemeinde_id] = {
                'name': gemeinde_name,
                'data': gemeinde
            }

    # (aggregate key, key in the municipality's 'resultat' entry)
    count_fields = (
        ('ja_stimmen', 'jaStimmenAbsolut'),
        ('nein_stimmen', 'neinStimmenAbsolut'),
        ('stimmberechtigte', 'anzahlStimmberechtigte'),
        ('gueltige_stimmen', 'gueltigeStimmen'),
    )

    for old_id, gemeinde_data in gemeinden.items():
        # Get current municipality ID; default to same ID if no mapping
        current_id = date_mapping.get(old_id, old_id)
        if not current_id:
            continue

        # `or {}` / `or 0`: a key that is present but null yields None from
        # .get(), which would otherwise break .get() chaining and `+=`.
        results = (gemeinde_data.get('data') or {}).get('resultat') or {}

        target = aggregated[current_id]
        for total_key, source_key in count_fields:
            target[total_key] += results.get(source_key) or 0
        target['source_municipalities'].append({
            'id': old_id,
            'name': gemeinde_data.get('name', 'Unknown')
        })

    # Calculate percentages
    for data in aggregated.values():
        if data['gueltige_stimmen'] > 0:
            data['ja_prozent'] = (data['ja_stimmen'] / data['gueltige_stimmen']) * 100
        else:
            data['ja_prozent'] = 0

        if data['stimmberechtigte'] > 0:
            data['stimmbeteiligung'] = (data['gueltige_stimmen'] / data['stimmberechtigte']) * 100
        else:
            data['stimmbeteiligung'] = 0

    return dict(aggregated)

# Announce the helper and show how to call it.
print(
    "Aggregation function ready for use",
    "\nExample usage:",
    "  aggregated = aggregate_voting_results(voting_data, final_mapping)",
    sep="\n",
)