# NDVI EVI Statistics Processing Script

**Authors:** Gabriel Ortega & Michela Perrone

## Libraries

In [1]:
import os
import time
import datetime
import pandas as pd
import geopandas as gpd
import ee
import geemap
ee.Authenticate()
ee.Initialize(project='gortega-research')

## 1. Settings

In [6]:
INPUT_FOLDER = "edge_bck"          
EXPORT_FOLDER = "NEW_GEE_MODIS_Stats"  

# Processing Parameters
START_YEAR = 2000
END_YEAR = 2024
BATCH_SIZE = 50                    # Process 50 WDPA files per Earth Engine task
SIMPLIFY_TOLERANCE = 0.001         # Degrees (approx 100m). Reduces vertices to prevent payload errors.

# MODIS Configuration
MODIS_COLLECTION = 'MODIS/061/MOD13Q1' # Consider 'MODIS/061/MOD13Q1' for Collection 6.1
SCALE_FACTOR = 0.0001
BANDS = ['NDVI', 'EVI']
QA_BAND = 'SummaryQA'

# GPKG Layer Name
# Critical: Matches the specific layer in your GPKG to avoid ambiguity
TARGET_LAYER_NAME = 'sql_statement' 

current_date = datetime.datetime.now().strftime("%Y%m%d")

# Resume Processing
# Should be none if I don't want to skip any files
START_FROM_WDPAID = 145814

## 2. Function(s)

In [8]:
def check_queue_and_wait(max_tasks=100):
    """
    Pauses execution if too many tasks are running (default limit 100).
    Prevents overloading the local script or hitting strict EE rate limits.
    """
    while True:
        try:
            tasks = ee.batch.Task.list()
            active_tasks = [t for t in tasks if t.state in ['READY', 'RUNNING']]
            if len(active_tasks) < max_tasks:
                break
            print(f"Queue busy ({len(active_tasks)} active). Waiting 60s...")
            time.sleep(60)
        except Exception as e:
            print(f"Queue check failed: {e}. Retrying in 30s...")
            time.sleep(30)

def read_and_clean_gpkg(filepath):
    """
    Reads a GPKG, selecting the correct layer and simplifying geometry.
    Returns a GeoDataFrame or None if invalid.
    """
    try:
        # Attempt to read the specific layer to avoid ambiguity warnings
        try:
            gdf = gpd.read_file(filepath, layer=TARGET_LAYER_NAME)
        except ValueError:
            # Layer not found, try default reading
            gdf = gpd.read_file(filepath)
        
        if gdf.empty:
             return None

        # Resume from START_FROM_WDPAID if specified
        if START_FROM_WDPAID is not None:
            # Ensure column exists (case-insensitive check could be added if needed, but 'wdpaid' is standard)
            if 'wdpaid' in gdf.columns:
                # Convert to numeric just in case IDs were loaded as strings
                gdf['wdpaid'] = pd.to_numeric(gdf['wdpaid'], errors='coerce')
                # Filter: Keep only IDs >= the start value
                gdf = gdf[gdf['wdpaid'] >= START_FROM_WDPAID]
            
            # If the file contains only skipped IDs, gdf will be empty
            if gdf.empty:
                return None

        # Simplify geometry to reduce to avoid hitting size limit
        gdf['geometry'] = gdf.geometry.simplify(tolerance=SIMPLIFY_TOLERANCE)
        
        # Add source filename for tracking results back to the original file
        gdf['source_file'] = os.path.basename(filepath)
        
        return gdf
    except Exception as e:
        print(f"Skipping {os.path.basename(filepath)}: {e}")
        return None

def get_yearly_composite(year, roi):
    """
    Creates a 90th percentile composite for the given year and ROI.
    Applies QA masking for valid pixels.
    """
    collection = ee.ImageCollection(MODIS_COLLECTION) \
        .filterDate(f'{year}-01-01', f'{year+1}-01-01') \
        .filterBounds(roi)

    def mask_quality(image):
        qa = image.select(QA_BAND)
        # Mask: 0 = Good Data. 
        # You can add .or(qa.eq(1)) for 'Marginal' data if coverage is too low.
        mask = qa.eq(0) 
        return image.updateMask(mask).multiply(SCALE_FACTOR).copyProperties(image, ['system:time_start'])

    # 90th percentile reduces cloud/shadow artifacts effectively for vegetation indices
    composite = collection.map(mask_quality).select(BANDS).reduce(ee.Reducer.percentile([90]))
    
    # Rename bands back to standard names (remove '_p90' suffix created by reducer)
    return composite.rename(BANDS)

