In [1]:
# ============================================================
# üöÄ GOOGLE COLAB SETUP - Run this cell first!
# ============================================================
# If running on Google Colab, uncomment and run the lines below.
# Binder users: packages are automatically installed from requirements.txt

# !pip install -q -r https://raw.githubusercontent.com/arunissun/Montandon-Data-Fetching-Examples/master/requirements.txt

# # Set your API token (you'll be prompted to enter it securely)
# import os
# from getpass import getpass
# if 'MONTANDON_API_TOKEN' not in os.environ:
#     os.environ['MONTANDON_API_TOKEN'] = getpass('Enter your Montandon API token: ')


## 1. Setup and Installation

In [2]:
# Install required packages (uncomment if needed)
# !pip install pystac-client pandas matplotlib seaborn plotly -q

In [3]:
# Import libraries
import os
from getpass import getpass
import pandas as pd
import numpy as np
from pystac_client import Client
from datetime import datetime
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from typing import List, Dict, Any, Optional, Tuple
import warnings
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

warnings.filterwarnings('ignore')

# Plotting style
sns.set_style('whitegrid')
plt.rcParams['figure.figsize'] = (14, 7)

## 2. Define Target Countries and Hazard Codes

In [4]:
# Define target countries with ISO3 codes
TARGET_COUNTRIES = {
    # Nordic Countries
    "NOR": "Norway",
    "SWE": "Sweden", 
    "FIN": "Finland",
    
    # Baltic States
    "EST": "Estonia",
    "LTU": "Lithuania",
    "LVA": "Latvia",
    
    # Eastern Europe
    "RUS": "Russia",
    "BLR": "Belarus",
    "POL": "Poland",
    "UKR": "Ukraine",
    "ROU": "Romania",
    "SVK": "Slovakia",
    "HUN": "Hungary",
    "MDA": "Moldova",
    
    # Balkans
    "SRB": "Serbia",
    "BIH": "Bosnia and Herzegovina",
    "ALB": "Albania",
    "XKX": "Kosovo",  # Note: Kosovo uses XKX or XKS
    "MKD": "North Macedonia",
    
    # Central Asia
    "KGZ": "Kyrgyzstan",
    "KAZ": "Kazakhstan",
    "TJK": "Tajikistan",
    "TKM": "Turkmenistan"
}

# List of ISO3 codes for filtering
COUNTRY_CODES = list(TARGET_COUNTRIES.keys())

print(f"Target Countries: {len(COUNTRY_CODES)}")

Target Countries: 23


In [5]:
# Define hazard codes for snow storms and cold waves
HAZARD_CODES = [
    # GLIDE codes
    "CW",           # Cold Wave (GLIDE)
    "OT",           # Other - includes blizzard, severe winter (GLIDE)
    
    # EM-DAT codes
    "nat-met-ext-col",   # Cold wave (EM-DAT)
    "nat-met-sto-bli",   # Blizzard/Winter storm (EM-DAT)
    "nat-met-ext-sev",   # Severe winter conditions (EM-DAT)
    
    # UNDRR-ISC 2025 codes
    "MH0502",       # Cold Wave
    "MH0403",       # Blizzard
    "MH0405",       # Snow
    "MH0406",       # Snow Storm
    "MH0503",       # Dzud (severe winter - Mongolia/Central Asia)
    "MH0504",       # Freeze
    "MH0505",       # Frost
]

# Define impact collections to search
IMPACT_COLLECTIONS = [
    "desinventar-impacts",
    "emdat-impacts",
    "ifrcevent-impacts"
]

# Date range: Focus on last 5 years (2020-2025)
START_DATE = "2000-01-01"
END_DATE = datetime.now().strftime("%Y-%m-%d")
DATETIME_RANGE = f"{START_DATE}/{END_DATE}"

print(f"Hazard Codes: {len(HAZARD_CODES)}")
print(f"Collections: {IMPACT_COLLECTIONS}")
print(f"Date Range: {START_DATE} to {END_DATE} (Last 5 years)")
print(f"Focus: Simplified approach, no CSV streaming")

