In [1]:
import ee
import time
import sys
from pathlib import Path

# Add the current working directory (expected to be src/) to sys.path so utils can be imported
sys.path.insert(0, str(Path.cwd()))
from utils import INDEX_CONFIGS, make_gradient

# Sign in to Earth Engine and initialise the session against the project.
ee.Authenticate()
ee.Initialize(project='dse-staff')

# ===== CONFIGURATION: SELECT INDEX =====
# Supported values: 'ndvi', 'ndbi', 'lai', or 'fpar'
INDEX_NAME = 'ndbi'
# =======================================

# Fail fast when the chosen index has no entry in INDEX_CONFIGS.
if INDEX_NAME not in INDEX_CONFIGS:
    raise ValueError(f"Invalid index: {INDEX_NAME}. Choose from {list(INDEX_CONFIGS.keys())}")

# Exports go to this Google Drive folder; after downloading, the CSVs belong
# in results/{INDEX_NAME}/.
folder_name = f"{INDEX_NAME}_raw"
print(
    f"Processing: {INDEX_NAME.upper()} - {INDEX_CONFIGS[INDEX_NAME]['description']}",
    f"Google Drive folder: {folder_name}",
    f"After download, place CSVs in: ../results/{INDEX_NAME}/",
    sep="\n",
)

# Static layers: terrain first, then surface water and human modification.
elevation = ee.Image('USGS/SRTMGL1_003').select('elevation')
slope = ee.Terrain.slope(elevation)
gsw = ee.Image('JRC/GSW1_4/GlobalSurfaceWater')
hm = ee.ImageCollection('CSP/HM/GlobalHumanModification').mean()

# Stack the static inputs into a single image, in this order:
# surface-water max_extent, gHM, elevation, slope.
staticImage = ee.Image.cat([gsw.select('max_extent'), hm.rename('gHM'), elevation, slope])

# Year configuration.
# One pair of bounds drives both the Earth Engine year list and the local
# band names, so the two cannot drift apart when the range is edited.
START_YEAR = 2003
END_YEAR = 2025  # inclusive

years = ee.List.sequence(START_YEAR, END_YEAR)
gradBandNames = [str(y) for y in range(START_YEAR, END_YEAR + 1)]

# Columns written to each exported CSV: sample identifiers, the static
# layers, then one gradient column per year.
selectors = ['WDPA_PID', 'transectID', 'pointID', 'max_extent', 'gHM', 'elevation', 'slope'] + gradBandNames

# Build gradient function for selected index
def make_current_gradient(y):
    """Return ``make_gradient`` applied to the selected index for year ``y``.

    Single-argument adapter so the function can be passed to ``years.map``;
    ``INDEX_NAME`` is read at call time.
    """
    index_name = INDEX_NAME
    return make_gradient(index_name, y)

# Map the gradient builder over every year, flatten the resulting collection
# into one multi-band image, and name each band after its year.
gradientBands = (
    ee.ImageCollection.fromImages(years.map(make_current_gradient))
    .toBands()
    .rename(gradBandNames)
)

# Final image to sample: static layers followed by the yearly gradient bands.
image = staticImage.addBands(gradientBands)


