#### Main changes
- Using selfMask() to avoid empty pixels (a lot of bands are sparse)
- Skipping the validation step – pandera should be fast, but there is some temporary schema generation that takes time
- Using high volume end point and concurrent processing
- Using reduceRegions instead of mapped reduceRegion - a chunk of code for choosing ha or percent etc is based on using reduceRegion and it also allowed to skip
- Skipping the use of points to get the admin details (country and level 1 info) and water_flag (should be based on image but was using vector admin still)


In [None]:
import ee

# Reset Earth Engine completely
ee.Reset()


In [None]:
# Earth Engine and Common Libraries
import ee
from pathlib import Path

# Authenticate and initialize Earth Engine against the high-volume endpoint
try:
    ee.Initialize(opt_url='https://earthengine-highvolume.googleapis.com')  # Try to use existing credentials first
except Exception:
    # Initialization failed (e.g. no usable stored credentials): run the
    # authentication flow, then initialize again with the same endpoint.
    ee.Authenticate()
    ee.Initialize(opt_url='https://earthengine-highvolume.googleapis.com')

In [None]:
# Show the base URLs the client is currently configured with.
# NOTE: these are private attributes of ee.data and may change between releases.
print("EE Data Base URL:", ee.data._cloud_api_base_url)
print("EE API Base URL:", ee.data._api_base_url)

# Check that the HIGH-VOLUME endpoint is active (it is what this section expects)
if 'highvolume' in str(ee.data._cloud_api_base_url):
    print("✅ Using HIGH-VOLUME endpoint")
else:
    print("❌ Using STANDARD endpoint")

In [None]:
# !pip install --upgrade --pre openforis-whisp

In [None]:
# Sum and median are computed together; sharedInputs=True feeds the same inputs
# to both reducers. Later cells read the resulting "<band>_sum" / "<band>_median" columns.
combined_reducer = ee.Reducer.sum().combine(ee.Reducer.median(),sharedInputs=True)

In [None]:
import openforis_whisp as whisp

In [None]:
import ee
import geopandas as gpd
import pandas as pd
import time
import threading
from queue import Queue
import logging
from typing import List, Optional, Dict, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
import openforis_whisp as whisp
import tempfile
import os
import sys

from openforis_whisp.parameters.config_runtime import (
    plot_id_column,
    admin_1_column, 
    iso3_country_column, 
    iso2_country_column, 
    water_flag,
    geometry_type_column, 
    geometry_area_column,
    centroid_x_coord_column, 
    centroid_y_coord_column
)

import sys
from pathlib import Path

# Add project root to sys.path
sys.path.append(str(Path.cwd().parent))

# Import the lookup dictionary
from src.openforis_whisp.parameters.lookup_gaul1_admin import (
    lookup_dict
)

In [None]:

# Configure logging ONCE - avoid duplicate handlers
# (re-running this cell would otherwise stack handlers and repeat every message)
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)

# force=True also replaces any handlers already attached to the root logger,
# so the loop above is a belt-and-braces cleanup.
logging.basicConfig(
    level=logging.WARNING, 
    format='%(levelname)s: %(message)s',
    stream=sys.stdout,
    force=True
    )
logger = logging.getLogger("whisp-batch")

# Quieten the Google API client loggers
logging.getLogger('googleapiclient.discovery_cache').setLevel(logging.ERROR)
logging.getLogger('googleapiclient').setLevel(logging.WARNING)

# Defaults used by the batch-processing functions below
EE_MAX_CONCURRENT = 10        # default number of simultaneous EE requests / semaphore size
EE_FEATURES_PER_BATCH = 25    # default number of features sent per EE request
MAX_RETRIES = 3               # default attempts per batch before giving up

def join_admin_codes(df, lookup_dict, id_col='id_col'):
    """
    Attach admin-1 name and ISO3/ISO2 country codes to `df` through a lookup dictionary.

    The dictionary keys are used as GAUL level-1 codes and matched against `df[id_col]`
    after both sides are cast to int32 (missing values become -9999). The lookup
    fields 'gaul1_name', 'iso3_code' and 'iso2_code' are renamed to the column names
    defined in config_runtime.py; the helper columns and `id_col` itself are dropped.

    Args:
        df: DataFrame containing `id_col`.
        lookup_dict: Mapping of code -> record (presumably a dict of admin fields
            per code; verify against lookup_gaul1_admin).
        id_col: Name of the column in `df` holding the admin code.

    Returns:
        A new DataFrame (left join, so every row of `df` is kept).
    """
    from openforis_whisp.parameters.config_runtime import (
        admin_1_column,
        iso3_country_column,
        iso2_country_column,
    )

    # Lookup side: dictionary keys become an int32 'gaul1_code' column.
    codes = (
        pd.DataFrame.from_dict(lookup_dict, orient='index')
        .rename_axis('gaul1_code')
        .reset_index()
    )
    codes['gaul1_code'] = codes['gaul1_code'].fillna(-9999).astype('int32')

    # Data side: work on a copy carrying an int32 version of the join key.
    keyed = df.assign(id_col_int=df[id_col].fillna(-9999).astype('int32'))

    joined = keyed.merge(codes, left_on="id_col_int", right_on='gaul1_code', how='left')

    output_names = {
        'gaul1_name': admin_1_column,
        'iso3_code': iso3_country_column,
        'iso2_code': iso2_country_column,
    }
    helper_columns = ['gaul1_code', "gaul0_name", "id_col_int", id_col]
    return joined.rename(columns=output_names).drop(columns=helper_columns, errors='ignore')