Hazard Codes: 12
Collections: ['desinventar-impacts', 'emdat-impacts', 'ifrcevent-impacts']
Date Range: 2000-01-01 to 2025-12-10 (Last 5 years)
Focus: Simplified approach, no CSV streaming


## 3. Connect to Montandon STAC API

In [6]:
# ============================================================================
# AUTHENTICATION & CONNECTION TO MONTANDON STAC API
# ============================================================================

# Montandon STAC API URL (CORRECT URL with /stac suffix)
STAC_API_URL = "https://montandon-eoapi-stage.ifrc.org/stac"

# First try to get token from environment variable
api_token = os.getenv('MONTANDON_API_TOKEN')

# If not set, prompt user to enter token
if api_token is None:
    print("=" * 70)
    print("AUTHENTICATION REQUIRED")
    print("=" * 70)
    print("\nThe Montandon STAC API requires a Bearer Token for authentication.")
    print("\nHow to get your token:")
    print("  1. Visit: https://goadmin-stage.ifrc.org/")
    print("  2. Log in with your IFRC credentials")
    print("  3. Generate an API token from your account settings")
    print("\nAlternatively, set the MONTANDON_API_TOKEN environment variable:")
    print("  PowerShell: $env:MONTANDON_API_TOKEN = 'your_token_here'")
    print("  Bash: export MONTANDON_API_TOKEN='your_token_here'")
    print("\n" + "=" * 70)
    api_token = getpass("Enter your Montandon API Token: ")

# Create authentication headers for pystac_client
AUTH_HEADERS = {
    "Authorization": f"Bearer {api_token}"
}

# Connect to the STAC API using pystac_client
try:
    catalog = Client.open(STAC_API_URL, headers=AUTH_HEADERS)
    print(f"Connected to: {catalog.title}")
    
    # List available impact collections
    collections = list(catalog.get_collections())
    impact_collections_available = [c.id for c in collections if '-impacts' in c.id]
    print(f"Available Impact Collections: {len(impact_collections_available)}")
except Exception as e:
    print(f"Connection failed: {e}")
    catalog = None

AUTHENTICATION REQUIRED

The Montandon STAC API requires a Bearer Token for authentication.

How to get your token:
  1. Visit: https://goadmin-stage.ifrc.org/
  2. Log in with your IFRC credentials
  3. Generate an API token from your account settings

Alternatively, set the MONTANDON_API_TOKEN environment variable:
  PowerShell: $env:MONTANDON_API_TOKEN = 'your_token_here'
  Bash: export MONTANDON_API_TOKEN='your_token_here'

Connected to: stac-fastapi
Connected to: stac-fastapi
Available Impact Collections: 9
Available Impact Collections: 9


In [None]:
# FAST year-by-year search with parallel processing
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

