In [6]:
import pandas as pd
import numpy as np
import pymannkendall as mk
import os
from collections import defaultdict, OrderedDict
from typing import Dict, List, Tuple

# ======================
# DATA PROCESSING MODULE
# ======================

def combine_fluxnet_datasets(ds1_path: str, ds2_path: str, output_path: str) -> None:
    """
    Combine two monthly FLUXNET CSV files, preferring usable values from ds2.

    Only the core variables listed below are read from each file and
    TIMESTAMP is parsed as YYYYMM. For timestamps present in both files, a
    ds2 value replaces the ds1 value when it is neither the -9999 missing
    sentinel nor NaN (only for variables present in both files). Rows whose
    timestamp exists only in ds2 are appended. The result is sorted by
    TIMESTAMP, written to ``output_path`` with TIMESTAMP formatted back to
    YYYYMM, and a short summary is printed.

    Args:
        ds1_path: Path to first dataset (typically older data)
        ds2_path: Path to second dataset (typically newer data)
        output_path: Path to save combined dataset
    """
    missing_value = -9999

    # Core variables to include (TIMESTAMP must be first)
    core_vars = [
        'TIMESTAMP',
        # Base variables
        'LE_F_MDS', 'H_F_MDS', 'G_F_MDS', 'NETRAD', 'TA_F',
        'TS_F_MDS_1', 'VPD_F', 'P_F', 'SWC_F_MDS_1', 'GPP_NT_VUT_MEAN',
        # Corrected versions
        'LE_CORR', 'H_CORR',
        # Quality control flags
        'LE_F_MDS_QC', 'H_F_MDS_QC', 'G_F_MDS_QC', 'TA_F_QC',
        'TS_F_MDS_1_QC', 'VPD_F_QC', 'P_F_QC', 'SWC_F_MDS_1_QC',
    ]
    wanted = set(core_vars)

    # Load datasets with only needed columns
    ds1 = pd.read_csv(ds1_path, usecols=lambda col: col in wanted)
    ds2 = pd.read_csv(ds2_path, usecols=lambda col: col in wanted)

    # Convert TIMESTAMP to datetime
    for frame in (ds1, ds2):
        frame['TIMESTAMP'] = pd.to_datetime(frame['TIMESTAMP'], format='%Y%m')

    # Variables that can be compared between the two files (TIMESTAMP skipped)
    shared_vars = [
        var for var in core_vars[1:]
        if var in ds1.columns and var in ds2.columns
    ]

    if shared_vars:
        # Line up the first ds2 row of each timestamp with the ds1 rows.
        # Timestamps missing from ds2 become NaN and are therefore never used.
        ds2_first = ds2.drop_duplicates(subset='TIMESTAMP').set_index('TIMESTAMP')
        candidates = ds2_first[shared_vars].reindex(ds1['TIMESTAMP'])
        candidates.index = ds1.index

        # A ds2 value wins only if it is real data: NaN compares unequal to
        # the sentinel, so it has to be excluded explicitly.
        usable = candidates.notna() & (candidates != missing_value)
        for var in shared_vars:
            ds1[var] = ds1[var].where(~usable[var], candidates[var])

    # Keep only ds2 rows whose timestamp is not already covered by ds1
    ds2_only = ds2[~ds2['TIMESTAMP'].isin(ds1['TIMESTAMP'])]
    combined_df = pd.concat([ds1, ds2_only]).sort_values('TIMESTAMP')

    # Convert TIMESTAMP back to original format and save
    combined_df['TIMESTAMP'] = combined_df['TIMESTAMP'].dt.strftime('%Y%m')
    combined_df.to_csv(output_path, index=False)

    print(f"Combined data saved to {output_path}")
    print(f"Time coverage: {combined_df['TIMESTAMP'].min()} to {combined_df['TIMESTAMP'].max()}")

# ======================
# COMPLETENESS ANALYSIS
# ======================