# Usage:
# result_df = join_admin_codes(example_df, lookup_dict, id_col='id_col')

def ee_extract_centroid_and_geomtype(fc, x_col='centroid_x', y_col='centroid_y', type_col='geometry_type',max_error=1):
    """
    Add centroid coordinates and geometry type to every feature, server-side.

    Args:
        fc: Earth Engine FeatureCollection to annotate.
        x_col: Property name for the centroid's first coordinate.
        y_col: Property name for the centroid's second coordinate.
        type_col: Property name for the geometry type.
        max_error: Error margin passed to `centroid()`.

    Returns:
        The mapped FeatureCollection with the three properties set on each feature.
    """
    def _round6(value):
        # Earth Engine's round() takes no precision argument, so scale up,
        # round, and scale back down to keep 6 decimal places.
        return ee.Number(value).multiply(1e6).round().divide(1e6)

    def _annotate(feature):
        geometry = feature.geometry()
        xy = geometry.centroid(max_error).coordinates()
        return feature.set({
            x_col: _round6(xy.get(0)),
            y_col: _round6(xy.get(1)),
            type_col: geometry.type(),
        })

    return fc.map(_annotate)



# Function to extract centroid, geometry type, and coordinates from a GeoDataFrame using GeoPandas (faster for local data)

def gpd_extract_centroid_and_geomtype(
    gdf,
    x_col='centroid_x',
    y_col='centroid_y',
    type_col='geometry_type',
    external_id_col=None,
    return_attributes_only=False
):
    """
    Compute centroid x/y values and geometry type for each row of a GeoDataFrame.

    Only plain value columns are added; no centroid geometry column is created.
    The input is not modified.

    Args:
        gdf (GeoDataFrame): Input GeoDataFrame.
        x_col (str): Output column for the centroid x value (rounded to 6 decimals).
        y_col (str): Output column for the centroid y value (rounded to 6 decimals).
        type_col (str): Output column for the geometry type.
        external_id_col (str, optional): ID column to carry into the attributes-only
            output when it exists in `gdf`.
        return_attributes_only (bool, optional): If True, return just
            [external_id_col,] x_col, y_col, type_col with a fresh 0..n-1 index.

    Returns:
        GeoDataFrame or DataFrame: A copy of `gdf` with the three new columns, or the
        attributes-only selection described above.
    """
    enriched = gdf.copy()

    centroids = enriched.geometry.centroid
    enriched[x_col] = centroids.x.round(6)
    enriched[y_col] = centroids.y.round(6)
    enriched[type_col] = enriched.geometry.geom_type

    if not return_attributes_only:
        return enriched

    keep = [x_col, y_col, type_col]
    if external_id_col and external_id_col in enriched.columns:
        keep.insert(0, external_id_col)
    return enriched[keep].reset_index(drop=True)

# Example usage:
# gdf = gpd.read_file(GEOJSON_EXAMPLE_FILEPATH)
# gdf_with_centroids = gpd_extract_centroid_and_geomtype(gdf, return_attributes_only=True)
# print(gdf_with_centroids[["centroid_x", "centroid_y", "geometry_type"]].head())