def search_impacts_by_year_fast(
    collections: List[str],
    country_codes: List[str],
    hazard_codes: List[str],
    start_year: int,
    end_year: int,
    max_items_per_year: int = 2000,
    max_workers: int = 3
) -> list:
    """
    FAST year-by-year search using parallel processing.
    
    SPEED OPTIMIZATIONS:
    1. Parallel collection search (ThreadPoolExecutor)
    2. Generator-based item processing (less memory)
    3. Immediate filtering (discard unwanted data early)
    4. Shared memory lock (thread-safe)
    
    Parameters:
    -----------
    collections : list
        Collection IDs to search
    country_codes : list
        ISO3 country codes to filter
    hazard_codes : list
        Hazard classification codes to filter
    start_year : int
        Starting year (e.g., 2020)
    end_year : int
        Ending year (e.g., 2025)
    max_items_per_year : int
        Maximum items per year per collection
    max_workers : int
        Number of parallel workers (default: 3, increase for faster search)
    
    Returns:
    --------
    list of STAC items
    """
    if not catalog:
        print("No catalog connection available. Please check authentication.")
        return []
    
    all_items = []
    lock = threading.Lock()  # Thread-safe list updates
    
    # Build CQL2 filter for country codes
    country_filters = [
        {"op": "a_contains", "args": [{"property": "monty:country_codes"}, code]}
        for code in country_codes
    ]
    filter_body = {"op": "or", "args": country_filters}
    
    print(f" search: {len(collections)} collections, {max_workers} parallel workers\n")
    
    # Helper function to search one collection for one year
    def search_single(collection: str, year: int) -> tuple:
        """Search single collection for single year (run in parallel)"""
        year_start = f"{year}-01-01"
        year_end = f"{year}-12-31"
        datetime_range = f"{year_start}/{year_end}"
        
        try:
            search = catalog.search(
                collections=[collection],
                max_items=max_items_per_year,
                datetime=datetime_range,
                filter=filter_body
            )
            
            # OPTIMIZATION: Filter immediately while iterating (generator pattern)
            filtered_items = [
                item for item in search.items()
                if any(code in item.properties.get('monty:hazard_codes', []) 
                       for code in hazard_codes)
            ]
            
            return (year, collection, filtered_items)
            
        except:
            # Fallback without filter
            try:
                search = catalog.search(
                    collections=[collection],
                    max_items=max_items_per_year,
                    datetime=datetime_range
                )
                
                filtered = [
                    item for item in search.items()
                    if any(c in item.properties.get('monty:country_codes', []) 
                           for c in country_codes) and
                       any(h in item.properties.get('monty:hazard_codes', [])
                           for h in hazard_codes)
                ]
                
                return (year, collection, filtered)
            except:
                return (year, collection, [])
    
    # PARALLEL SEARCH: Process all collections for each year in parallel
    for year in range(start_year, end_year + 1):
        year_total = 0
        
        # Submit all collection searches for this year in parallel
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {
                executor.submit(search_single, coll, year): coll 
                for coll in collections
            }
            
            # Collect results as they complete
            for future in as_completed(futures):
                year_val, coll, items = future.result()
                
                if items:
                    with lock:  # Thread-safe append
                        all_items.extend(items)
                    year_total += len(items)
        
        print(f"Year {year}: {year_total} items")
    
    return all_items



‚úì FAST year-by-year search function loaded
  Features: Parallel processing, generator-based filtering, thread-safe


## 4. Search for Cold Wave and Snow Storm Impacts (2020-2025)



In [None]:
# Execute FAST year-by-year search for 2020-2025
import time

START_YEAR = 2000
END_YEAR = datetime.now().year

print(f"Searching from {START_YEAR} to {END_YEAR}...")
print(f"Target: {len(COUNTRY_CODES)} countries, {len(HAZARD_CODES)} hazard codes")
print(f"Collections: {IMPACT_COLLECTIONS}")
print(f"Parallel workers: 3 (increase to 5-6 for even faster search)\n")

start_time = time.time()

# FAST parallel search
all_impact_items = search_impacts_by_year_fast(
    collections=IMPACT_COLLECTIONS,
    country_codes=COUNTRY_CODES,
    hazard_codes=HAZARD_CODES,
    start_year=START_YEAR,
    end_year=END_YEAR,
    max_items_per_year=2000,
    max_workers=6  # Increase to 5-6 for faster search (if API allows)
)

elapsed_time = time.time() - start_time

print(f"\n{'='*60}")
print(f"‚úì Search Complete!")
print(f"  Total items found: {len(all_impact_items)}")
print(f"  Years searched: {START_YEAR}-{END_YEAR}")
print(f"  Search time: {elapsed_time:.2f} seconds")
print(f"  Speed: {len(all_impact_items)/elapsed_time:.1f} items/second")
print(f"{'='*60}")

Searching from 2000 to 2025...
Target: 23 countries, 12 hazard codes
Collections: ['desinventar-impacts', 'emdat-impacts', 'ifrcevent-impacts']
Parallel workers: 3 (increase to 5-6 for even faster search)

üöÄ FAST search: 3 collections, 6 parallel workers

Year 2000: 18 items
Year 2000: 18 items
Year 2001: 29 items
Year 2001: 29 items
Year 2002: 10 items
Year 2002: 10 items
Year 2003: 2 items
Year 2003: 2 items
Year 2004: 1 items
Year 2004: 1 items
Year 2005: 11 items
Year 2005: 11 items
Year 2006: 17 items
Year 2006: 17 items
Year 2007: 1 items
Year 2007: 1 items
Year 2008: 8 items
Year 2008: 8 items
Year 2009: 13 items
Year 2009: 13 items
Year 2010: 11 items
Year 2010: 11 items
Year 2011: 3 items
Year 2011: 3 items