def submit_batch_export(batch_fc, batch_index, years):
    """
    Processes a batch of polygons over all years and submits ONE export task.
    """
    accumulated_results = []
    
    # Helper to get geometry of the whole batch for efficient image filtering
    batch_bounds = batch_fc.geometry().bounds()

    # Loop through years locally to build the computation graph
    for year in range(years[0], years[1] + 1):
        # Get image for this year
        img = get_yearly_composite(year, batch_bounds)
        
        # reduceRegions is efficient for FeatureCollections
        # It appends the stats as properties to each feature in the batch
        stats = img.reduceRegions(
            collection=batch_fc,
            reducer=ee.Reducer.mean().combine(ee.Reducer.stdDev(), None, True),
            scale=250,
            tileScale=4
        )
        
        # Add the year as a property to every feature in this year's result
        stats_with_year = stats.map(lambda f: f.set('year', year))
        accumulated_results.append(stats_with_year)

    # Flatten the list of collections into one single collection for export
    final_batch_fc = ee.FeatureCollection(accumulated_results).flatten()

    # Define Export
    description = f"WDPA_Batch_{batch_index}_MODIS_{current_date}"
    
    # Select specific properties to export to keep CSV clean
    # Ensure 'wdpaid' matches the actual column name in your GPKG
    selectors = ['wdpaid', 'year', 'NDVI_mean', 'NDVI_stdDev', 'EVI_mean', 'EVI_stdDev', 'source_file']

    task = ee.batch.Export.table.toDrive(
        collection=final_batch_fc,
        description=description,
        folder=EXPORT_FOLDER,
        fileFormat='CSV',
        selectors=selectors 
    )
    
    check_queue_and_wait()
    task.start()
    print(f" -> Submitted Batch {batch_index} ({description})")


## 3. Execution

In [None]:
def main():
    if not os.path.exists(INPUT_FOLDER):
        print(f"Error: Input folder '{INPUT_FOLDER}' not found.")
        return

    # Get list of files
    all_files = [f for f in os.listdir(INPUT_FOLDER) if f.endswith('.gpkg')]
    total_files = len(all_files)
    
    if total_files == 0:
        print("No .gpkg files found.")
        return

    print(f"Found {total_files} GPKG files. Processing in batches of {BATCH_SIZE}...")

    current_batch_gdfs = []
    batch_counter = 1

    for i, filename in enumerate(all_files):
        filepath = os.path.join(INPUT_FOLDER, filename)
        
        # 1. Read and Clean Local Data
        gdf = read_and_clean_gpkg(filepath)
        
        if gdf is not None and not gdf.empty:
            current_batch_gdfs.append(gdf)

        # 2. Check if we should process the batch
        is_batch_full = len(current_batch_gdfs) >= BATCH_SIZE
        is_last_file = (i == total_files - 1)

        if (is_batch_full or is_last_file) and current_batch_gdfs:
            print(f"Processing Batch {batch_counter} with {len(current_batch_gdfs)} files...")
            
            try:
                # Combine Python GDFs into one
                merged_gdf = pd.concat(current_batch_gdfs, ignore_index=True)
                
                # Convert to Earth Engine FeatureCollection ONCE per batch
                ee_batch_fc = geemap.geopandas_to_ee(merged_gdf)
                
                # Submit to Earth Engine
                submit_batch_export(ee_batch_fc, batch_counter, [START_YEAR, END_YEAR])
                
            except Exception as e:
                print(f"Error submitting Batch {batch_counter}: {e}")
                # Optional: You could log failed batch IDs to a file here

            # Reset Accumulator for next batch
            current_batch_gdfs = []
            batch_counter += 1

    print("All batches submitted.")

if __name__ == "__main__":
    main()

Found 108961 GPKG files. Processing in batches of 50...
Processing Batch 1 with 50 files...
 -> Submitted Batch 1 (WDPA_Batch_1_MODIS_20251212)
Processing Batch 2 with 50 files...
 -> Submitted Batch 2 (WDPA_Batch_2_MODIS_20251212)
Processing Batch 3 with 50 files...
 -> Submitted Batch 3 (WDPA_Batch_3_MODIS_20251212)
Processing Batch 4 with 50 files...
 -> Submitted Batch 4 (WDPA_Batch_4_MODIS_20251212)
Processing Batch 5 with 50 files...
 -> Submitted Batch 5 (WDPA_Batch_5_MODIS_20251212)
Processing Batch 6 with 50 files...
 -> Submitted Batch 6 (WDPA_Batch_6_MODIS_20251212)
Processing Batch 7 with 50 files...
 -> Submitted Batch 7 (WDPA_Batch_7_MODIS_20251212)
Processing Batch 8 with 50 files...
 -> Submitted Batch 8 (WDPA_Batch_8_MODIS_20251212)
Processing Batch 9 with 50 files...
 -> Submitted Batch 9 (WDPA_Batch_9_MODIS_20251212)
Processing Batch 10 with 50 files...
 -> Submitted Batch 10 (WDPA_Batch_10_MODIS_20251212)
Processing Batch 11 with 50 files...
 -> Submitted Batch 11 (