def format_stats_dataframe(
    df,
    area_col='Area_sum',
    decimal_places=2,
    unit_type='ha',
    stats_unit_type_column='Unit',
    strip_suffix='_sum',
    remove_columns=True,
    remove_columns_suffix='_median',
    convert_water_flag=True,
    water_flag_column='In_waterbody_sum',
    water_flag_threshold=0.5,
    sort_column="plotId"
):
    """Flexible stats formatting for DataFrame columns.

    Steps (the input DataFrame is never modified):

    - Columns ending with `remove_columns_suffix` are dropped when `remove_columns` is True.
    - If `convert_water_flag` is True, a boolean water flag is computed from the RAW
      values (water stat / `area_col` > `water_flag_threshold`) before any unit
      conversion, so numerator and denominator are compared as delivered. The water
      stat is the first of `water_flag_column`, `water_flag_column + strip_suffix`,
      `'water' + strip_suffix` that exists. The flag is written to `water_flag_column`
      with `strip_suffix` removed, replacing the numeric stat of that name if present.
      Rows with zero or missing area get False.
    - Columns ending with `strip_suffix` (except `area_col`) are converted:
      'ha' -> value / 10000, 'percent' -> value / area * 100, anything else -> rounded
      raw value. The suffix is stripped from the produced names ('Cocoa_sum' -> 'Cocoa').
    - `area_col` is divided by 10000 (square metres to hectares, assuming the sums are
      in m2) whenever it is present, and renamed without `strip_suffix`.
    - `stats_unit_type_column` is filled with `unit_type` for every row.
    - Rows are sorted by `sort_column` when that column exists.

    Args:
        df: Raw statistics table (one row per feature).
        area_col: Column holding the total area sum.
        decimal_places: Rounding applied to converted values.
        unit_type: 'ha', 'percent', or any other value to keep raw sums.
        stats_unit_type_column: Name of the column that records `unit_type`.
        strip_suffix: Suffix identifying stat columns; removed from output names.
        remove_columns: Whether to drop columns ending with `remove_columns_suffix`.
        remove_columns_suffix: Suffix of columns to drop.
        convert_water_flag: Whether to derive the boolean water flag.
        water_flag_column: Name (with or without suffix) of the water stat.
        water_flag_threshold: Minimum water/area ratio for the flag to be True.
        sort_column: Column used for the final ordering.

    Returns:
        A new, defragmented DataFrame.

    Raises:
        KeyError: If `area_col` is missing while stat columns or a water stat need it.
    """
    def _strip(name):
        # Remove the stat suffix from a column name; tolerate an empty suffix.
        if strip_suffix and name.endswith(strip_suffix):
            return name[:-len(strip_suffix)]
        return name

    def _find_water_column(columns):
        # First existing candidate wins (same priority order as documented above).
        suffix = strip_suffix or ''
        candidates = (water_flag_column, water_flag_column + suffix, 'water' + suffix)
        return next((c for c in candidates if c in columns), None)

    def _build_converted_stats(frame, stat_cols):
        # Zero areas become NaN so percentages are NaN instead of +/-inf.
        area = frame[area_col].replace(0, float('nan'))
        converted = {}
        for col in stat_cols:
            if unit_type == 'ha':
                values = frame[col] / 10000
            elif unit_type == 'percent':
                values = (frame[col] / area) * 100
            else:
                values = frame[col]
            converted[_strip(col)] = values.round(decimal_places)
        return pd.DataFrame(converted, index=frame.index)

    # 1) Work on a copy so the caller's frame is never touched
    df = df.copy()

    # 2) Optionally drop median (or other) columns
    if remove_columns and remove_columns_suffix:
        df = df.loc[:, ~df.columns.str.endswith(remove_columns_suffix)].copy()

    # 3) Water flag from raw values. This must happen before step 6 removes the raw
    #    stat columns and before step 5 rescales the area column.
    water_flag_values = None
    if convert_water_flag:
        water_col = _find_water_column(df.columns)
        if water_col is not None:
            raw_area = df[area_col].replace(0, float('nan'))
            ratio = df[water_col] / raw_area
            water_flag_values = (ratio > water_flag_threshold).to_numpy(dtype=bool)

    # 4) Convert the stat columns (names come back with the suffix removed)
    stat_cols = [c for c in df.columns if c.endswith(strip_suffix) and c != area_col]
    if stat_cols:
        converted_stats_df = _build_converted_stats(df, stat_cols)
    else:
        converted_stats_df = pd.DataFrame(index=df.index)

    # 5) Area to hectares, independent of whether other stat columns exist
    if area_col in df.columns:
        df[area_col] = (df[area_col] / 10000).round(decimal_places)

    # 6) Swap raw stat columns for the converted ones in a single concat.
    #    The column count (not `.empty`) is checked so zero-row frames keep their schema.
    keep = [c for c in df.columns if not (c.endswith(strip_suffix) and c != area_col)]
    df = df.loc[:, keep].copy()
    if converted_stats_df.shape[1] > 0:
        df = pd.concat([df, converted_stats_df], axis=1)

    # 7) Fill stats unit type column
    df[stats_unit_type_column] = unit_type

    # 8) Store the boolean water flag under the suffix-free name
    if water_flag_values is not None:
        df[_strip(water_flag_column)] = water_flag_values

    # 9) Rename the area column by stripping the suffix
    df = df.rename(columns={area_col: _strip(area_col)})

    # 10) Order rows by the sort column if present
    if sort_column in df.columns:
        df = df.sort_values(sort_column).reset_index(drop=True)

    # 11) Defragment final DataFrame and return
    return df.copy()


def clean_geodataframe(gdf: gpd.GeoDataFrame, remove_nulls: bool = True, fix_invalid: bool = True) -> gpd.GeoDataFrame:
    """Validate and optionally clean a GeoDataFrame's geometries.

    Args:
        gdf: Input GeoDataFrame. It is not modified; repairs are applied to a copy.
        remove_nulls: Drop rows whose geometry is missing.
        fix_invalid: Repair invalid geometries with shapely's `make_valid`.

    Returns:
        The cleaned GeoDataFrame (the input object itself when nothing had to change).
    """
    if remove_nulls:
        null_mask = gdf.geometry.isna()
        null_count = int(null_mask.sum())
        if null_count > 0:
            print(f"⚠️  Found {null_count} null geometries - removing...", flush=True)
            gdf = gdf[~null_mask]
    if fix_invalid:
        valid_count = gdf.geometry.is_valid.sum()
        invalid_count = len(gdf) - valid_count
        if invalid_count > 0:
            print(f"⚠️  Found {invalid_count} invalid geometries - fixing...", flush=True)
            from shapely.validation import make_valid
            # Copy first: assigning into the caller's frame (or into the filtered
            # slice created above) would mutate shared data / warn about chained assignment.
            gdf = gdf.copy()
            # Use the active geometry column rather than assuming it is named 'geometry'.
            geom_col = gdf.geometry.name
            gdf[geom_col] = gdf[geom_col].apply(
                lambda g: make_valid(g) if g is not None and not g.is_valid else g
            )
    print(f"✅ Validation complete. {len(gdf):,} geometries ready.", flush=True)
    return gdf