def validate_variable(df: pd.DataFrame, var: str) -> List[int]:
    """
    Identify complete years for a variable based on QC flags

    A row counts as valid when the value is neither NaN nor the -9999
    missing sentinel and, if a ``<var>_QC`` column exists, its QC value is
    greater than 0.8. A year is complete when it has exactly 12 valid rows.

    Args:
        df: Input DataFrame with a 'Year' column, the variable column and
            optionally the matching '_QC' column
        var: Variable name to validate

    Returns:
        Sorted list of years with complete data (12 valid rows)
    """
    # NaN != -9999.0 evaluates to True, so NaN must be rejected explicitly;
    # otherwise gaps would be counted as valid months.
    is_valid = df[var].notna() & (df[var] != -9999.0)

    qc_col = var + '_QC'
    if qc_col in df.columns:
        is_valid &= df[qc_col] > 0.8

    valid_rows_per_year = df.loc[is_valid, 'Year'].value_counts()
    complete = valid_rows_per_year[valid_rows_per_year == 12]
    return sorted(complete.index.tolist())

def get_complete_years(df: pd.DataFrame, variables: List[str]) -> List[int]:
    """
    Return the years in which every listed variable is complete.

    Starts from all years found in the 'Year' column and keeps only those
    that validate_variable reports as complete for each variable in turn.

    Args:
        df: Input DataFrame with data
        variables: List of variables to check

    Returns:
        Sorted list of years with complete data for all variables
    """
    surviving_years = set(df['Year'].unique())
    for name in variables:
        # Narrow the candidate set to the years this variable also covers
        surviving_years.intersection_update(validate_variable(df, name))
    return sorted(surviving_years)

def identify_problematic_variable(df: pd.DataFrame, variables: List[str]) -> str:
    """
    Find which variable reduces completeness the most

    For each variable, the number of complete years is recomputed with that
    variable left out; the variable whose removal gains the most complete
    years is returned. Ties go to the variable that appears first in
    ``variables``.

    Args:
        df: Input DataFrame with data
        variables: List of variables to evaluate

    Returns:
        Name of the most problematic variable, or None when ``variables``
        is empty (same convention as identify_problematic_variable_season)
    """
    # Nothing to rank: avoid max() raising ValueError on an empty mapping
    if not variables:
        return None

    base_count = len(get_complete_years(df, variables))
    var_impact = {}

    for var in variables:
        test_vars = [v for v in variables if v != var]
        # Gain in complete years obtained by dropping this variable
        var_impact[var] = len(get_complete_years(df, test_vars)) - base_count

    return max(var_impact.items(), key=lambda item: item[1])[0]


def save_variable_year_availability(df: pd.DataFrame, variables: List[str], output_file: str) -> None:
    """
    Write a per-variable summary of the years that pass validation.

    One CSV row is produced per variable, holding the variable name, the
    comma-separated sorted list of its valid years (as returned by
    validate_variable) and how many such years there are.

    Args:
        df: DataFrame with Year, QC, and variable columns
        variables: List of variables to check
        output_file: Path to save CSV
    """
    rows = []
    for name in variables:
        years_ok = validate_variable(df, name)
        row = {"Variable": name}
        row["Valid_Years"] = ", ".join(str(year) for year in sorted(years_ok))
        row["Count_Valid_Years"] = len(years_ok)
        rows.append(row)

    pd.DataFrame(rows).to_csv(output_file, index=False)
    print(f"Variable availability saved to {output_file}")


def save_recursive_year_availability(df: pd.DataFrame, variables: List[str], output_file: str) -> None:
    """
    Write the step-by-step completeness progression to CSV.

    Each step records the variables still included and the years complete
    for all of them. After each step the most problematic variable is
    removed; the last recorded step is the one with two variables left.
    Nothing is recorded when fewer than two variables are supplied.

    Args:
        df: DataFrame with Year and variables
        variables: List of variables to analyze
        output_file: Path to save CSV
    """
    remaining = variables.copy()
    steps = []

    while len(remaining) >= 2:
        years = get_complete_years(df, remaining)
        steps.append({
            "Step": len(steps) + 1,
            "Variables_Included": ", ".join(remaining),
            "Count_Variables": len(remaining),
            "Years_Available": ", ".join(str(year) for year in years),
            "Count_Years": len(years)
        })

        # Two variables is the smallest combination that gets reported
        if len(remaining) == 2:
            break
        remaining.remove(identify_problematic_variable(df, remaining))

    pd.DataFrame(steps).to_csv(output_file, index=False)
    print(f"Recursive year availability progression saved to {output_file}")