In [None]:
# Summary of search results
if all_impact_items:
    print(f"\nüìä Data Collection Summary:")
    print(f"   Total items: {len(all_impact_items)}")
    print(f"   Date range: 2020-2025 (last 5 years)")
    print(f"   Countries: {len(COUNTRY_CODES)}")
    print(f"   Hazard codes: {len(HAZARD_CODES)}")
    
    # Count by collection
    from collections import Counter
    collections_count = Counter(item.collection_id for item in all_impact_items)
    print(f"\n   Items by collection:")
    for coll, count in collections_count.items():
        print(f"     - {coll}: {count}")
else:
    print("‚ö†Ô∏è  No items found. Check filters or date range.")


üìä Data Collection Summary:
   Total items: 23
   Date range: 2020-2025 (last 5 years)
   Countries: 23
   Hazard codes: 12

   Items by collection:
     - emdat-impacts: 23


## 5. Extract and Process Impact Data

In [None]:
# Extract impact data from STAC items into DataFrame
def extract_impact_data(items: list) -> pd.DataFrame:
    """
    Extract impact details from STAC items.
    Each item's impact_detail can contain multiple impact records.
    """
    records = []
    
    for item in items:
        props = item.properties
        
        # Base information
        base_info = {
            'id': item.id,
            'collection': item.collection_id,
            'datetime': props.get('datetime'),
            'title': props.get('title', ''),
            'country_codes': ', '.join(props.get('monty:country_codes', [])),
            'hazard_codes': ', '.join(props.get('monty:hazard_codes', [])),
        }
        
        # Get impact_detail - can be a dict or list of dicts
        impact_detail = props.get('monty:impact_detail')
        
        if impact_detail:
            # Handle if it's a single dict
            if isinstance(impact_detail, dict):
                impact_detail = [impact_detail]
            
            # Handle if it's a list
            if isinstance(impact_detail, list):
                for impact in impact_detail:
                    if isinstance(impact, dict):
                        record = base_info.copy()
                        record['impact_category'] = impact.get('category', 'UNKNOWN')
                        record['impact_type'] = impact.get('type', 'UNKNOWN')
                        record['impact_value'] = impact.get('value', 0)
                        record['impact_unit'] = impact.get('unit', '')
                        record['estimate_type'] = impact.get('estimate_type', 'PRIMARY')
                        records.append(record)
            else:
                # Fallback: create record with no impact data
                record = base_info.copy()
                record['impact_category'] = 'NO_DATA'
                record['impact_type'] = 'NO_DATA'
                record['impact_value'] = None
                record['impact_unit'] = ''
                record['estimate_type'] = ''
                records.append(record)
        else:
            # No impact detail at all
            record = base_info.copy()
            record['impact_category'] = 'NO_DATA'
            record['impact_type'] = 'NO_DATA'
            record['impact_value'] = None
            record['impact_unit'] = ''
            record['estimate_type'] = ''
            records.append(record)
    
    return pd.DataFrame(records)

# Extract data
if all_impact_items:
    print("Extracting impact details from items...")
    impacts_df = extract_impact_data(all_impact_items)
    print(f"‚úì Extracted {len(impacts_df)} impact records from {len(all_impact_items)} items")
    
    # Show sample
    print("\nSample records:")
    display(impacts_df.head(10))
else:
    print("No items to process")
    impacts_df = pd.DataFrame()

Extracting impact details from items...
‚úì Extracted 23 impact records from 23 items

Sample records:


Unnamed: 0,id,collection,datetime,title,country_codes,hazard_codes,impact_category,impact_type,impact_value,impact_unit,estimate_type
0,emdat-impact-2015-0577-POL-total_deaths,emdat-impacts,2015-01-01T00:00:00Z,Cold wave in Poland - total_deaths,POL,nat-met-ext-col,people,death,77.0,count,primary
1,emdat-impact-2014-0496-MKD-total_affected,emdat-impacts,2014-12-28T00:00:00Z,Severe winter conditions in North Macedonia - ...,MKD,nat-met-ext-sev,people,affected_total,8800.0,count,primary
2,emdat-impact-2014-0496-MKD-no_affected,emdat-impacts,2014-12-28T00:00:00Z,Severe winter conditions in North Macedonia - ...,MKD,nat-met-ext-sev,people,affected_total,8800.0,count,primary
3,emdat-impact-2016-0001-POL-total_deaths,emdat-impacts,2016-01-02T00:00:00Z,Cold wave in Poland - total_deaths,POL,nat-met-ext-col,people,death,21.0,count,primary
4,emdat-impact-2016-0001-UKR-total_deaths,emdat-impacts,2016-01-01T00:00:00Z,Cold wave in Ukraine - total_deaths,UKR,nat-met-ext-col,people,death,37.0,count,primary
5,emdat-impact-2017-0010-SRB-total_deaths,emdat-impacts,2017-01-20T00:00:00Z,Severe winter conditions in Serbia - total_deaths,SRB,nat-met-ext-sev,people,death,6.0,count,primary
6,emdat-impact-2017-0010-ALB-total_deaths,emdat-impacts,2017-01-20T00:00:00Z,Severe winter conditions in Albania - total_de...,ALB,nat-met-ext-sev,people,death,6.0,count,primary
7,emdat-impact-2017-0010-MKD-total_deaths,emdat-impacts,2017-01-05T00:00:00Z,Severe winter conditions in North Macedonia - ...,MKD,nat-met-ext-sev,people,death,3.0,count,primary
8,emdat-impact-2017-0010-MKD-total_affected,emdat-impacts,2017-01-05T00:00:00Z,Severe winter conditions in North Macedonia - ...,MKD,nat-met-ext-sev,people,affected_total,2220.0,count,primary
9,emdat-impact-2017-0010-MKD-no_affected,emdat-impacts,2017-01-05T00:00:00Z,Severe winter conditions in North Macedonia - ...,MKD,nat-met-ext-sev,people,affected_total,2220.0,count,primary


In [None]:
# Process datetime and add year column
if len(impacts_df) > 0:
    impacts_df['datetime'] = pd.to_datetime(impacts_df['datetime'], errors='coerce')
    impacts_df['year'] = impacts_df['datetime'].dt.year
    impacts_df['month'] = impacts_df['datetime'].dt.month
    
    # Extract first country code for grouping
    impacts_df['primary_country'] = impacts_df['country_codes'].apply(
        lambda x: x.split(',')[0].strip() if pd.notna(x) and x else 'UNKNOWN'
    )
    
    # Add country name
    impacts_df['country_name'] = impacts_df['primary_country'].map(TARGET_COUNTRIES)
    impacts_df['country_name'] = impacts_df['country_name'].fillna(impacts_df['primary_country'])
    
    print(" Data processed with datetime and country information")
    print(f"\n Year range: {impacts_df['year'].min()} - {impacts_df['year'].max()}")
    print(f"\n Countries with data:")
    print(impacts_df['country_name'].value_counts())
else:
    print("No impact data to process")

 Data processed with datetime and country information

 Year range: 2014 - 2019

 Countries with data:
country_name
North Macedonia    5
Belarus            4
Poland             3
Ukraine            2
Hungary            2
Moldova            2
Albania            1
Serbia             1
Romania            1
Lithuania          1
Estonia            1
Name: count, dtype: int64


## 6. Analyze Impact Data by Collection

In [None]:
# Summary by collection
if len(impacts_df) > 0:
    print("Impact Records by Collection:")
    collection_summary = impacts_df.groupby('collection').agg({
        'id': 'count',
        'impact_value': 'sum',
        'country_name': 'nunique',
        'year': ['min', 'max']
    }).round(0)
    collection_summary.columns = ['Records', 'Total Impact Value', 'Countries', 'From Year', 'To Year']
    display(collection_summary)
else:
    print("No data to summarize")

Impact Records by Collection:


Unnamed: 0_level_0,Records,Total Impact Value,Countries,From Year,To Year
collection,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
emdat-impacts,23,134649.0,11,2014,2019


