# Comprehensive Hydrological Analysis for CAMELS-RU Dataset

This notebook demonstrates the usage of the reorganized hydrological statistics package for comprehensive analysis of multiple gauge stations. The new modular approach allows for efficient calculation of standardized hydrological indices across the entire CAMELS-RU dataset.

## Features:
- **Batch processing** of multiple gauge stations
- **Comprehensive metrics** from all hydrological modules
- **Standardized output** as DataFrame with gauge_id as index
- **Performance-optimized** calculations
- **Error handling** for incomplete or problematic data

## Modules Used:
- `src.hydro.base_flow` - Base flow separation and BFI calculation
- `src.hydro.flow_duration` - Flow Duration Curve analysis
- `src.hydro.flow_extremes` - High/low flow analysis
- `src.hydro.flow_timing` - Temporal flow characteristics
- `src.hydro.flow_variability` - Multi-scale variability analysis
- `src.hydro.flow_indices` - Comprehensive hydrological indices
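
To make the base flow index (BFI) mentioned above concrete, here is a minimal sketch of one common separation technique, a single forward pass of the Lyne-Hollick recursive digital filter. This notebook does not show which method `src.hydro.base_flow` actually uses, so the function name `lyne_hollick_baseflow`, the filter parameter `alpha=0.925`, and the sample values are assumptions for illustration only.

```python
import numpy as np


def lyne_hollick_baseflow(q: np.ndarray, alpha: float = 0.925) -> np.ndarray:
    """Single forward pass of the Lyne-Hollick filter (illustrative sketch)."""
    quick = np.zeros_like(q, dtype=float)
    for t in range(1, len(q)):
        # Recursive estimate of the quickflow component
        quick[t] = alpha * quick[t - 1] + 0.5 * (1 + alpha) * (q[t] - q[t - 1])
        # Keep quickflow between zero and the total flow
        quick[t] = min(max(quick[t], 0.0), q[t])
    return q - quick


# Hypothetical daily discharge with one flood peak
q = np.array([2.0, 2.0, 10.0, 6.0, 4.0, 3.0, 2.5, 2.2, 2.0, 2.0])
baseflow = lyne_hollick_baseflow(q)
bfi = baseflow.sum() / q.sum()  # share of total flow attributed to baseflow
print(round(bfi, 3))
```

Published applications often run the filter several times, alternating forward and backward passes; the single pass here only illustrates the idea.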

In [None]:
# Setup and imports
%load_ext autoreload
%autoreload 2

import pandas as pd
import numpy as np
import warnings
from pathlib import Path
from typing import List, Optional
import time
from tqdm.auto import tqdm

# Import the new modular hydro package
import sys

sys.path.append("../")

from src.hydro import (
    calculate_comprehensive_metrics,
    BaseFlowSeparation,
    FlowDurationCurve,
    FlowExtremes,
    FlowTiming,
    FlowVariability,
    HydrologicalIndices,
)

# Suppress warnings for cleaner output
warnings.filterwarnings("ignore")

In [None]:
# Load discharge data
print("Loading discharge data...")
discharge_data = pd.read_csv("../discharge.csv", index_col="date")
discharge_data.index = pd.to_datetime(discharge_data.index)

# Remove columns with too many NaN values (less than 2 years of data)
min_data_points = 730  # Minimum 2 years of daily data
discharge_data = discharge_data.dropna(axis=1, thresh=min_data_points)

print(f"Loaded data for {len(discharge_data.columns)} gauge stations")
print(f"Date range: {discharge_data.index.min()} to {discharge_data.index.max()}")
print(f"Total data points per station (max): {len(discharge_data)}")

# Display basic info about the dataset
discharge_data.info()

## Core Function: Batch Hydrological Analysis

The following function processes multiple gauge stations efficiently and returns a standardized DataFrame with comprehensive hydrological statistics.
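
The batch pattern can be tried without the package installed. The sketch below runs the same loop (drop missing values, skip short records, isolate failures per gauge, collect rows into a DataFrame) on synthetic data. `simple_metrics` and `batch_metrics` are hypothetical stand-ins, not part of `src.hydro`.

```python
import numpy as np
import pandas as pd


def simple_metrics(series: pd.Series) -> dict:
    """Hypothetical stand-in for the package's metric calculation."""
    return {
        "mean_flow": float(series.mean()),
        "cv": float(series.std() / series.mean()),
        "data_points": int(series.size),
    }


def batch_metrics(data: pd.DataFrame, min_points: int = 730) -> pd.DataFrame:
    """Apply simple_metrics to every column with enough observations."""
    rows = {}
    for gauge_id in data.columns:
        series = data[gauge_id].dropna()
        if series.size < min_points:
            continue  # record too short
        try:
            rows[gauge_id] = simple_metrics(series)
        except Exception as exc:  # one bad gauge must not stop the batch
            print(f"{gauge_id}: {exc}")
    out = pd.DataFrame.from_dict(rows, orient="index")
    out.index.name = "gauge_id"
    return out


# Synthetic daily discharge for three gauges; the third is mostly missing
rng = np.random.default_rng(0)
dates = pd.date_range("2000-01-01", periods=1461, freq="D")
demo = pd.DataFrame(
    rng.lognormal(mean=1.0, sigma=0.5, size=(1461, 3)),
    index=dates,
    columns=["g1", "g2", "g3"],
)
demo.loc[dates[100:], "g3"] = np.nan

batch_result = batch_metrics(demo)
print(batch_result)
```

Gauge `g3` keeps only 100 observations, so it is skipped and the result has one row each for `g1` and `g2`.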

In [None]:
def calculate_hydrological_statistics_batch(
    discharge_data: pd.DataFrame,
    gauge_ids: Optional[List[str]] = None,
    min_data_years: float = 2.0,
    include_bfi: bool = True,
    include_detailed_metrics: bool = True,
    progress_bar: bool = True,
) -> pd.DataFrame:
    """
    Calculate comprehensive hydrological statistics for multiple gauge stations.

    Parameters:
    -----------
    discharge_data : pd.DataFrame
        DataFrame with datetime index and gauge IDs as columns
    gauge_ids : List[str], optional
        List of specific gauge IDs to process. If None, processes all columns
    min_data_years : float
        Minimum years of data required for analysis (default: 2.0)
    include_bfi : bool
        Whether to calculate BFI (computationally intensive, default: True)
    include_detailed_metrics : bool
        Whether to include all detailed metrics from all modules (default: True)
    progress_bar : bool
        Whether to show progress bar (default: True)

    Returns:
    --------
    pd.DataFrame
        DataFrame with gauge_id as index and calculated statistics as columns
    """

    # Select gauge IDs to process
    if gauge_ids is None:
        gauge_ids = discharge_data.columns.tolist()

    # Filter gauge IDs that actually exist in the data
    available_gauges = [gid for gid in gauge_ids if gid in discharge_data.columns]

    print(f"Processing {len(available_gauges)} gauge stations...")

    # Initialize results storage
    results = {}
    failed_gauges = []

    # Setup progress bar
    iterator = tqdm(available_gauges) if progress_bar else available_gauges

    for gauge_id in iterator:
        try:
            if progress_bar:
                iterator.set_description(f"Processing {gauge_id}")

            # Extract discharge series for this gauge
            discharge_series = discharge_data[gauge_id].dropna()

            # Check minimum data requirement
            years_of_data = len(discharge_series) / 365.25
            if years_of_data < min_data_years:
                if progress_bar:
                    iterator.set_postfix({"status": f"skipped - {years_of_data:.1f}y"})
                continue

            # Calculate comprehensive metrics
            metrics = calculate_comprehensive_metrics(
                discharge_series, include_bfi=include_bfi, include_all_modules=include_detailed_metrics
            )

            # Add basic data quality metrics
            metrics.update(
                {
                    "data_years": years_of_data,
                    "data_points": len(discharge_series),
                    "data_completeness": len(discharge_series) / len(discharge_data) * 100,
                    "start_date": discharge_series.index.min(),
                    "end_date": discharge_series.index.max(),
                }
            )

            results[gauge_id] = metrics

            if progress_bar:
                iterator.set_postfix({"status": f"completed - {len(metrics)} metrics"})

        except Exception as e:
            failed_gauges.append((gauge_id, str(e)))
            if progress_bar:
                iterator.set_postfix({"status": f"failed - {str(e)[:20]}..."})
            continue

    # Convert results to DataFrame
    if results:
        results_df = pd.DataFrame.from_dict(results, orient="index")
        results_df.index.name = "gauge_id"

        print(f"\nSuccessfully processed {len(results_df)} gauge stations")
        print(f"Failed to process {len(failed_gauges)} gauge stations")

        if failed_gauges:
            print("Failed gauges:")
            for gauge_id, error in failed_gauges[:5]:  # Show first 5 failures
                print(f"  {gauge_id}: {error}")
            if len(failed_gauges) > 5:
                print(f"  ... and {len(failed_gauges) - 5} more")

        return results_df
    else:
        print("No gauge stations were successfully processed!")
        return pd.DataFrame()

## Use Case 1: Quick Analysis for All Stations

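Process gauge stations with a subset of key metrics for rapid assessment. The demonstration below uses the first 10 stations; pass `gauge_ids=None` to process every station in the dataset.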
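
For a first look, several key statistics can also be computed for every column at once with plain pandas. The sketch below is independent of the package and assumes the exceedance convention, where Q05 is the flow exceeded 5% of the time (a high flow) and Q95 the flow exceeded 95% of the time (a low flow). The column names are illustrative and may not match the package's output.

```python
import numpy as np
import pandas as pd

# Synthetic daily discharge for two gauges
rng = np.random.default_rng(1)
dates = pd.date_range("2010-01-01", periods=730, freq="D")
flows = pd.DataFrame(
    rng.lognormal(mean=2.0, sigma=0.8, size=(730, 2)),
    index=dates,
    columns=["a", "b"],
)

quick_stats = pd.DataFrame(
    {
        "mean": flows.mean(),
        "cv": flows.std() / flows.mean(),
        # Exceedance convention (assumed): Q05 is exceeded 5% of the time
        "q05": flows.quantile(0.95),
        "q95": flows.quantile(0.05),
    }
)
quick_stats.index.name = "gauge_id"
print(quick_stats.round(2))
```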

In [None]:
# Quick analysis for first 10 stations (for demonstration)
sample_stations = discharge_data.columns[:10].tolist()

print("=== USE CASE 1: Quick Analysis (Key Metrics Only) ===")
start_time = time.time()

# Quick analysis without BFI and detailed metrics for speed
quick_results = calculate_hydrological_statistics_batch(
    discharge_data,
    gauge_ids=sample_stations,
    min_data_years=1.0,
    include_bfi=False,  # Skip BFI for speed
    include_detailed_metrics=False,  # Only basic metrics
    progress_bar=True,
)

elapsed_time = time.time() - start_time
print(f"\nQuick analysis completed in {elapsed_time:.2f} seconds")
print(f"Generated {len(quick_results.columns)} metrics per station")

# Display first few results
print("\nSample results (first 5 stations, selected metrics):")
key_metrics = [
    col
    for col in quick_results.columns
    if any(x in col.lower() for x in ["mean", "cv", "q05", "q95", "fdc_slope", "data_years"])
]
print(quick_results[key_metrics].head())

## Use Case 2: Comprehensive Analysis for Selected Stations

Perform detailed analysis including BFI calculation and all available metrics for a subset of high-quality stations.
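
Station quality can be judged on two separate criteria: record length in years and the share of non-missing days. The sketch below computes both on synthetic data and keeps only stations that pass both; the thresholds (5 years, 80%) and station names are illustrative assumptions.

```python
import numpy as np
import pandas as pd

dates = pd.date_range("2000-01-01", "2009-12-31", freq="D")
rng = np.random.default_rng(2)
obs = pd.DataFrame(
    rng.gamma(shape=2.0, scale=5.0, size=(len(dates), 3)),
    index=dates,
    columns=["full", "gappy", "short"],
)
obs.loc[obs.index[::3], "gappy"] = np.nan   # every third day missing
obs.loc[obs.index[365:], "short"] = np.nan  # only the first year observed

quality = pd.DataFrame(
    {
        "years": obs.count() / 365.25,
        "coverage_pct": obs.count() / len(obs) * 100,
    }
)
selected = quality[(quality["years"] >= 5) & (quality["coverage_pct"] > 80)]
print(selected.index.tolist())  # ['full']
```

Here `gappy` has more than six years of observations but only about 67% coverage, so checking record length alone would have let it through.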

In [None]:
# Select high-quality stations (those with more than 80% data coverage)
print("=== USE CASE 2: Comprehensive Analysis for High-Quality Stations ===")

# Find stations with sufficient data
data_coverage = discharge_data.count() / len(discharge_data) * 100
long_record_stations = data_coverage[data_coverage > 80].index[:5].tolist()  # First 5 above the threshold

print(f"Selected {len(long_record_stations)} high-quality stations for detailed analysis:")
for station in long_record_stations:
    coverage = data_coverage[station]
    print(f"  {station}: {coverage:.1f}% data coverage")

start_time = time.time()

# Comprehensive analysis with all metrics including BFI
comprehensive_results = calculate_hydrological_statistics_batch(
    discharge_data,
    gauge_ids=long_record_stations,
    min_data_years=3.0,
    include_bfi=True,  # Include BFI calculation
    include_detailed_metrics=True,  # All available metrics
    progress_bar=True,
)

elapsed_time = time.time() - start_time
print(f"\nComprehensive analysis completed in {elapsed_time:.2f} seconds")
print(f"Generated {len(comprehensive_results.columns)} metrics per station")

# Display comprehensive results
print("\nComprehensive results overview:")
print(f"Shape: {comprehensive_results.shape}")
print("Metric categories:")

# Group metrics by category
metric_categories = {}
for col in comprehensive_results.columns:
    category = col.split("_")[0] if "_" in col else "basic"
    if category not in metric_categories:
        metric_categories[category] = []
    metric_categories[category].append(col)

for category, metrics in metric_categories.items():
    print(f"  {category}: {len(metrics)} metrics")

comprehensive_results.head()

## Use Case 3: Custom Metrics for Specific Analysis

Calculate specific sets of metrics for targeted research questions (e.g., drought analysis, flood analysis, baseflow studies).
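
Low-flow frequency and duration metrics rest on one operation: finding consecutive runs of days below a threshold. The sketch below shows that operation in plain pandas. `spell_durations` is a hypothetical helper, and the threshold is passed directly because this notebook does not show how the package derives it from `threshold_multiplier`.

```python
import pandas as pd


def spell_durations(series: pd.Series, threshold: float) -> pd.Series:
    """Lengths (in time steps) of consecutive runs below `threshold`."""
    below = series < threshold
    # A new run starts whenever the below/above state changes
    run_id = (below != below.shift()).cumsum()
    # Each run is all True or all False, so the sum is the length of a
    # below-threshold run and zero otherwise
    lengths = below.groupby(run_id).sum()
    return lengths[lengths > 0]


q = pd.Series([5, 1, 1, 6, 7, 1, 1, 1, 8, 9], dtype=float)
durations = spell_durations(q, threshold=2.0)
print(durations.tolist())  # [2, 3]
print(len(durations), durations.mean(), durations.max())
```

The same function gives high-flow spells if the comparison is reversed.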

In [None]:
def calculate_drought_focused_metrics(
    discharge_data: pd.DataFrame, gauge_ids: List[str]
) -> pd.DataFrame:
    """Calculate metrics specifically focused on drought analysis."""

    results = {}

    for gauge_id in tqdm(gauge_ids, desc="Drought analysis"):
        try:
            discharge_series = discharge_data[gauge_id].dropna()

            if len(discharge_series) < 730:  # Skip if less than 2 years
                continue

            # Initialize analyzers
            extremes = FlowExtremes(discharge_series)
            fdc = FlowDurationCurve(discharge_series)

            # Drought-specific metrics
            drought_metrics = extremes.calculate_drought_indices()
            low_flow_metrics = extremes.analyze_low_flows(threshold_multiplier=0.1)  # Stricter threshold

            # Additional drought indicators
            q70 = fdc.get_percentile_flow(70)
            q80 = fdc.get_percentile_flow(80)
            q90 = fdc.get_percentile_flow(90)
            q95 = fdc.get_percentile_flow(95)

            # Combine drought-focused metrics
            metrics = {
                "gauge_id": gauge_id,
                "q70": q70,
                "q80": q80,
                "q90": q90,
                "q95": q95,
                "q95_mean_ratio": drought_metrics["q95_flow"] / np.mean(discharge_series),
                "low_flow_frequency": low_flow_metrics["low_flow_frequency"],
                "low_flow_duration_avg": low_flow_metrics["low_flow_avg_duration"],
                "low_flow_duration_max": low_flow_metrics["low_flow_max_duration"],
                "baseflow_approx": drought_metrics["bfi_approx"],
                "cv": np.std(discharge_series) / np.mean(discharge_series),
                "data_years": len(discharge_series) / 365.25,
            }

            results[gauge_id] = metrics

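        except Exception as e:
            # Report the failure instead of silently dropping the station
            tqdm.write(f"Skipping {gauge_id}: {e}")
            continue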

    return pd.DataFrame.from_dict(results, orient="index")


def calculate_flood_focused_metrics(discharge_data: pd.DataFrame, gauge_ids: List[str]) -> pd.DataFrame:
    """Calculate metrics specifically focused on flood analysis."""

    results = {}

    for gauge_id in tqdm(gauge_ids, desc="Flood analysis"):
        try:
            discharge_series = discharge_data[gauge_id].dropna()

            if len(discharge_series) < 730:  # Skip if less than 2 years
                continue

            # Initialize analyzers
            extremes = FlowExtremes(discharge_series)
            variability = FlowVariability(discharge_series)
            fdc = FlowDurationCurve(discharge_series)

            # Flood-specific metrics
            flood_metrics = extremes.calculate_flood_indices()
            high_flow_metrics = extremes.analyze_high_flows(
                threshold_multiplier=3.0
            )  # Conservative threshold
            flashiness = variability.calculate_flashiness_index()

            # Additional flood indicators
            q05 = fdc.get_percentile_flow(5)
            q10 = fdc.get_percentile_flow(10)
            q20 = fdc.get_percentile_flow(20)

            # Combine flood-focused metrics
            metrics = {
                "gauge_id": gauge_id,
                "q05": q05,
                "q10": q10,
                "q20": q20,
                "q05_mean_ratio": q05 / np.mean(discharge_series),
                "high_flow_frequency": high_flow_metrics["high_flow_frequency"],
                "high_flow_duration_avg": high_flow_metrics["high_flow_avg_duration"],
                "high_flow_duration_max": high_flow_metrics["high_flow_max_duration"],
                "flashiness_index": flashiness["flashiness_index"],
                "max_daily_rise": flood_metrics.get("max_daily_rise", np.nan),
                "flood_threshold": flood_metrics["flood_threshold"],
                "data_years": len(discharge_series) / 365.25,
            }

            results[gauge_id] = metrics

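        except Exception as e:
            # Report the failure instead of silently dropping the station
            tqdm.write(f"Skipping {gauge_id}: {e}")
            continue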

    return pd.DataFrame.from_dict(results, orient="index")


# Example: Drought analysis for selected stations
print("=== USE CASE 3A: Drought-Focused Analysis ===")
drought_results = calculate_drought_focused_metrics(discharge_data, sample_stations)
print(f"Drought analysis completed for {len(drought_results)} stations")
print("\nDrought metrics summary:")
drought_results.describe()

In [None]:
# Example: Flood analysis for selected stations
print("=== USE CASE 3B: Flood-Focused Analysis ===")
flood_results = calculate_flood_focused_metrics(discharge_data, sample_stations)
print(f"Flood analysis completed for {len(flood_results)} stations")
print("\nFlood metrics summary:")
flood_results.describe()

## Use Case 4: Flow Regime Classification

Classify flow regimes across all stations using key hydrological signatures and create a comprehensive regime database.
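
Two signatures used for this kind of classification are easy to compute by hand. The sketch below is not the package's implementation. It assumes a widely used definition of the flow duration curve slope (log-flow difference between the 33% and 66% exceedance probabilities) and defines a seasonal ratio as mean seasonal flow over mean overall flow. The month groupings and function names are illustrative.

```python
import numpy as np
import pandas as pd


def fdc_slope(series: pd.Series, lower: float = 0.33, upper: float = 0.66) -> float:
    """Slope of the log flow duration curve between two exceedance probabilities."""
    q_lower = series.quantile(1 - lower)  # flow exceeded `lower` of the time
    q_upper = series.quantile(1 - upper)  # flow exceeded `upper` of the time
    return float((np.log(q_lower) - np.log(q_upper)) / (upper - lower))


def seasonal_ratio(series: pd.Series, months: tuple) -> float:
    """Mean flow in the given months divided by the overall mean flow."""
    in_season = series.index.month.isin(months)
    return float(series[in_season].mean() / series.mean())


# Synthetic regime with a single peak in early summer
dates = pd.date_range("2001-01-01", "2004-12-31", freq="D")
doy = dates.dayofyear.to_numpy()
flow = pd.Series(10 + 8 * np.cos(2 * np.pi * (doy - 170) / 365.25), index=dates)

print(round(fdc_slope(flow), 2))
print(round(seasonal_ratio(flow, (6, 7, 8)), 2))
print(round(seasonal_ratio(flow, (12, 1, 2)), 2))
```

With this sign convention a steeper curve gives a larger positive slope; some references define the slope with the opposite sign.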

In [None]:
def classify_flow_regimes(discharge_data: pd.DataFrame, gauge_ids: List[str]) -> pd.DataFrame:
    """Classify flow regimes using key hydrological signatures."""

    from src.hydro.flow_indices import calculate_regime_classification_metrics

    results = {}

    for gauge_id in tqdm(gauge_ids, desc="Regime classification"):
        try:
            discharge_series = discharge_data[gauge_id].dropna()

            if len(discharge_series) < 1095:  # Require at least 3 years
                continue

            # Calculate classification metrics
            classification_metrics = calculate_regime_classification_metrics(discharge_series)

            # Additional signature metrics
            fdc = FlowDurationCurve(discharge_series)

            # Key signatures for classification
            fdc_slope = fdc.calculate_fdc_slope()
            cv = np.std(discharge_series) / np.mean(discharge_series)
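            q95_flow = fdc.get_percentile_flow(95)
            # Q95 can be zero for intermittent rivers; avoid dividing by zero
            q5_q95_ratio = fdc.get_percentile_flow(5) / q95_flow if q95_flow > 0 else np.nan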

            # Seasonal signatures
            if isinstance(discharge_series.index, pd.DatetimeIndex):
                timing = FlowTiming(discharge_series)
                seasonal_metrics = timing.calculate_seasonal_flows()

                # Summer vs winter flow ratio
                summer_ratio = seasonal_metrics.get("summer_ratio", np.nan)
                winter_ratio = seasonal_metrics.get("winter_ratio", np.nan)
            else:
                summer_ratio = np.nan
                winter_ratio = np.nan

            # Enhanced regime classification
            regime_type = classification_metrics["regime_classification"]

            # Additional classification based on seasonal patterns
            if not np.isnan(summer_ratio) and not np.isnan(winter_ratio):
                if summer_ratio > 1.5:
                    regime_subtype = "snowmelt_dominated"
                elif winter_ratio > 1.5:
                    regime_subtype = "rain_dominated"
                else:
                    regime_subtype = "mixed"
            else:
                regime_subtype = "unknown"

            metrics = {
                "gauge_id": gauge_id,
                "regime_type": regime_type,
                "regime_subtype": regime_subtype,
                "fdc_slope": fdc_slope,
                "coefficient_of_variation": cv,
                "q5_q95_ratio": q5_q95_ratio,
                "summer_ratio": summer_ratio,
                "winter_ratio": winter_ratio,
                "baseflow_ratio_approx": classification_metrics["baseflow_ratio_approx"],
                "mean_flow": float(np.mean(discharge_series)),
                "data_years": len(discharge_series) / 365.25,
            }

            results[gauge_id] = metrics

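        except Exception as e:
            # Report the failure instead of silently dropping the station
            tqdm.write(f"Skipping {gauge_id}: {e}")
            continue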

    return pd.DataFrame.from_dict(results, orient="index")


print("=== USE CASE 4: Flow Regime Classification ===")
regime_results = classify_flow_regimes(discharge_data, sample_stations)
print(f"Regime classification completed for {len(regime_results)} stations")

# Display classification results
print("\nFlow regime distribution:")
print(regime_results["regime_type"].value_counts())
print("\nRegime subtype distribution:")
print(regime_results["regime_subtype"].value_counts())

print("\nRegime classification summary:")
regime_results.head()

## Use Case 5: Production Pipeline for Full Dataset

Production-ready pipeline for processing the entire CAMELS-RU dataset with optimizations for large-scale processing.
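
Intermediate chunk files provide fault tolerance only if a restarted run reuses them. The sketch below shows one way to do that, under the assumption that each chunk is written to its own CSV file. `chunked` and `run_with_checkpoints` are hypothetical helpers, and the per-station computation is a placeholder.

```python
import tempfile
from pathlib import Path

import pandas as pd


def chunked(items: list, size: int) -> list:
    """Split a list into consecutive chunks of at most `size` items."""
    return [items[i : i + size] for i in range(0, len(items), size)]


def run_with_checkpoints(station_ids: list, out_dir: Path, size: int = 2) -> pd.DataFrame:
    """Process chunks, reusing any chunk file that already exists on disk."""
    out_dir.mkdir(parents=True, exist_ok=True)
    parts = []
    for i, chunk in enumerate(chunked(station_ids, size), start=1):
        chunk_file = out_dir / f"chunk_{i:03d}.csv"
        if chunk_file.exists():
            # Read IDs as strings so that "1001" does not become the integer 1001
            part = pd.read_csv(chunk_file, dtype={"gauge_id": str}).set_index("gauge_id")
        else:
            # Placeholder computation; a real run would calculate metrics here
            part = pd.DataFrame(
                {"n_chars": [len(s) for s in chunk]},
                index=pd.Index(chunk, name="gauge_id"),
            )
            part.to_csv(chunk_file)
        parts.append(part)
    return pd.concat(parts)


with tempfile.TemporaryDirectory() as tmp:
    ids = ["1001", "1002", "1003", "1004", "1005"]
    first = run_with_checkpoints(ids, Path(tmp))
    n_files = len(list(Path(tmp).glob("chunk_*.csv")))
    second = run_with_checkpoints(ids, Path(tmp))  # served from the saved chunks

print(n_files)  # 3
print(second.index.tolist())
```

Reading gauge IDs back as strings matters whenever the IDs are numeric codes, because a default CSV read would otherwise turn them into integers.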

In [None]:
def production_pipeline_full_dataset(
    discharge_data: pd.DataFrame,
    output_dir: str = "../results/",
    chunk_size: int = 50,
    save_intermediate: bool = True,
) -> pd.DataFrame:
    """
    Production pipeline for processing the full CAMELS-RU dataset.

    Features:
    - Chunked processing to manage memory
    - Intermediate saves for fault tolerance
    - Multiple output formats
    - Quality control and validation
    """

    # Ensure output directory exists
    Path(output_dir).mkdir(parents=True, exist_ok=True)

    # Get all available gauge stations
    all_stations = discharge_data.columns.tolist()
    print(f"Total stations to process: {len(all_stations)}")

    # Split into chunks for processing
    chunks = [all_stations[i : i + chunk_size] for i in range(0, len(all_stations), chunk_size)]
    print(f"Processing in {len(chunks)} chunks of {chunk_size} stations each")

    all_results = []

    for i, chunk in enumerate(chunks):
        print(f"\n--- Processing Chunk {i + 1}/{len(chunks)} ---")
        print(f"Stations {i * chunk_size + 1} to {min((i + 1) * chunk_size, len(all_stations))}")

        try:
            # Process chunk with comprehensive metrics
            chunk_results = calculate_hydrological_statistics_batch(
                discharge_data,
                gauge_ids=chunk,
                min_data_years=1.0,
                include_bfi=True,  # Include BFI for production
                include_detailed_metrics=True,
                progress_bar=True,
            )

            if len(chunk_results) > 0:
                all_results.append(chunk_results)

                # Save intermediate results
                if save_intermediate:
                    chunk_file = f"{output_dir}/hydro_metrics_chunk_{i + 1:03d}.csv"
                    chunk_results.to_csv(chunk_file)
                    print(f"Saved intermediate results to {chunk_file}")

        except Exception as e:
            print(f"Error processing chunk {i + 1}: {e}")
            continue

    # Combine all results
    if all_results:
        final_results = pd.concat(all_results, axis=0)

        # Quality control
        print(f"\n--- Quality Control ---")
        print(f"Total stations processed: {len(final_results)}")
        print(f"Total metrics calculated: {len(final_results.columns)}")

        # Check for stations with insufficient data
        insufficient_data = final_results[final_results["data_years"] < 2.0]
        if len(insufficient_data) > 0:
            print(f"Warning: {len(insufficient_data)} stations have less than 2 years of data")

        # Save final results in multiple formats
        final_results.to_csv(f"{output_dir}/camels_ru_hydrological_metrics_complete.csv")
        final_results.to_parquet(f"{output_dir}/camels_ru_hydrological_metrics_complete.parquet")

        # Save metadata
        metadata = {
            "processing_date": pd.Timestamp.now().isoformat(),
            "total_stations": len(final_results),
            "total_metrics": len(final_results.columns),
            "data_period": f"{discharge_data.index.min()} to {discharge_data.index.max()}",
            "metric_categories": sorted(
                {col.split("_")[0] for col in final_results.columns if "_" in col}
            ),
        }

        pd.Series(metadata).to_json(f"{output_dir}/processing_metadata.json", indent=2)

        print(f"\nFinal results saved to {output_dir}")
        return final_results

    else:
        print("No stations were successfully processed!")
        return pd.DataFrame()


# For demonstration, run on a subset (pass the full discharge_data to process the entire dataset)
print("=== USE CASE 5: Production Pipeline (Demo with 20 stations) ===")

# Demo with first 20 stations
demo_stations = discharge_data.columns[:20]
demo_data = discharge_data[demo_stations]

production_results = production_pipeline_full_dataset(
    demo_data, output_dir="../results/demo/", chunk_size=10, save_intermediate=True
)

print(f"\nProduction pipeline completed. Final dataset shape: {production_results.shape}")

# Generate summary statistics
print("\n--- Summary Statistics ---")
summary_stats = {
    "mean_data_years": production_results["data_years"].mean(),
    "stations_with_5plus_years": (production_results["data_years"] >= 5).sum(),
    "stations_with_10plus_years": (production_results["data_years"] >= 10).sum(),
    "mean_cv": production_results["magnitude_ma16"].mean()
    if "magnitude_ma16" in production_results.columns
    else np.nan,
    "stations_processed": len(production_results),
}

for key, value in summary_stats.items():
    print(f"{key}: {value}")

# Display first few rows of final results
production_results.head()

## Data Export and Validation

Export the calculated metrics in various formats and perform validation checks.
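
Range checks like these can be driven by a small table of plausibility bounds, which keeps the checks in one place as more metrics are added. The sketch below is a minimal version; the `BOUNDS` dictionary and `count_out_of_range` are hypothetical, and the column names simply follow the prefixes used in this notebook.

```python
import numpy as np
import pandas as pd

# Hypothetical plausibility bounds per metric column
BOUNDS = {
    "baseflow_bfi": (0.0, 1.0),
    "data_completeness": (0.0, 100.0),
}


def count_out_of_range(df: pd.DataFrame, bounds: dict) -> dict:
    """Count values outside [low, high] per column; NaN is not counted."""
    report = {}
    for col, (low, high) in bounds.items():
        if col in df.columns:
            report[col] = int(((df[col] < low) | (df[col] > high)).sum())
    return report


checks = pd.DataFrame(
    {
        "baseflow_bfi": [0.4, 1.2, np.nan, -0.1],
        "data_completeness": [99.0, 80.0, 45.0, 100.0],
    },
    index=pd.Index(["g1", "g2", "g3", "g4"], name="gauge_id"),
)
range_report = count_out_of_range(checks, BOUNDS)
print(range_report)  # {'baseflow_bfi': 2, 'data_completeness': 0}
```

Missing values are deliberately excluded here, so they need a separate count such as `isna().sum()`.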

In [None]:
# Export functions for different use cases
def export_key_metrics_summary(results_df: pd.DataFrame, output_path: str) -> None:
    """Export a summary table with only key hydrological metrics."""

    # Define key metrics for summary
    key_metrics = [
        "data_years",
        "data_completeness",
        "magnitude_ma01",  # Mean flow
        "magnitude_ma16",  # CV
        "fdc_fdc_slope",  # FDC slope
        "baseflow_bfi",  # BFI
        "extreme_q05_flow",
        "extreme_q95_flow",  # Flow extremes
        "variability_flashiness_index",  # Flashiness
        "timing_hfd_mean",  # Half-flow date
    ]

    # Select available key metrics
    available_metrics = [col for col in key_metrics if col in results_df.columns]
    summary_df = results_df[available_metrics].copy()

    # Add derived metrics
    if "extreme_q05_flow" in summary_df.columns and "extreme_q95_flow" in summary_df.columns:
        summary_df["flow_variability_ratio"] = (
            summary_df["extreme_q05_flow"] / summary_df["extreme_q95_flow"]
        )

    # Export with proper naming
    summary_df.columns = [col.replace("_", " ").title() for col in summary_df.columns]
    summary_df.to_csv(output_path)
    print(f"Key metrics summary exported to {output_path}")


def validate_results(results_df: pd.DataFrame) -> dict:
    """Perform validation checks on calculated metrics."""

    validation_report = {}

    # Check for reasonable ranges
    if "baseflow_bfi" in results_df.columns:
        bfi_out_of_range = ((results_df["baseflow_bfi"] < 0) | (results_df["baseflow_bfi"] > 1)).sum()
        validation_report["bfi_out_of_range"] = bfi_out_of_range

    # Check for extreme CV values
    if "magnitude_ma16" in results_df.columns:
        extreme_cv = (results_df["magnitude_ma16"] > 5).sum()
        validation_report["extreme_cv_count"] = extreme_cv

    # Check data completeness
    missing_data_severe = (results_df["data_completeness"] < 50).sum()
    validation_report["severe_missing_data_count"] = missing_data_severe

    # Check for NaN values in key metrics
    key_metrics_nan = {}
    for col in ["magnitude_ma01", "fdc_fdc_slope", "baseflow_bfi"]:
        if col in results_df.columns:
            key_metrics_nan[col] = results_df[col].isna().sum()

    validation_report["nan_counts"] = key_metrics_nan
    validation_report["total_stations"] = len(results_df)

    return validation_report


# Perform exports and validation on our comprehensive results
if "comprehensive_results" in locals() and len(comprehensive_results) > 0:
    print("=== EXPORT AND VALIDATION ===")

    # Export key metrics summary
    export_key_metrics_summary(comprehensive_results, "../results/key_metrics_summary.csv")

    # Perform validation
    validation_report = validate_results(comprehensive_results)

    print("\nValidation Report:")
    for key, value in validation_report.items():
        print(f"  {key}: {value}")

    # Create metrics catalog
    metrics_catalog = pd.DataFrame(
        {
            "metric_name": comprehensive_results.columns,
            "category": [
                col.split("_")[0] if "_" in col else "basic" for col in comprehensive_results.columns
            ],
            "non_null_count": [
                comprehensive_results[col].notna().sum() for col in comprehensive_results.columns
            ],
            "mean_value": [
                comprehensive_results[col].mean()
                if pd.api.types.is_numeric_dtype(comprehensive_results[col])
                else np.nan
                for col in comprehensive_results.columns
            ],
        }
    )

    metrics_catalog.to_csv("../results/metrics_catalog.csv", index=False)
    print("\nMetrics catalog exported to ../results/metrics_catalog.csv")

    # Display metrics by category
    print("\nMetrics by category:")
    print(metrics_catalog.groupby("category").size().sort_values(ascending=False))

else:
    print("No comprehensive results available for export. Run the comprehensive analysis first.")

## Summary and Next Steps

This notebook demonstrates comprehensive usage of the reorganized hydrological metrics package and its modular design.

### ✅ What We've Accomplished:
1. **Batch Processing**: Efficient calculation of metrics for multiple stations
2. **Flexible Analysis**: Different use cases from quick analysis to comprehensive metrics
3. **Specialized Metrics**: Drought, flood, and regime-specific analysis
4. **Production Pipeline**: Scalable processing for large datasets
5. **Quality Control**: Validation and export capabilities

### 🎯 Key Benefits:
- **Standardized Output**: All functions return DataFrames with gauge_id as index
- **Error Handling**: Robust processing that continues despite individual station failures
- **Performance**: Optimized calculations with progress tracking
- **Flexibility**: Easy to customize for specific research questions
- **Scalability**: Chunked processing for large datasets

### 📊 Typical Output Structure:
```
DataFrame with gauge_id as index and columns like:
- magnitude_* : Flow magnitude metrics (mean, median, CV, etc.)
- fdc_* : Flow Duration Curve metrics (slope, percentiles, etc.)
- extreme_* : Extreme flow metrics (floods, droughts, quantiles)
- timing_* : Temporal metrics (seasonal patterns, timing of extremes)
- variability_* : Variability metrics (flashiness, autocorrelation, etc.)
- baseflow_* : Base flow metrics (BFI, baseflow statistics)
- data_* : Data quality metrics (years, completeness, etc.)
```
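
Because the category is encoded in the column prefix, a single category can be selected with a regular expression. The sketch below uses a small hand-made table; the values are invented and the column names only mirror the prefixes listed above.

```python
import pandas as pd

# Hypothetical metrics table following the prefix convention
metrics = pd.DataFrame(
    {
        "magnitude_ma01": [12.3, 4.5],
        "magnitude_ma16": [0.8, 1.1],
        "baseflow_bfi": [0.55, 0.62],
        "data_years": [10.0, 7.5],
    },
    index=pd.Index(["g1", "g2"], name="gauge_id"),
)

# Keep only the columns of one category
magnitude_only = metrics.filter(regex=r"^magnitude_")

# Count the columns in each category
prefix_counts = metrics.columns.to_series().str.split("_", n=1).str[0].value_counts()

print(magnitude_only.columns.tolist())
print(prefix_counts.to_dict())
```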

### 🚀 Recommended Workflow:
1. **Quick Analysis** → Get overview of all stations
2. **Quality Filtering** → Select high-quality stations
3. **Comprehensive Analysis** → Calculate detailed metrics for selected stations
4. **Specialized Analysis** → Focus on specific phenomena (droughts, floods)
5. **Export & Archive** → Save results in multiple formats

In [None]:
# Final demonstration: One-liner for comprehensive analysis
print("=== ONE-LINER COMPREHENSIVE ANALYSIS ===")
print("For users who want everything calculated with minimal code:")
print()
print("# Single function call to get comprehensive metrics for all stations:")
print("comprehensive_metrics = calculate_hydrological_statistics_batch(")
print("    discharge_data=discharge_data,")
print("    min_data_years=2.0,")
print("    include_bfi=True,")
print("    include_detailed_metrics=True")
print(")")
print()
print("# Result: DataFrame with gauge_id as index and 100+ hydrological metrics as columns")
print("# Ready for further analysis, machine learning, or export")

# Example of what the final DataFrame looks like
if "comprehensive_results" in locals() and len(comprehensive_results) > 0:
    print(f"\nExample output shape: {comprehensive_results.shape}")
    print("Sample metrics preview:")
    sample_cols = [
        col
        for col in comprehensive_results.columns
        if any(x in col for x in ["ma01", "bfi", "fdc_slope", "q05", "q95"])
    ][:5]
    if sample_cols:
        print(comprehensive_results[sample_cols].head(3).round(3))

print("\n" + "=" * 60)
print("NOTEBOOK COMPLETE - Ready for CAMELS-RU Production Analysis!")
print("=" * 60)