# ======================
# YEARLY ANALYSIS
# ======================

def calculate_yearly_means(df: pd.DataFrame, variables: List[str], years: List[int]) -> pd.DataFrame:
    """
    Average the requested variables per year, restricted to the given years.

    Args:
        df: Input DataFrame with a 'Year' column and the variable columns
        variables: Variables to calculate means for
        years: Years to include

    Returns:
        DataFrame indexed by Year with one mean column per variable, or an
        empty DataFrame when ``years`` is empty
    """
    if not years:
        return pd.DataFrame()

    in_selected_years = df['Year'].isin(years)
    per_year = df[in_selected_years].groupby('Year')
    return per_year[variables].mean()

def generate_recursive_means(df: pd.DataFrame, variables: List[str]) -> Dict[str, pd.DataFrame]:
    """
    Build yearly-mean tables while progressively dropping variables.

    The first table ("All Variables") uses every variable. While more than
    two variables remain, the most problematic one is removed and a new
    table keyed "Excluded <variable>" is computed from the variables left.

    Args:
        df: Input DataFrame with data
        variables: List of variables to analyze

    Returns:
        Ordered dictionary of yearly-mean DataFrames, one per combination
    """
    def means_for(names: List[str]) -> pd.DataFrame:
        # Means over the years that are complete for exactly these variables
        return calculate_yearly_means(df, names, get_complete_years(df, names))

    tables = OrderedDict()
    remaining = variables.copy()
    tables["All Variables"] = means_for(remaining)

    while len(remaining) > 2:
        dropped = identify_problematic_variable(df, remaining)
        remaining.remove(dropped)
        tables[f"Excluded {dropped}"] = means_for(remaining)

    return tables

# ======================
# SEASONAL ANALYSIS
# ======================

def get_season_months(season: str) -> List[int]:
    """Map a season code (DJF, MAM, JJA, SON) to its three month numbers."""
    months_by_season = dict(
        DJF=[12, 1, 2],   # December, January, February
        MAM=[3, 4, 5],    # March, April, May
        JJA=[6, 7, 8],    # June, July, August
        SON=[9, 10, 11],  # September, October, November
    )
    # Unknown codes raise KeyError
    return months_by_season[season]

def get_complete_seasons(df: pd.DataFrame, variables: List[str], season: str) -> List[int]:
    """
    Get seasons with complete data for all variables

    A season year qualifies when all of the season's months are present and,
    for every variable, each of those months has a valid value: not NaN,
    not the -9999 missing sentinel and, if a ``<var>_QC`` column exists,
    QC greater than 0.8. For DJF, December is assigned to the following
    year so that a winter is kept together.

    Args:
        df: Input DataFrame with 'Year' and 'Month' columns
        variables: Variables to check
        season: Season code (DJF, MAM, JJA, SON)

    Returns:
        Sorted list of complete season years
    """
    season_months = get_season_months(season)
    n_months = len(season_months)
    df_season = df[df['Month'].isin(season_months)].copy()

    # Adjust year for winter season (DJF): December belongs to next year's winter
    df_season['Season_Year'] = df_season['Year']
    if season == 'DJF':
        df_season.loc[df_season['Month'] == 12, 'Season_Year'] += 1

    # Find seasons with all months present
    months_present = df_season.groupby('Season_Year')['Month'].nunique()
    valid_seasons = set(months_present[months_present == n_months].index.tolist())

    # Check variable completeness
    for var in variables:
        if not valid_seasons:
            break

        # NaN compares unequal to the sentinel, so reject it explicitly
        is_valid = df_season[var].notna() & (df_season[var] != -9999.0)
        qc_col = var + '_QC'
        if qc_col in df.columns:
            is_valid &= df_season[qc_col] > 0.8

        # Count distinct valid months rather than rows, so duplicate rows
        # for one month cannot hide a missing month
        valid_months = df_season[is_valid].groupby('Season_Year')['Month'].nunique()
        valid_seasons &= set(valid_months[valid_months == n_months].index.tolist())

    return sorted(valid_seasons)