In [None]:
# Analyze impact types
if len(impacts_df) > 0 and 'impact_type' in impacts_df.columns:
    print("Impact Types Distribution:")
    impact_type_summary = impacts_df.groupby('impact_type').agg({
        'id': 'count',
        'impact_value': ['sum', 'max']
    }).round(2)
    impact_type_summary.columns = ['Count', 'Total Value', 'Max Value']
    impact_type_summary = impact_type_summary.sort_values('Count', ascending=False)
    display(impact_type_summary.head(15))

Impact Types Distribution:


Unnamed: 0_level_0,Count,Total Value,Max Value
impact_type,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
death,14,611.0,179.0
affected_total,8,133499.0,50539.0
injured,1,539.0,539.0


## 7. Deaths and Casualties Analysis

In [None]:
# Filter for death-related impacts
death_types = ['DEATHS', 'DEATH', 'deaths', 'death']

if len(impacts_df) > 0:
    deaths_df = impacts_df[impacts_df['impact_type'].str.upper().isin([t.upper() for t in death_types])].copy()
    
    if len(deaths_df) > 0:
        print(f"Death Records Found: {len(deaths_df)}")
        
        # Deaths by country
        deaths_by_country = deaths_df.groupby('country_name')['impact_value'].sum().sort_values(ascending=False)
        print("\nTotal Deaths by Country (Cold Wave/Snow Storm 2000-2025):")
        display(deaths_by_country)
        
        # Deaths by year
        deaths_by_year = deaths_df.groupby('year')['impact_value'].sum().sort_index()
        print("\nDeaths by Year:")
        display(deaths_by_year)
    else:
        print("No death records found in filtered data")
else:
    print("No data available")

Death Records Found: 14

Total Deaths by Country (Cold Wave/Snow Storm 2000-2025):


country_name
Hungary            358.0
Poland             107.0
Ukraine             74.0
Belarus             43.0
Estonia              7.0
Albania              6.0
Serbia               6.0
Lithuania            5.0
North Macedonia      3.0
Romania              2.0
Name: impact_value, dtype: float64


Deaths by Year:


year
2015     77.0
2016     58.0
2017     95.0
2018    381.0
Name: impact_value, dtype: float64

In [None]:
# Visualize deaths by country
if len(impacts_df) > 0 and 'deaths_by_country' in dir():
    if len(deaths_by_country) > 0:
        fig = px.bar(
            x=deaths_by_country.index,
            y=deaths_by_country.values,
            title='Deaths from Cold Waves and Snow Storms (2000-2025)',
            labels={'x': 'Country', 'y': 'Total Deaths'},
            color=deaths_by_country.values,
            color_continuous_scale='Reds'
        )
        fig.update_layout(
            xaxis_tickangle=-45,
            height=500
        )
        fig.show()

In [None]:
# Visualize deaths trend over years
if len(impacts_df) > 0 and 'deaths_by_year' in dir():
    if len(deaths_by_year) > 0:
        fig = px.line(
            x=deaths_by_year.index,
            y=deaths_by_year.values,
            title='Deaths from Cold Waves and Snow Storms Over Time',
            labels={'x': 'Year', 'y': 'Deaths'},
            markers=True
        )
        fig.update_layout(height=400)
        fig.show()

## 8. Affected Population Analysis

In [None]:
# Filter for affected population
affected_types = ['AFFECTED_TOTAL', 'TOTAL_AFFECTED', 'DIRECTLY_AFFECTED', 'INDIRECTLY_AFFECTED', 
                  'affected_total', 'affected', 'AFFECTED']

if len(impacts_df) > 0:
    affected_df = impacts_df[impacts_df['impact_type'].str.upper().isin([t.upper() for t in affected_types])].copy()
    
    if len(affected_df) > 0:
        print(f"Affected Population Records: {len(affected_df)}")
        
        # Affected by country
        affected_by_country = affected_df.groupby('country_name')['impact_value'].sum().sort_values(ascending=False)
        print("\nTotal Affected Population by Country:")
        display(affected_by_country)
    else:
        print("No affected population records found")

Affected Population Records: 8

Total Affected Population by Country:


country_name
Belarus            100539.0
North Macedonia     22040.0
Moldova             10920.0
Name: impact_value, dtype: float64