def batch_geodataframe(gdf: gpd.GeoDataFrame, batch_size: int) -> List[gpd.GeoDataFrame]:
    """Split a GeoDataFrame into consecutive batches of at most `batch_size` rows.

    Args:
        gdf: Frame to split; rows keep their original order and index.
        batch_size: Maximum number of rows per batch (must be >= 1).

    Returns:
        List of positional slices of `gdf`; empty when `gdf` has no rows.

    Raises:
        ValueError: If `batch_size` is smaller than 1. A negative size would otherwise
            yield no batches at all, silently dropping every feature.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")
    return [gdf.iloc[start:start + batch_size] for start in range(0, len(gdf), batch_size)]

def convert_batch_to_ee(batch_gdf: gpd.GeoDataFrame) -> ee.FeatureCollection:
    """Turn one batch into an Earth Engine FeatureCollection via a temporary GeoJSON.

    The batch is written to a temp file, handed to `whisp.convert_geojson_to_ee`,
    and the file is removed afterwards (a failed removal is only logged).
    """
    handle, geojson_path = tempfile.mkstemp(suffix='.geojson', text=True)
    try:
        # mkstemp returns an open descriptor; close it so the file can be written by path.
        os.close(handle)
        batch_gdf.to_file(geojson_path, driver='GeoJSON')
        return whisp.convert_geojson_to_ee(geojson_path)
    finally:
        # Short pause before deleting the temp file
        # (presumably to let file handles be released - confirm).
        time.sleep(0.1)
        if os.path.exists(geojson_path):
            try:
                os.unlink(geojson_path)
            except OSError as cleanup_error:
                logger.warning(f"Could not delete temp file {geojson_path}: {cleanup_error}")

def process_ee_feature_collection(feature_collection: ee.FeatureCollection, whisp_image: ee.Image, reducer: ee.Reducer, batch_idx: int, max_retries: int = MAX_RETRIES, scale: int = 10) -> pd.DataFrame:
    """Run `reduceRegions` for one batch with retry logic and return a DataFrame.

    Args:
        feature_collection: Features of this batch.
        whisp_image: Image whose bands are reduced over each feature.
        reducer: Reducer applied to every band.
        batch_idx: Zero-based batch number (reported as `batch_idx + 1` in messages).
        max_retries: Total number of attempts.
        scale: Scale passed to `reduceRegions` (default 10, as before).

    Returns:
        The batch results converted with `whisp.convert_ee_to_df`.

    Raises:
        Exception: Immediately on "Unable to transform geometry" errors (not retried),
            or when quota/rate-limit errors persist through the last attempt.
        ee.EEException: Any other Earth Engine error still failing on the last attempt.
        RuntimeError: If no attempt was made at all (`max_retries` < 1).
    """
    for attempt in range(max_retries):
        is_last_attempt = attempt >= max_retries - 1
        try:
            results = whisp_image.reduceRegions(
                collection=feature_collection,
                reducer=reducer,
                scale=scale
            )
            return whisp.convert_ee_to_df(results)
        except ee.EEException as e:
            error_msg = str(e)
            # A geometry that cannot be transformed will not succeed on retry.
            if "Unable to transform geometry" in error_msg:
                raise Exception(f"Geometry transformation error in batch {batch_idx + 1}: {error_msg}") from e

            lowered = error_msg.lower()
            if "Quota" in error_msg or "limit" in lowered:
                if is_last_attempt:
                    raise Exception(f"Quota/rate limit exhausted for batch {batch_idx + 1}") from e
                backoff = min(30, 2 ** attempt)
                print(f"⏳ Quota/rate limit hit, waiting {backoff}s before retry...", flush=True)
            elif "timeout" in lowered:
                if is_last_attempt:
                    raise
                backoff = min(15, 2 ** attempt)
                print(f"⏳ Timeout, retrying in {backoff}s...", flush=True)
            else:
                if is_last_attempt:
                    raise
                backoff = min(10, 2 ** attempt)
            # Exponential backoff, capped per error category above.
            time.sleep(backoff)
        except Exception:
            # Non-EE failures (e.g. during conversion) get a shorter backoff.
            if is_last_attempt:
                raise
            time.sleep(min(5, 2 ** attempt))
    raise RuntimeError(f"Failed to process batch {batch_idx + 1} after {max_retries} attempts")

# def process_geojson_file(
#     geojson_path: str,
#     whisp_image: ee.Image,
#     reducer: ee.Reducer,
#     batch_size: int = EE_FEATURES_PER_BATCH,
#     max_concurrent: int = EE_MAX_CONCURRENT,
#     validate_null_geometries: bool = True,
#     validate_invalid_geometries: bool = True,
#     max_retries: int = MAX_RETRIES,
#     add_metadata_gpd: bool = False,
#     add_metadata_ee: bool = True,

# ) -> pd.DataFrame:
#     """Main function to process a GeoJSON file in batches using Whisp and EE."""
#     print(f"🔍 Loading and validating GeoJSON file...", flush=True)
#     gdf = gpd.read_file(geojson_path)
#     print(f"📁 Loaded {len(gdf):,} features from {geojson_path}", flush=True)
#     gdf = clean_geodataframe(gdf, remove_nulls=validate_null_geometries, fix_invalid=validate_invalid_geometries)
#     gdf = gpd.read_file(GEOJSON_EXAMPLE_FILEPATH)
#     if add_metadata_gpd:
#         gdf_reproj = gdf#.to_crs(epsg=6933)  # Reproject to equal area if necessary
#         gdf_w_metadata = gpd_extract_centroid_and_geomtype(gdf_reproj, x_col=centroid_x_coord_column, y_col=centroid_y_coord_column, type_col=geometry_type_column)
#         gdf_w_metadata_unproj = gdf_w_metadata#.to_crs(epsg=4326) 
#         batches = batch_geodataframe(gdf_w_metadata_unproj, batch_size)
#     else:
#         batches = batch_geodataframe(gdf, batch_size)
#     print(f"📊 Processing {len(gdf):,} features in {len(batches)} batches ({batch_size} features/batch)", flush=True)
#     print(f"🔄 Running {max_concurrent} concurrent requests...", flush=True)
#     results = []
    
#     def process_one_batch(batch_gdf, batch_idx):
#         fc = convert_batch_to_ee(batch_gdf)
#         if add_metadata_ee:
#             fc = ee_extract_centroid_and_geomtype(fc,x_col=centroid_x_coord_column, y_col=centroid_y_coord_column, type_col=geometry_type_column,max_error=0.1)
#         else:
#             fc
#         return process_ee_feature_collection(fc, whisp_image, reducer, batch_idx, max_retries)
#     with ThreadPoolExecutor(max_workers=max_concurrent) as executor:
#         future_to_idx = {executor.submit(process_one_batch, batch, i): i for i, batch in enumerate(batches)}
#         for future in as_completed(future_to_idx):
#             batch_idx = future_to_idx[future]
#             try:
#                 batch_result = future.result()
#                 results.append(batch_result)
#                 print(f"⏳ Progress: Batch {batch_idx + 1} ✓", flush=True)
#             except Exception as e:
#                 print(f"❌ Batch {batch_idx + 1} failed: {str(e)[:80]}...", flush=True)
#     if results:
#         combined_df = pd.concat(results, ignore_index=True)
#         return combined_df
#     else:
#         print("❌ No results produced - all batches failed", flush=True)
#         return pd.DataFrame()

import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

# Global semaphore to strictly limit concurrent EE calls
# (so we can safely use more worker threads than EE concurrency).
# Its size is fixed to EE_MAX_CONCURRENT at the moment this line runs.
EE_SEMAPHORE = threading.BoundedSemaphore(EE_MAX_CONCURRENT)

def _client_side_metadata(batch_gdf: gpd.GeoDataFrame,
                          row_id_col: str,
                          x_col: str,
                          y_col: str,
                          type_col: str) -> pd.DataFrame:
    """
    Client-side half of a batch: centroid x/y and geometry type computed with GeoPandas.

    Delegates to `gpd_extract_centroid_and_geomtype` in attributes-only mode, so the
    result is a plain DataFrame with [row_id_col, x_col, y_col, type_col]
    (row_id_col only if present in the batch) and no geometry.
    """
    return gpd_extract_centroid_and_geomtype(
        batch_gdf,
        x_col=x_col,
        y_col=y_col,
        type_col=type_col,
        external_id_col=row_id_col,
        return_attributes_only=True,
    )

def _server_side_batch(batch_gdf: gpd.GeoDataFrame,
                       batch_idx: int,
                       whisp_image: ee.Image,
                       reducer: ee.Reducer,
                       max_retries: int,
                       add_metadata_ee: bool,
                       x_col: str,
                       y_col: str,
                       type_col: str,
                       semaphore=None) -> pd.DataFrame:
    """
    Server-side: convert to EE, (optionally) add centroid/type server-side, run reduceRegions,
    return the converted DataFrame for this batch.

    Args:
        semaphore: Context manager gating EE access for this call. Defaults to the
            module-level EE_SEMAPHORE when None.
    """
    gate = EE_SEMAPHORE if semaphore is None else semaphore
    # Limit EE concurrency: everything that talks to EE happens while holding the gate
    with gate:
        fc = convert_batch_to_ee(batch_gdf)
        if add_metadata_ee:
            fc = ee_extract_centroid_and_geomtype(
                fc,
                x_col=x_col,
                y_col=y_col,
                type_col=type_col,
                max_error=0.1
            )
        return process_ee_feature_collection(fc, whisp_image, reducer, batch_idx, max_retries)


def _attach_client_metadata(server_df: pd.DataFrame,
                            client_df: pd.DataFrame,
                            row_id_col: str,
                            batch_idx: int) -> pd.DataFrame:
    """Left-join one batch's EE results with its client-side metadata on `row_id_col`."""
    if row_id_col not in server_df.columns:
        if len(server_df) != len(client_df):
            # No key and no way to align rows: keep the stats, skip the metadata.
            logger.warning(
                f"Batch {batch_idx + 1}: '{row_id_col}' missing from EE results and row counts differ "
                f"({len(server_df)} vs {len(client_df)}); client metadata not merged"
            )
            return server_df
        # Re-attach the real row ids by position. A per-batch counter would not match
        # the ids of any batch after the first.
        # NOTE(review): assumes EE returns features in submission order - confirm.
        server_df = server_df.copy()
        server_df[row_id_col] = client_df[row_id_col].to_numpy()
    return server_df.merge(client_df, on=row_id_col, how='left', suffixes=('', '_client'))