def process_samples(asset_path, chunk_size=50_000, batch_size=10, chunks_to_run=None,
                    scale=500, poll_interval=30):
    """
    Process samples from Earth Engine asset and export to Google Drive.

    The asset is split into consecutive chunks of ``chunk_size`` features.
    Each chunk is sampled against the module-level ``image`` and exported as
    its own CSV task. Tasks are started ``batch_size`` at a time, and the
    function blocks until every task in a batch reaches a terminal state
    before starting the next batch.

    Parameters
    ----------
    asset_path : str
        Path to Earth Engine FeatureCollection asset
    chunk_size : int
        Number of samples per chunk (must be positive)
    batch_size : int
        Number of chunks to process simultaneously (must be positive)
    chunks_to_run : list, optional
        Specific chunk indices to process (for rerunning failures).
        Defaults to every chunk of the asset.
    scale : int or float, optional
        Value passed as ``scale`` to ``reduceRegions``
    poll_interval : int or float, optional
        Seconds to wait between task status polls

    Returns
    -------
    list of int
        Chunk indices whose export task did not end in the COMPLETED state
        (i.e. FAILED or CANCELLED). Empty when everything succeeded. The list
        can be passed back as ``chunks_to_run`` to retry.

    Raises
    ------
    ValueError
        If ``chunk_size`` or ``batch_size`` is not positive, or if
        ``chunks_to_run`` contains an index outside the asset's chunk range.
    """
    # Validate before touching Earth Engine so bad arguments fail immediately
    # with a clear message instead of a ZeroDivisionError or range() error.
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")
    if batch_size <= 0:
        raise ValueError(f"batch_size must be positive, got {batch_size}")

    samples = ee.FeatureCollection(asset_path)
    size = samples.size().getInfo()
    nChunks = int((size + chunk_size - 1) // chunk_size)  # ceiling division

    if chunks_to_run is None:
        chunks_to_run = list(range(nChunks))
    else:
        chunks_to_run = list(chunks_to_run)
        # An out-of-range index would otherwise export an empty chunk.
        out_of_range = [i for i in chunks_to_run if not 0 <= i < nChunks]
        if out_of_range:
            raise ValueError(
                f"chunks_to_run contains indices {out_of_range} outside the valid "
                f"range 0..{nChunks - 1} for {asset_path}"
            )

    # Extract asset number from path (e.g., "chunk_003" -> "003")
    asset_num = asset_path.split("_")[-1]

    # Create (but do not yet start) one export task per requested chunk.
    tasks = []
    for i in chunks_to_run:
        fcChunk = ee.FeatureCollection(samples.toList(chunk_size, i * chunk_size))
        sampled = image.reduceRegions(
            collection=fcChunk,
            reducer=ee.Reducer.first(),
            scale=scale
        )
        task = ee.batch.Export.table.toDrive(
            collection=sampled,
            description=f'{INDEX_NAME}_raw_grad_{asset_num}_chunk_{i}',
            fileFormat='CSV',
            selectors=selectors,
            folder=folder_name
        )
        tasks.append((i, task))

    terminal_states = ('COMPLETED', 'FAILED', 'CANCELLED')
    failed_chunks = []

    # Process in batches
    for j in range(0, len(tasks), batch_size):
        batch = tasks[j:j + batch_size]
        for _, t in batch:
            t.start()

        chunk_nums = [idx for idx, _ in batch]
        print(f"  Processing chunks {chunk_nums}...")

        # Poll until every task in the batch has reached a terminal state.
        while True:
            statuses = [t.status() for _, t in batch]
            if all(s['state'] in terminal_states for s in statuses):
                break
            time.sleep(poll_interval)

        # Report successes and failures separately so a failed export is
        # never mistaken for a completed one.
        succeeded = [idx for (idx, _), s in zip(batch, statuses) if s['state'] == 'COMPLETED']
        print(f"  Completed chunks {succeeded}")
        for (idx, _), s in zip(batch, statuses):
            if s['state'] != 'COMPLETED':
                failed_chunks.append(idx)
                reason = s.get('error_message', 'no error message')
                print(f"  Chunk {idx} ended as {s['state']}: {reason}")

    return failed_chunks



Processing: NDBI - Normalized Difference Built-up Index
Google Drive folder: ndbi_raw
After download, place CSVs in: ../results/ndbi/


In [None]:
# Export every sample asset one after another.
# Original note on this cell: "305 minutes" (presumably the runtime of a full pass).
total_assets = 10
for asset_number in range(1, total_assets + 1):
    # Assets are named chunk_000, chunk_001, ... (zero-based, zero-padded).
    asset = f'projects/dse-staff/assets/chunk_{asset_number - 1:03d}'
    print(f"\nProcessing asset {asset_number} of {total_assets}: {asset}")
    process_samples(asset)
    print(f"Asset {asset_number} of {total_assets} complete")

print(f"\nAll {total_assets} assets processed for {INDEX_NAME.upper()}!")


Processing asset 1 of 10: projects/dse-staff/assets/chunk_000
  Processing chunks [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]...


In [None]:
# Re-export only the chunks that failed on an earlier run.
# Fill in the mapping below: key = asset index (the NNN in chunk_NNN),
# value = list of chunk indices within that asset to run again.
failed_by_asset = {
    # 0: [14],      # Example: chunk_000 had chunk 14 fail
    # 1: [9],       # chunk_001 had chunk 9 fail
    # 2: [1],       # etc.
    # 3: [4, 7]
}

if not failed_by_asset:
    # Nothing listed yet: remind the user to fill in the mapping.
    print("No failed chunks specified. Update the failed_by_asset dictionary above.")
else:
    for asset_idx, failed_chunks in failed_by_asset.items():
        asset = f'projects/dse-staff/assets/chunk_{asset_idx:03d}'
        print(f"\nReprocessing failed chunks {failed_chunks} for asset {asset_idx} ({asset})")
        process_samples(asset, chunks_to_run=failed_chunks)
        print(f"Asset {asset_idx} failed chunks reprocessed!")
    print(f"\nAll failed chunks reprocessed for {INDEX_NAME.upper()}!")