In [None]:
# Visualize affected population
if 'affected_by_country' in dir() and len(affected_by_country) > 0:
    fig = px.bar(
        x=affected_by_country.index,
        y=affected_by_country.values,
        title='Population Affected by Cold Waves and Snow Storms (2000-2025)',
        labels={'x': 'Country', 'y': 'Total Affected'},
        color=affected_by_country.values,
        color_continuous_scale='Blues'
    )
    fig.update_layout(
        xaxis_tickangle=-45,
        height=500
    )
    fig.show()

## 9. Economic Losses Analysis

In [None]:
# Filter for economic losses
economic_types = ['LOSS_COST', 'COST', 'LOSS', 'DAMAGED', 'DESTROYED', 'DAMAGES']

if len(impacts_df) > 0:
    economic_df = impacts_df[impacts_df['impact_type'].str.upper().isin([t.upper() for t in economic_types])].copy()
    
    if len(economic_df) > 0:
        print(f"Economic Impact Records: {len(economic_df)}")
        
        # Filter for USD values
        usd_losses = economic_df[economic_df['impact_unit'].str.contains('USD|dollar', case=False, na=False)]
        
        if len(usd_losses) > 0:
            losses_by_country = usd_losses.groupby('country_name')['impact_value'].sum().sort_values(ascending=False)
            print("\nEconomic Losses by Country (USD):")
            display(losses_by_country)
        else:
            print("\nNo USD-denominated losses found. All economic impacts:")
            display(economic_df[['country_name', 'impact_type', 'impact_value', 'impact_unit']].head(20))
    else:
        print("No economic impact records found")

No economic impact records found


## 10. Temporal Analysis - Events by Month

In [None]:
# Analyze events by month (cold waves typically winter months)
if len(impacts_df) > 0 and 'month' in impacts_df.columns:
    monthly_events = impacts_df.groupby('month')['id'].nunique()
    
    month_names = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 
                   'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
    
    fig = px.bar(
        x=month_names,
        y=[monthly_events.get(i+1, 0) for i in range(12)],
        title='Cold Wave and Snow Storm Events by Month',
        labels={'x': 'Month', 'y': 'Number of Events'},
        color=[monthly_events.get(i+1, 0) for i in range(12)],
        color_continuous_scale='RdBu_r'
    )
    fig.update_layout(height=400)
    fig.show()

## 11. Summary 

In [None]:
# Create summary statistics
if len(impacts_df) > 0:
    print("="*60)
    print("SUMMARY: Cold Wave & Snow Storm Impacts (2000-2025)")
    print("   Europe and Central Asia Target Countries")
    print("="*60)
    
    print(f"\nData Sources:")
    for col in impacts_df['collection'].unique():
        count = len(impacts_df[impacts_df['collection'] == col])
        print(f"   - {col}: {count} impact records")
    
    print(f"\nCountries with Impact Data: {impacts_df['country_name'].nunique()}")
    
    print(f"\nTime Period: {impacts_df['year'].min()} - {impacts_df['year'].max()}")
    
    # Deaths summary
    if 'deaths_df' in dir() and len(deaths_df) > 0:
        total_deaths = deaths_df['impact_value'].sum()
        print(f"\nTotal Deaths: {total_deaths:,.0f}")
    
    # Affected summary
    if 'affected_df' in dir() and len(affected_df) > 0:
        total_affected = affected_df['impact_value'].sum()
        print(f"\nTotal Affected: {total_affected:,.0f}")
    
    print("\n" + "="*60)
else:
    print("No data available for summary")

SUMMARY: Cold Wave & Snow Storm Impacts (2000-2025)
   Europe and Central Asia Target Countries

Data Sources:
   - emdat-impacts: 23 impact records

Countries with Impact Data: 11

Time Period: 2014 - 2019

Total Deaths: 611

Total Affected: 133,499