def process_geojson_file(
    geojson_path: str,
    whisp_image: ee.Image,
    reducer: ee.Reducer,
    batch_size: int = EE_FEATURES_PER_BATCH,
    max_concurrent: int = EE_MAX_CONCURRENT,
    validate_null_geometries: bool = True,
    validate_invalid_geometries: bool = True,
    max_retries: int = MAX_RETRIES,
    add_metadata_gpd: bool = False,     # kept for compatibility; now client metadata is always computed for merge
    add_metadata_ee: bool = False,
    row_id_col: str = "__row_id__",
    x_col: Optional[str] = None,
    y_col: Optional[str] = None,
    type_col: Optional[str] = None
) -> pd.DataFrame:
    """
    Process a GeoJSON in concurrent batches. For each batch, run EE (server-side) and GeoPandas
    (client-side) work concurrently, then merge the results.

    - EE concurrency is limited to `max_concurrent` by a semaphore created for this call.
    - The thread pool is larger than `max_concurrent` so client work proceeds even if EE slots are busy.
    - Batches are merged in submission order; a failed batch is reported and skipped.

    Args:
        geojson_path: Path of the GeoJSON file to process.
        whisp_image: Image whose bands are reduced over each feature.
        reducer: Reducer applied by `reduceRegions`.
        batch_size: Number of features per EE request.
        max_concurrent: Maximum simultaneous EE requests (must be >= 1).
        validate_null_geometries: Drop features without geometry before processing.
        validate_invalid_geometries: Repair invalid geometries before processing.
        max_retries: Attempts per batch.
        add_metadata_gpd: Unused; kept for backward compatibility.
        add_metadata_ee: Also compute centroid/type server-side (client values then get
            a '_client' suffix on name clashes).
        row_id_col: Column used to join server and client results. Created from the row
            position when missing or not unique.
        x_col, y_col, type_col: Metadata column names. When None they fall back to
            centroid_x_coord_column / centroid_y_coord_column / geometry_type_column.

    Returns:
        Concatenated results of all successful batches, or an empty DataFrame if none succeeded.

    Raises:
        ValueError: If `max_concurrent` is smaller than 1.
    """
    if max_concurrent < 1:
        raise ValueError(f"max_concurrent must be >= 1, got {max_concurrent}")

    # Resolve column names from config if not supplied
    x_col = x_col or centroid_x_coord_column
    y_col = y_col or centroid_y_coord_column
    type_col = type_col or geometry_type_column

    print("🔍 Loading and validating GeoJSON file...", flush=True)
    gdf = gpd.read_file(geojson_path)
    print(f"📁 Loaded {len(gdf):,} features from {geojson_path}", flush=True)

    gdf = clean_geodataframe(
        gdf,
        remove_nulls=validate_null_geometries,
        fix_invalid=validate_invalid_geometries
    )

    # Stable row id to join server and client results: reuse an existing unique
    # column, otherwise (re)create it from the row position.
    if row_id_col not in gdf.columns or gdf[row_id_col].duplicated().any():
        gdf = gdf.reset_index(drop=True)
        gdf[row_id_col] = gdf.index.astype("int64")

    # The row id travels to EE as a feature property because convert_batch_to_ee
    # writes the batch (including this column) to a temporary GeoJSON.

    batches = batch_geodataframe(gdf, batch_size)
    print(f"📊 Processing {len(gdf):,} features in {len(batches)} batches ({batch_size} features/batch)", flush=True)

    # Per-call gate so `max_concurrent` really controls EE concurrency
    ee_gate = threading.BoundedSemaphore(max_concurrent)

    # We can use more workers than EE concurrency because EE calls are gated by the semaphore
    # A good heuristic is ~ 2x EE concurrency so client work can overlap fully
    pool_workers = max(2 * max_concurrent, max_concurrent + 2)
    print(f"🔄 Running up to {max_concurrent} EE calls concurrently (pool size: {pool_workers})...", flush=True)

    results = []
    progress_ok = 0

    with ThreadPoolExecutor(max_workers=pool_workers) as executor:
        # For each batch, submit both the server-side and the client-side task
        pair_futures = [
            (
                executor.submit(
                    _server_side_batch,
                    batch, i, whisp_image, reducer, max_retries,
                    add_metadata_ee, x_col, y_col, type_col, ee_gate
                ),
                executor.submit(
                    _client_side_metadata,
                    batch, row_id_col, x_col, y_col, type_col
                ),
                i,
            )
            for i, batch in enumerate(batches)
        ]

        # Collect in submission order (keeps output order stable); each batch needs both halves
        for server_fut, client_fut, batch_idx in pair_futures:
            try:
                server_df = server_fut.result()
                client_df = client_fut.result()

                # Merge: server results (left) with client metadata (right)
                results.append(_attach_client_metadata(server_df, client_df, row_id_col, batch_idx))
                progress_ok += 1
                print(f"⏳ Progress: Batch {batch_idx + 1} ✓", flush=True)

            except Exception as e:
                # Try to surface a helpful failure message but keep going
                err = str(e)
                print(f"❌ Batch {batch_idx + 1} failed: {err[:120]}...", flush=True)

    if results:
        combined_df = pd.concat(results, ignore_index=True)
        print(f"✅ Done. {progress_ok}/{len(batches)} batches completed.", flush=True)
        return combined_df

    print("❌ No results produced - all batches failed", flush=True)
    return pd.DataFrame()