def generate_recursive_seasonal_means(df: pd.DataFrame, variables: List[str], season: str) -> Dict[str, pd.DataFrame]:
    """
    Build seasonal-mean tables while progressively dropping variables.

    The first table ("All Variables") uses every variable. While more than
    two variables remain, the most problematic one for this season is
    removed and a table keyed "Excluded <variable>" is computed from the
    variables left. The loop stops early if no variable is identified.

    Args:
        df: Input DataFrame
        variables: Variables to analyze
        season: Season code (DJF, MAM, JJA, SON)

    Returns:
        Ordered dictionary of seasonal-mean DataFrames, one per combination
    """
    def means_for(names: List[str]) -> pd.DataFrame:
        # Means over the seasons that are complete for exactly these variables
        usable_seasons = get_complete_seasons(df, names, season)
        return calculate_seasonal_means(df, names, season, usable_seasons)

    tables = OrderedDict()
    remaining = variables.copy()
    tables["All Variables"] = means_for(remaining)

    while len(remaining) > 2:
        dropped = identify_problematic_variable_season(df, remaining, season)
        if not dropped:
            break

        remaining.remove(dropped)
        tables[f"Excluded {dropped}"] = means_for(remaining)

    return tables

def calculate_seasonal_means(df: pd.DataFrame, variables: List[str], season: str, season_years: List[int]) -> pd.DataFrame:
    """
    Average the requested variables per season year for the given seasons.

    Args:
        df: Input DataFrame with 'Year' and 'Month' columns
        variables: Variables to calculate means for
        season: Season code (DJF, MAM, JJA, SON)
        season_years: List of years with complete seasonal data

    Returns:
        DataFrame of means indexed by (Season_Year, Season), or an empty
        DataFrame when ``season_years`` is empty
    """
    if not season_years:
        return pd.DataFrame()

    in_season = df['Month'].isin(get_season_months(season))
    subset = df[in_season].copy()

    # Season year equals calendar year, except that a DJF December is
    # counted towards the following year's winter
    subset['Season_Year'] = subset['Year']
    if season == 'DJF':
        is_december = subset['Month'] == 12
        subset.loc[is_december, 'Season_Year'] += 1

    # Keep only the requested (complete) seasons
    subset = subset[subset['Season_Year'].isin(season_years)]

    means = subset.groupby('Season_Year')[variables].mean()
    means['Season'] = season
    means = means.reset_index()
    return means.set_index(['Season_Year', 'Season'])

def identify_problematic_variable_season(df: pd.DataFrame, variables: List[str], season: str) -> str:
    """
    Pick the variable whose removal gains the most complete seasons.

    Args:
        df: Input DataFrame
        variables: Variables to evaluate
        season: Season code (DJF, MAM, JJA, SON)

    Returns:
        Name of the most problematic variable (first one on ties), or None
        when ``variables`` is empty
    """
    baseline = len(get_complete_seasons(df, variables, season))

    gains = {}
    for name in variables:
        others = [v for v in variables if v != name]
        # Extra complete seasons obtained by leaving this variable out
        gains[name] = len(get_complete_seasons(df, others, season)) - baseline

    if not gains:
        return None
    return max(gains, key=gains.get)

# ======================
# TREND ANALYSIS
# ======================