In [None]:
# Create a comprehensive impact summary table
if len(impacts_df) > 0:
    # Pivot table by country and impact type
    summary_pivot = impacts_df.pivot_table(
        index='country_name',
        columns='impact_type',
        values='impact_value',
        aggfunc='sum',
        fill_value=0
    )
    
    # Select key impact types if they exist
    key_types = ['DEATHS', 'TOTAL_AFFECTED', 'AFFECTED_TOTAL', 'INJURED', 'MISSING', 
                 'DESTROYED', 'DAMAGED', 'EVACUATED']
    available_types = [t for t in key_types if t in summary_pivot.columns]
    
    if available_types:
        summary_filtered = summary_pivot[available_types]
        print("Impact Summary by Country and Type:")
        display(summary_filtered)
    else:
        print("Full Impact Summary:")
        display(summary_pivot)

Full Impact Summary:


impact_type,affected_total,death,injured
country_name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
Albania,0.0,6.0,0.0
Belarus,100539.0,43.0,539.0
Estonia,0.0,7.0,0.0
Hungary,0.0,358.0,0.0
Lithuania,0.0,5.0,0.0
Moldova,10920.0,0.0,0.0
North Macedonia,22040.0,3.0,0.0
Poland,0.0,107.0,0.0
Romania,0.0,2.0,0.0
Serbia,0.0,6.0,0.0


## 12. Export Results

In [None]:
# Export detailed data to CSV
if len(impacts_df) > 0:
    output_file = 'cold_wave_snow_storm_impacts_europe_central_asia.csv'
    impacts_df.to_csv(output_file, index=False)
    print(f"Data exported to: {output_file}")
    print(f"Total records: {len(impacts_df)}")
else:
    print("No data to export")

Data exported to: cold_wave_snow_storm_impacts_europe_central_asia.csv
Total records: 23


In [None]:
# Display sample of the data
if len(impacts_df) > 0:
    print("Sample Data (first 20 records):")
    display(impacts_df[['datetime', 'country_name', 'hazard_codes', 'impact_type', 
                        'impact_value', 'impact_unit', 'collection']].head(20))

Sample Data (first 20 records):


Unnamed: 0,datetime,country_name,hazard_codes,impact_type,impact_value,impact_unit,collection
0,2015-01-01 00:00:00+00:00,Poland,nat-met-ext-col,death,77.0,count,emdat-impacts
1,2014-12-28 00:00:00+00:00,North Macedonia,nat-met-ext-sev,affected_total,8800.0,count,emdat-impacts
2,2014-12-28 00:00:00+00:00,North Macedonia,nat-met-ext-sev,affected_total,8800.0,count,emdat-impacts
3,2016-01-02 00:00:00+00:00,Poland,nat-met-ext-col,death,21.0,count,emdat-impacts
4,2016-01-01 00:00:00+00:00,Ukraine,nat-met-ext-col,death,37.0,count,emdat-impacts
5,2017-01-20 00:00:00+00:00,Serbia,nat-met-ext-sev,death,6.0,count,emdat-impacts
6,2017-01-20 00:00:00+00:00,Albania,nat-met-ext-sev,death,6.0,count,emdat-impacts
7,2017-01-05 00:00:00+00:00,North Macedonia,nat-met-ext-sev,death,3.0,count,emdat-impacts
8,2017-01-05 00:00:00+00:00,North Macedonia,nat-met-ext-sev,affected_total,2220.0,count,emdat-impacts
9,2017-01-05 00:00:00+00:00,North Macedonia,nat-met-ext-sev,affected_total,2220.0,count,emdat-impacts


---

## Key Findings

This notebook analyzed cold wave and snow storm impacts across 23 countries in Europe and Central Asia from 2000 to 2025.

### Data Sources Used:
- **DesInventar**: Detailed local disaster impact records
- **EM-DAT**: Global disaster database with comprehensive impact metrics
- **IFRC DREF**: Red Cross disaster response operations

### Impact Types Analyzed:
- Deaths and casualties
- Affected population
- Economic losses
- Infrastructure damage

### Hazard Codes Searched:
- Cold Wave (CW, MH0502, nat-met-ext-col)
- Blizzard/Winter Storm (nat-met-sto-bli, MH0403)
- Snow Storm (MH0406)
- Severe Winter Conditions (nat-met-ext-sev, MH0503)

---
*Data source: Montandon STAC API - https://montandon-eoapi-stage.ifrc.org/stac*