In [None]:
!pip show openforis-whisp

In [None]:
# Choose additional national datasets to include (currently three countries: 'co', 'ci', 'br').
iso2_codes_list = ['co', 'ci', 'br']

In [None]:
# GAUL 2024 level-1 admin codes as a raster band; its median per plot is later
# joined to admin names / ISO codes.
admin_image = ee.Image("projects/ee-whisp/assets/admin_bounds/gaul_2024_level_1_code_500m").rename("admin_code")

# Water mask band, named after the configured water-flag column.
water_mask_image = ee.Image("projects/ee-whisp/assets/water_mask/water_mask_jrc_usgs").rename(water_flag)

try:
    whisp_image = whisp.combine_datasets(national_codes=iso2_codes_list).addBands(admin_image).addBands(water_mask_image)
    # getInfo() forces evaluation, so a broken band surfaces here rather than mid-run
    print(whisp_image.bandNames().getInfo())
except Exception as build_error:
    # Catch Exception (not a bare except) so Ctrl-C still interrupts, and report why
    # the fallback with band validation is being used.
    print(f"⚠️  Building the combined image failed ({build_error}); retrying with validate_bands=True", flush=True)
    whisp_image = whisp.combine_datasets(national_codes=iso2_codes_list,validate_bands=True).addBands(admin_image).addBands(water_mask_image)