def calculate_trend_stats(series: pd.Series) -> Dict[str, float]:
    """
    Run the Mann-Kendall test (mk.original_test) on a year-indexed series.

    The series is reindexed to every integer year between its first and
    last index value before testing, so missing years appear as NaN.

    Args:
        series: Time series data indexed by year

    Returns:
        Dictionary with 'slope', 'p_value' and a 'period' label of the form
        "start-end (number of non-NaN values)"; None when the series has
        fewer than 10 non-NaN values or the test raises
    """
    enough_points = series.notna().sum() >= 10
    if not enough_points:
        return None

    first_year = int(series.index.min())
    last_year = int(series.index.max())
    # Fill gaps in the year axis with NaN
    series = series.reindex(np.arange(first_year, last_year + 1, 1))

    try:
        outcome = mk.original_test(series.values)
        period_label = f"{first_year}-{last_year} ({series.notna().sum()})"
        return {
            'slope': outcome.slope,
            'p_value': outcome.p,
            'period': period_label
        }
    except Exception:
        return None

def build_trend_tables(sheets: Dict[str, pd.DataFrame]) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Build p-value and slope tables from analysis results

    Each sheet is indexed by 'Season_Year' (or 'Year'), trend statistics are
    computed per variable via calculate_trend_stats, and sheets are grouped
    by the period label of their statistics. For every period, the sheet
    with the most variables supplies one row in each output table. Sheets
    with fewer than two variables, or with no usable statistics, are skipped.

    Args:
        sheets: Dictionary of DataFrames from analysis

    Returns:
        Tuple of (p-value table, slope table)
    """
    year_groups = defaultdict(list)

    # Sort sheets by number of variables (ascending)
    sorted_sheets = sorted(
        sheets.items(),
        key=lambda item: len([col for col in item[1].columns if col not in ['Season_Year', 'Season']]),
    )

    for _sheet_name, data in sorted_sheets:
        if 'Season_Year' in data.columns:
            data = data.set_index('Season_Year')
        elif 'Year' in data.columns:
            data = data.set_index('Year')

        vars_included = [col for col in data.columns if col not in ['Season', 'Year']]
        if len(vars_included) < 2:
            continue

        # Calculate stats for all variables
        stats_dict = {}
        period = None
        for var in vars_included:
            stats = calculate_trend_stats(data[var].dropna())
            if stats:
                stats_dict[var] = {
                    'p_value': stats['p_value'],
                    'slope': stats['slope']
                }
                period = stats['period']

        if stats_dict:
            year_groups[period].append({
                'vars': vars_included,
                'stats': stats_dict
            })

    # Build final tables
    p_table, slope_table = [], []

    for period, combinations in year_groups.items():
        # max() keeps the first combination among those with the most variables
        best_comb = max(combinations, key=lambda comb: len(comb['vars']))
        combo_label = ', '.join(best_comb['vars'])

        p_row = {'Years': period, 'Variable Combination': combo_label}
        s_row = {'Years': period, 'Variable Combination': combo_label}

        # Iterate the computed stats rather than every included variable:
        # a variable whose trend test produced nothing has no stats entry,
        # and indexing it would raise KeyError.
        for var, var_stats in best_comb['stats'].items():
            p_row[var] = var_stats['p_value']
            s_row[var] = var_stats['slope']

        p_table.append(p_row)
        slope_table.append(s_row)

    return pd.DataFrame(p_table), pd.DataFrame(slope_table)

# ======================
# MAIN EXECUTION
# ======================

def _write_mean_sheets(path: str, sheets: Dict[str, pd.DataFrame], flatten_index: bool) -> bool:
    """Write each non-empty table to its own sheet; return False if there are none."""
    non_empty = [(name, table) for name, table in sheets.items() if not table.empty]
    if not non_empty:
        # Closing an Excel writer that received no sheet can fail
        # (openpyxl requires at least one visible sheet), so skip the file.
        return False

    with pd.ExcelWriter(path) as writer:
        for sheet_name, table in non_empty:
            # Sheet names are cut to 31 characters, the Excel limit
            if flatten_index:
                table.reset_index().to_excel(writer, sheet_name=sheet_name[:31], index=False)
            else:
                table.to_excel(writer, sheet_name=sheet_name[:31])
    return True


def _write_trend_workbook(means_file: str, trend_file: str) -> None:
    """Read a means workbook back and save its p-value and slope tables."""
    sheets = pd.read_excel(means_file, sheet_name=None)
    p_table, slope_table = build_trend_tables(sheets)
    with pd.ExcelWriter(trend_file) as writer:
        p_table.to_excel(writer, sheet_name='P-values', index=False)
        slope_table.to_excel(writer, sheet_name='Slopes', index=False)


def run_full_analysis(station_code: str = "FI-Hyy",
                      base_root: str = r"C:\Deepak\stations\MM\Final") -> None:
    """
    Run complete analysis pipeline with outputs in station-specific folder

    Steps: combine the two input CSVs, derive Year/Month, save the
    availability reports, write yearly and seasonal mean workbooks, then
    write Mann-Kendall trend tables computed from those workbooks. A means
    workbook is only written when at least one of its tables is non-empty;
    the matching trend workbook is skipped (with a printed warning) otherwise.

    Args:
        station_code: Station identifier such as "FI-Hyy"; the part after
            the first hyphen names the station folder
        base_root: Folder containing one sub-folder per station. Input files
            are read from, and all outputs written to, that sub-folder.
    """
    # Build folder path automatically from station code, e.g. "FI-Bra" -> "Bra".
    # A code without a hyphen is used as the folder name as-is.
    code_parts = station_code.split("-")
    station_folder_name = code_parts[1] if len(code_parts) > 1 else station_code
    base_dir = os.path.join(base_root, station_folder_name)

    # Make sure output folder exists
    os.makedirs(base_dir, exist_ok=True)

    # File paths for inputs
    ds1_path = os.path.join(base_dir, f'FLX_{station_code}_FLUXNET2015_FULLSET_MM_1996-2020_beta-3.csv')
    ds2_path = os.path.join(base_dir, f'ICOSETC_{station_code}_FLUXNET_MM_L2.csv')

    # File path for combined dataset
    combined_path = os.path.join(base_dir, f"{station_code}_Fluxnet_Combined.csv")

    # Combine datasets
    combine_fluxnet_datasets(ds1_path, ds2_path, combined_path)

    # Load and prepare data
    df = pd.read_csv(combined_path)
    df['TIMESTAMP'] = pd.to_datetime(df['TIMESTAMP'], format='%Y%m')
    df['Year'] = df['TIMESTAMP'].dt.year
    df['Month'] = df['TIMESTAMP'].dt.month

    # Get variable columns (excluding QC and metadata)
    variables = [col for col in df.columns
                 if not col.endswith('_QC')
                 and col not in ['TIMESTAMP', 'Year', 'Month']]

    # Save variable availability reports
    save_variable_year_availability(df, variables, os.path.join(base_dir, f"{station_code}_variable_year_availability.csv"))
    save_recursive_year_availability(df, variables, os.path.join(base_dir, f"{station_code}_recursive_year_availability.csv"))

    # Yearly analysis (Year stays as the sheet index)
    yearly_results = generate_recursive_means(df, variables)
    yearly_file = os.path.join(base_dir, f'strict_recursive_yearly_means_{station_code}.xlsx')
    yearly_written = _write_mean_sheets(yearly_file, yearly_results, flatten_index=False)

    # Seasonal analysis (the two index levels are written as plain columns)
    seasons = ['DJF', 'MAM', 'JJA', 'SON']
    seasonal_files = {}
    for season in seasons:
        seasonal_results = generate_recursive_seasonal_means(df, variables, season)
        seasonal_file = os.path.join(base_dir, f"strict_recursive_seasonal_means_{station_code}_{season}.xlsx")
        if _write_mean_sheets(seasonal_file, seasonal_results, flatten_index=True):
            seasonal_files[season] = seasonal_file

    # Yearly trend analysis
    if yearly_written:
        _write_trend_workbook(
            yearly_file,
            os.path.join(base_dir, f'progressive_trend_tables_{station_code}_yearly.xlsx'),
        )
    else:
        print(f"Warning: No complete yearly data for {station_code}; yearly trend analysis skipped")

    # Seasonal trend analysis
    for season in seasons:
        if season not in seasonal_files:
            print(f"Warning: No data file found for {season} season")
            continue
        try:
            _write_trend_workbook(
                seasonal_files[season],
                os.path.join(base_dir, f'progressive_trend_tables_{station_code}_{season}.xlsx'),
            )
        except FileNotFoundError:
            print(f"Warning: No data file found for {season} season")

if __name__ == "__main__":
    # Run the whole pipeline once per station
    for station in ("BE-Bra", "FI-Hyy", "DE-Tha"):
        run_full_analysis(station)


Combined data saved to C:\Deepak\stations\MM\Final\Bra\BE-Bra_Fluxnet_Combined.csv
Time coverage: 199601 to 202412
Variable availability saved to C:\Deepak\stations\MM\Final\Bra\BE-Bra_variable_year_availability.csv
Recursive year availability progression saved to C:\Deepak\stations\MM\Final\Bra\BE-Bra_recursive_year_availability.csv
Combined data saved to C:\Deepak\stations\MM\Final\Hyy\FI-Hyy_Fluxnet_Combined.csv
Time coverage: 199601 to 202412
Variable availability saved to C:\Deepak\stations\MM\Final\Hyy\FI-Hyy_variable_year_availability.csv
Recursive year availability progression saved to C:\Deepak\stations\MM\Final\Hyy\FI-Hyy_recursive_year_availability.csv
Combined data saved to C:\Deepak\stations\MM\Final\Tha\DE-Tha_Fluxnet_Combined.csv
Time coverage: 199601 to 202412
Variable availability saved to C:\Deepak\stations\MM\Final\Tha\DE-Tha_variable_year_availability.csv
Recursive year availability progression saved to C:\Deepak\stations\MM\Final\Tha\DE-Tha_recursive_year_availabil

In [None]:
import numpy as np
import pymannkendall as mk


# Scratch check: run the Mann-Kendall test directly on a hand-entered series.
# 28 values; presumably one mean per year copied from the pipeline output
# above -- TODO confirm the source variable and station.
data = np.array([
    3.453767083, 3.77697995, 4.0161395, 4.404460742, 4.260551275, 4.174563833,
    4.48972, 3.382869083, 3.7710225, 3.832464992, 3.558067917, 2.434592333,
    4.369564583, 4.369519667, 3.97802885, 4.14561475, 3.896000667, 3.922338,
    4.462562417, 4.30264775, 4.69167225, 4.60689075, 3.749627583, 4.18201275,
    2.349085, 4.921183667, 4.48183975, 4.792183417
])

# Perform Mann-Kendall test
result = mk.original_test(data)


# Bare expression so the notebook displays the p-value of the test
result.p


np.float64(0.050477473834512177)

In [None]:

# Second scratch check on another hand-entered series of 28 values.
# Relies on `np` and `mk` already being imported by an earlier cell.
# Presumably a different variable over the same years -- TODO confirm.
data2 = np.array([
    33.071961, 27.47026983, 24.3015225, 27.10003867, 26.201129, 20.44217867,
    21.03798275, 41.29880267, 28.33163833, 29.40995273, 28.45507667, 28.644646,
    26.62623083, 27.03697333, 26.99581083, 34.16811217, 31.13212267, 24.44250333,
    26.21327083, 32.5066, 27.1318475, 27.85178525, 44.43696167, 34.50343833,
    -14.1410875, 34.15065167, 46.66989083, 36.96086583
])


result1 = mk.original_test(data2)

# Extract p-value
# NOTE(review): the recorded output for this cell equals the p-value shown
# for the previous cell -- re-run to confirm the output is not stale.
result1.p



np.float64(0.050477473834512177)