In [None]:
folder_path = (r"C:\Users\Arnell\Downloads\a_processing_tests")  # Replace with your folder path

In [None]:
GEOJSON_EXAMPLE_FILEPATH = folder_path+"/random_polygons.geojson"

Separate testing of functions

In [None]:
# Bounding box of Austria, taken from the GAUL 2024 level-1 boundaries
geom = (ee.FeatureCollection("projects/sat-io/open-datasets/FAO/GAUL/GAUL_2024_L1")
    .filter(ee.Filter.eq('gaul0_name', 'Austria')).geometry().bounds()
)

# Generate random test polygons inside those bounds (passed here as an ee.Geometry).
# Min and max are equal, so area (ha) and vertex count are requested as fixed values.
random_geojson = whisp.generate_test_polygons(
    bounds=geom, 
    num_polygons=100, 
    min_area_ha=100, 
    max_area_ha=100, 
    min_number_vert=100,     
    max_number_vert=100     
)

# GEOJSON_EXAMPLE_FILEPATH = folder_path + "/random_polygons.geojson"?
# NOTE: the file is written to whatever GEOJSON_EXAMPLE_FILEPATH currently points to
print(GEOJSON_EXAMPLE_FILEPATH)
import json
# Save the GeoJSON to a file
with open(GEOJSON_EXAMPLE_FILEPATH, 'w') as f:
    json.dump(random_geojson, f)


In [None]:
GEOJSON_EXAMPLE_FILEPATH = whisp.get_example_data_path("geojson_example.geojson")

In [None]:
# # Function to convert a polygon GeoJSON to a point GeoJSON (centroids)
# import geopandas as gpd
# from shapely.geometry import Point

# def convert_polygon_geojson_to_point_geojson(input_geojson_path, output_geojson_path, id_column=None):
#     """
#     Reads a polygon GeoJSON, computes centroids, and writes a new GeoJSON with point geometries.
#     Optionally preserves an ID column.
#     """
#     gdf = gpd.read_file(input_geojson_path)
#     # Compute centroids
#     gdf['geometry'] = gdf.geometry.centroid
#     # Optionally keep only id and geometry
#     if id_column and id_column in gdf.columns:
#         gdf_out = gdf[[id_column, 'geometry']]
#     else:
#         gdf_out = gdf[['geometry']]
#     # Save as GeoJSON
#     gdf_out.to_file(output_geojson_path, driver='GeoJSON')
#     print(f"Saved point GeoJSON to: {output_geojson_path}")

# # Example usage:
# # convert_polygon_geojson_to_point_geojson(GEOJSON_EXAMPLE_FILEPATH, folder_path+"/random_points.geojson", id_column=plot_id_column)


In [None]:
# convert_polygon_geojson_to_point_geojson(
#     GEOJSON_EXAMPLE_FILEPATH,
#     folder_path + "/random_polygons.geojson",
#     id_column=plot_id_column
# )

In [None]:
# GEOJSON_EXAMPLE_FILEPATH = folder_path+"/RSPO-Concessions-Version-10-May-2025.geojson"

In [None]:
FEATURES_PER_EE_REQUEST = 10
MAX_CONCURRENT_EE_REQUESTS = 20

In [None]:
# print(whisp_image.bandNames().getInfo())

In [None]:
# Run the concurrent batch extraction on the example file.
# The guard keeps this from running if the code is imported instead of executed directly.
if __name__=="__main__":
    
    result_df_raw = process_geojson_file(
        geojson_path=GEOJSON_EXAMPLE_FILEPATH,
        whisp_image=whisp_image,
        reducer=combined_reducer,
        batch_size=FEATURES_PER_EE_REQUEST,
        max_concurrent=MAX_CONCURRENT_EE_REQUESTS,
        validate_null_geometries=True,
        validate_invalid_geometries=False,  # invalid geometries are NOT repaired in this run
        add_metadata_gpd=False,
        add_metadata_ee=False,
        max_retries=3,
        # ee_version="v1"  # Add this if you implement versioning
    )

In [None]:
result_df_raw

In [None]:
# Join admin names / ISO codes using the median admin code sampled per plot
if "admin_code_median" in result_df_raw.columns:
    id_col = "admin_code_median"
    result_df_w_loc = join_admin_codes(df=result_df_raw, lookup_dict=lookup_dict, id_col=id_col)
else:
    print("Column 'admin_code_median' not found. Available columns:", result_df_raw.columns.tolist())
    # Continue without admin info so the following steps do not hit an undefined variable
    result_df_w_loc = result_df_raw.copy()

# Robustly add integer index to plot_id_column if missing or empty
if plot_id_column not in result_df_w_loc.columns or result_df_w_loc[plot_id_column].isnull().all():
    result_df_w_loc[plot_id_column] = range(len(result_df_w_loc))


strip_suffix = "_sum"

area_col = geometry_area_column+strip_suffix

# The water band was renamed to `water_flag`, so its summed stat is `water_flag + strip_suffix`.
# NOTE(review): the water band is added to the image as-is; confirm its sum is in the
# same units as the area band before relying on the thresholded flag.
result_df_w_loc_formatted = format_stats_dataframe(
    df=result_df_w_loc,
    unit_type="percent",
    area_col=area_col,
    strip_suffix=strip_suffix,
    water_flag_column=water_flag + strip_suffix,
)


df_stats = whisp.validate_dataframe_using_lookups_flexible(df_stats=result_df_w_loc_formatted,
                                                           national_codes=iso2_codes_list)

In [None]:
df_stats

In [None]:
# Define the output folder
# e.g. in running in Sepal this might be: Path.home() / 'module_results/whisp/'
out_directory = Path.home() / 'downloads'
# to_csv does not create missing folders, so make sure the target exists first
out_directory.mkdir(parents=True, exist_ok=True)

# Define the output file path for CSV
csv_output_file = out_directory / 'whisp_output_table.csv'

# Save the CSV file
df_stats.to_csv(path_or_buf=csv_output_file, index=False)
print(f"Table with stats saved to: {csv_output_file}")

In [None]:
# Define the output file path for GeoJSON
geojson_output_file = out_directory / 'whisp_output_geo.geojson'

# Save the GeoJSON file
whisp.convert_df_to_geojson(df_stats, geojson_output_file)  # builds a geojson file containing Whisp columns. Uses the geometry column "geo" to create the spatial features.
print(f"GeoJSON file saved to: {geojson_output_file}")

In [None]:
# adds risk columns to end of dataframe
df_w_risk = whisp.whisp_risk(
    df=df_stats,
    national_codes=iso2_codes_list
    )

Display table with risk columns

In [None]:
df_w_risk

Export table to CSV

In [None]:
# Define the output folder
# e.g. in running in Sepal this might be: Path.home() / 'module_results/whisp/'
out_directory = Path.home() / 'downloads'
# to_csv does not create missing folders, so make sure the target exists first
out_directory.mkdir(parents=True, exist_ok=True)

# Define the output file path for CSV
csv_output_file = out_directory / 'whisp_output_table_w_risk.csv'

# Save the CSV file
df_w_risk.to_csv(path_or_buf=csv_output_file, index=False)
print(f"Table with risk columns saved to: {csv_output_file}")

Export to GeoJSON (optional)

In [None]:
# Define the output file path for GeoJSON
geojson_output_file = out_directory / 'whisp_output_geo_w_risk.geojson'

# Save the GeoJSON file
whisp.convert_df_to_geojson(df_w_risk, geojson_output_file)  # builds a geojson file containing Whisp columns. Uses the geometry column "geo" to create the spatial features.
print(f"GeoJSON file saved to: {geojson_output_file}")

Classic Whisp

In [None]:
ee.Reset()

In [None]:
# Earth Engine and Common Libraries
import ee
from pathlib import Path

# Authenticate and initialize Earth Engine with STANDARD endpoint
# (The concurrent processing section above uses high-volume endpoint)
try:
    ee.Initialize()  # Standard endpoint (default)
except Exception:
    ee.Authenticate()
    ee.Initialize()  # Standard endpoint (default)

In [None]:
# Check which endpoint is now active
print("EE Data Base URL:", ee.data._cloud_api_base_url)
print("EE API Base URL:", ee.data._api_base_url)

# Check if using standard endpoint
if 'highvolume' in str(ee.data._cloud_api_base_url):
    print("❌ Still using HIGH-VOLUME endpoint")
else:
    print("✅ Now using STANDARD endpoint")

In [None]:
import openforis_whisp as whisp


In [None]:
!pip show openforis-whisp

In [None]:
#### whisp = whisp.whisp_formatted_stats_geojson_to_df(GEOJSON_EXAMPLE_FILEPATH)
# whisp = whisp.whisp_stats_geojson_to_df(GEOJSON_EXAMPLE_FILEPATH,whisp_image=whisp_image)

In [None]:
import openforis_whisp as whisp
fc = whisp.convert_geojson_to_ee(GEOJSON_EXAMPLE_FILEPATH)
# print(fc.size().getInfo())  # Print number of features in the collection


In [None]:
whisp_image = whisp.combine_datasets(national_codes=iso2_codes_list)

In [None]:
combined_reducer = ee.Reducer.sum().combine(ee.Reducer.median(),sharedInputs=True)
results = whisp_image.reduceRegions(fc, reducer=combined_reducer, scale=10)
whisp.convert_ee_to_df(results)

In [None]:
results = whisp.whisp_formatted_stats_geojson_to_df(input_geojson_filepath=GEOJSON_EXAMPLE_FILEPATH,whisp_image=whisp_image,national_codes=iso2_codes_list)