# Multi-Year Lake Ice Phenology Data Export
## Part 2: GEE Processing and Export (2019-2021)

**Goal:** Export S1 + S2 + ERA5 data for North Slope lakes across 3 years

**Strategy:**
- Process chunks independently (spatial parallelization)
- Export one year at a time per chunk
- Use efficient spatial filtering (only process S2 images that overlap lakes)
- Total exports: ~19 chunks × 3 years = ~57 exports

**Data sources:**
- Sentinel-1 GRD (SAR)
- Sentinel-2 SR Harmonized (optical, for NDSI)
- ERA5-Land (temperature)

**Years:** 2019, 2020, 2021 (match ALPOD temporal coverage)

---
## Setup

In [72]:
import ee
import pandas as pd
import numpy as np
import geopandas as gpd
from datetime import datetime
import time

# Initialize Earth Engine
ee.Initialize()

print("Imports successful!")
print(f"Earth Engine initialized: {ee.String('GEE Initialized').getInfo()}")

httplib2 transport does not support per-request timeout. Set the timeout when constructing the httplib2.Http instance.
httplib2 transport does not support per-request timeout. Set the timeout when constructing the httplib2.Http instance.
httplib2 transport does not support per-request timeout. Set the timeout when constructing the httplib2.Http instance.
httplib2 transport does not support per-request timeout. Set the timeout when constructing the httplib2.Http instance.


Imports successful!
Earth Engine initialized: GEE Initialized


In [73]:
# Configuration
BUCKET = 'wustl-eeps-geospatial'
BASE_PATH = 'thermokarst_lakes'
CHUNKS_PATH = f'gs://{BUCKET}/{BASE_PATH}/processed/chunks'
OUTPUT_PATH = f'{BASE_PATH}/exports'  # No gs:// prefix for GEE exports

# Years to process (match ALPOD coverage)
YEARS = [2019, 2020, 2021]

# Processing parameters
SCALE = 10  # Sentinel-1 resolution
S2_CLOUD_THRESHOLD = 30  # Maximum cloud cover for S2 images (scene-level)
S2_CLOUD_PROB_THRESHOLD = 40  # s2cloudless probability threshold (pixel-level)
S2_TIME_WINDOW = 3  # Days before/after S1 acquisition to look for S2

# Projection
ALASKA_ALBERS = 'EPSG:3338'

print(f"Configuration:")
print(f"  Years: {YEARS}")
print(f"  Chunks path: {CHUNKS_PATH}")
print(f"  Output: gs://{BUCKET}/{OUTPUT_PATH}")
print(f"  S2 scene cloud threshold: {S2_CLOUD_THRESHOLD}%")
print(f"  S2 pixel cloud probability threshold: {S2_CLOUD_PROB_THRESHOLD}%")
print(f"  S2 time window: ±{S2_TIME_WINDOW} days")

Configuration:
  Years: [2019, 2020, 2021]
  Chunks path: gs://wustl-eeps-geospatial/thermokarst_lakes/processed/chunks
  Output: gs://wustl-eeps-geospatial/thermokarst_lakes/exports
  S2 scene cloud threshold: 30%
  S2 pixel cloud probability threshold: 40%
  S2 time window: ±3 days


---
## Helper Functions

In [74]:
def load_chunk_from_bucket(chunk_id):
    """
    Load a chunk GeoJSON from bucket and convert to ee.FeatureCollection
    Downloads to local temp file first to avoid GCS streaming issues
    """
    import os
    import tempfile
    
    chunk_file = f'{CHUNKS_PATH}/chunk_{chunk_id:02d}.geojson'
    
    # Download to local temp file
    local_path = f'/tmp/chunk_{chunk_id:02d}.geojson'
    
    # Use gsutil to download (reliable in Vertex AI)
    os.system(f'gsutil -q cp {chunk_file} {local_path}')
    
    # Load from local file
    gdf = gpd.read_file(local_path)
    
    print(f"  Loaded chunk {chunk_id}: {len(gdf)} lakes")
    print(f"    Bounds: {gdf.total_bounds}")
    
    # Keep only essential properties to reduce payload size
    essential_cols = ['geometry']
    
    if 'lake_area_km2' in gdf.columns:
        essential_cols.append('lake_area_km2')
    
    # Create simple ID if needed
    gdf['lake_id'] = range(len(gdf))
    essential_cols.append('lake_id')
    
    gdf_simplified = gdf[essential_cols].copy()
    
    # Simplify geometries to reduce size (5m tolerance)
    print(f"  Simplifying geometries...")
    gdf_simplified['geometry'] = gdf_simplified.geometry.simplify(
        tolerance=5, preserve_topology=True
    )
    
    # Convert to GeoJSON dict
    geojson = gdf_simplified.__geo_interface__
    
    print(f"  Creating EE FeatureCollection...")
    fc = ee.FeatureCollection(geojson)
    print(f"  FeatureCollection created successfully")
    
    return fc, gdf

In [75]:
def add_lake_geometry_metrics(fc):
    """
    More efficient version: Pre-compute union of all lakes once
    """
    # PRE-COMPUTE: Union of all lake geometries (done once per chunk)
    all_lakes_union = fc.geometry().dissolve()
    
    def buffer_interior(feat):
        geom = feat.geometry()
        area = geom.area()
        
        buffered = geom.buffer(-10)
        buffered = ee.Algorithms.If(
            area.lt(10000),
            geom,
            buffered
        )
        
        return feat.set({'lake_interior': buffered})
    
    def add_landscape_ring(feat):
        geom = feat.geometry()
        
        # Create 100m ring
        outer = geom.buffer(100)
        ring = outer.difference(geom)
        
        # Remove ALL lakes from ring (including this one and all neighbors)
        # This ensures we only sample terrestrial landscape
        clean_ring = ring.difference(all_lakes_union)
        
        return feat.set({'landscape_ring': clean_ring})
    
    fc_with_interior = fc.map(buffer_interior)
    fc_with_landscape = fc_with_interior.map(add_landscape_ring)
    
    return fc_with_landscape

---
## Sentinel-1 Processing

In [76]:
def process_sentinel1(lakes_fc, year, region_bounds):
    """
    Process Sentinel-1 SAR data for a given year and region
    """
    # Filter S1 collection
    s1 = (ee.ImageCollection('COPERNICUS/S1_GRD')
          .filterDate(f'{year}-01-01', f'{year}-12-31')
          .filterBounds(region_bounds)
          .filter(ee.Filter.listContains('transmitterReceiverPolarisation', 'VV'))
          .filter(ee.Filter.listContains('transmitterReceiverPolarisation', 'VH'))
          .filter(ee.Filter.eq('instrumentMode', 'IW'))
          .filter(ee.Filter.eq('orbitProperties_pass', 'DESCENDING'))
          .select(['VV', 'VH']))
    
    def extract_s1_features(img):
        """
        Extract S1 backscatter for each lake
        """
        date = img.date()
        
        def lake_s1_stats(lake):
            # Get lake interior geometry
            lake_geom = ee.Geometry(lake.get('lake_interior'))
            
            # Extract VV and VH
            stats = img.reduceRegion(
                reducer=ee.Reducer.mean(),
                geometry=lake_geom,
                scale=SCALE,
                maxPixels=1e9
            )
            
            return lake.set({
                's1_date': date.format('YYYY-MM-dd'),
                's1_doy': date.getRelative('day', 'year'),
                'vv_db': stats.get('VV'),
                'vh_db': stats.get('VH')
            })
        
        return lakes_fc.map(lake_s1_stats)
    
    # Process all S1 images
    s1_features = s1.map(extract_s1_features).flatten()
    
    return s1_features

---
## Sentinel-2 Processing

In [77]:
def compute_ndsi(img):
    """
    Compute Normalized Difference Snow Index (NDSI)
    NDSI = (Green - SWIR1) / (Green + SWIR1)
    """
    green = img.select('B3')
    swir1 = img.select('B11')
    
    ndsi = green.subtract(swir1).divide(green.add(swir1)).rename('ndsi')
    
    return img.addBands(ndsi)

def mask_s2_clouds(img):
    """
    Mask clouds using QA60 band (basic cloud mask)
    """
    qa = img.select('QA60')
    
    # Bits 10 and 11 are clouds and cirrus
    cloud_bit_mask = 1 << 10
    cirrus_bit_mask = 1 << 11
    
    # Both should be zero (clear conditions)
    mask = qa.bitwiseAnd(cloud_bit_mask).eq(0).And(
           qa.bitwiseAnd(cirrus_bit_mask).eq(0))
    
    return img.updateMask(mask)

def add_s2cloudless_mask(img):
    """
    Add s2cloudless probability-based cloud mask
    More accurate than QA60, especially for distinguishing ice from clouds
    """
    # Get the s2cloudless collection
    s2_cloudless = ee.ImageCollection('COPERNICUS/S2_CLOUD_PROBABILITY')
    
    # Find the matching cloud probability image
    # Match by system:index (unique ID that links S2 and s2cloudless)
    cloud_prob = (s2_cloudless
                  .filter(ee.Filter.eq('system:index', img.get('system:index')))
                  .first()
                  .select('probability'))
    
    # Mask pixels with cloud probability > threshold
    is_clear = cloud_prob.lt(S2_CLOUD_PROB_THRESHOLD)
    
    return img.updateMask(is_clear)

In [78]:
def process_sentinel2(lakes_fc, year, region_bounds):
    """
    Process Sentinel-2 optical data for ice detection
    Uses both QA60 and s2cloudless for robust cloud masking
    Only process S2 images that actually overlap with lakes (spatial filtering)
    """
    # Get S2 collection for the year
    s2 = (ee.ImageCollection('COPERNICUS/S2_SR_HARMONIZED')
          .filterDate(f'{year}-01-01', f'{year}-12-31')
          .filterBounds(region_bounds)
          .filter(ee.Filter.lt('CLOUDY_PIXEL_PERCENTAGE', S2_CLOUD_THRESHOLD))
          .map(mask_s2_clouds)        # Apply QA60 mask first (fast)
          .map(add_s2cloudless_mask)  # Then s2cloudless (accurate for ice/cloud distinction)
          .map(compute_ndsi))
    
    def extract_s2_for_date(s2_img):
        """
        For each S2 image, extract NDSI for lakes within the image footprint
        """
        s2_date = s2_img.date()
        s2_bounds = s2_img.geometry()
        cloud_pct = s2_img.get('CLOUDY_PIXEL_PERCENTAGE')
        
        # CRITICAL: Only process lakes that overlap this S2 image
        lakes_in_image = lakes_fc.filterBounds(s2_bounds)
        
        def lake_s2_stats(lake):
            lake_geom = ee.Geometry(lake.get('lake_interior'))
            
            # Compute ice fraction (NDSI > 0.4 indicates ice)
            ndsi = s2_img.select('ndsi')
            ice_mask = ndsi.gt(0.4)
            
            # Get statistics
            stats = ice_mask.reduceRegion(
                reducer=ee.Reducer.mean(),
                geometry=lake_geom,
                scale=20,  # S2 SWIR resolution
                maxPixels=1e9
            )
            
            ice_fraction = stats.get('ndsi')
            
            return lake.set({
                's2_date': s2_date.format('YYYY-MM-dd'),
                's2_doy': s2_date.getRelative('day', 'year'),
                's2_ice_fraction': ice_fraction,
                's2_cloud_pct': cloud_pct
            })
        
        return lakes_in_image.map(lake_s2_stats)
    
    # Process all S2 images
    s2_features = s2.map(extract_s2_for_date).flatten()
    
    return s2_features

---
## ERA5 Temperature Processing

In [79]:
def process_era5_temperature(lakes_fc, year):
    """
    Process ERA5-Land temperature data - OPTIMIZED VERSION
    Pre-computes daily means once, then samples all lakes in batch
    Much faster than computing daily mean separately for each lake
    """
    print(f"  Loading ERA5 hourly data for {year}...")
    
    # Load ERA5-Land hourly data
    era5_hourly = (ee.ImageCollection('ECMWF/ERA5_LAND/HOURLY')
                   .filterDate(f'{year}-01-01', f'{year}-12-31')
                   .select('temperature_2m'))
    
    # Convert to Celsius
    def to_celsius(img):
        temp_c = img.subtract(273.15).rename('temp_c')
        return temp_c.copyProperties(img, ['system:time_start'])
    
    era5_hourly = era5_hourly.map(to_celsius)
    
    print(f"  Pre-computing daily means...")
    
    # Determine number of days in year (handle leap years)
    is_leap = ee.Number(year).mod(4).eq(0).And(
        ee.Number(year).mod(100).neq(0).Or(
            ee.Number(year).mod(400).eq(0)
        )
    )
    n_days = ee.Number(ee.Algorithms.If(is_leap, 366, 365))
    
    # Pre-compute daily means for entire year (365 or 366 images)
    def compute_daily_mean(day):
        day = ee.Number(day)
        date = ee.Date.fromYMD(year, 1, 1).advance(day.subtract(1), 'day')
        next_date = date.advance(1, 'day')
        
        # Get all hourly images for this day
        daily_collection = era5_hourly.filterDate(date, next_date)
        
        # Check if we have data
        has_data = daily_collection.size().gt(0)
        
        # Compute mean if data exists, otherwise use missing flag
        daily_mean = ee.Image(ee.Algorithms.If(
            has_data,
            daily_collection.mean(),
            ee.Image.constant(-9999).rename('temp_c')
        ))
        
        return daily_mean.set({
            'system:time_start': date.millis(),
            'doy': day,
            'date': date.format('YYYY-MM-dd')
        })
    
    days = ee.List.sequence(1, n_days)
    era5_daily = ee.ImageCollection.fromImages(days.map(compute_daily_mean))
    
    print(f"  Sampling all lakes from daily means...")
    
    # Now sample ALL lakes from each daily mean (batch operation)
    def sample_all_lakes(daily_img):
        doy = daily_img.get('doy')
        date = daily_img.get('date')
        
        # Sample ALL lakes at once using reduceRegions
        samples = daily_img.reduceRegions(
            collection=lakes_fc,
            reducer=ee.Reducer.first(),  # Get pixel value at centroid
            scale=11000  # ERA5-Land resolution
        )
        
        # Add date info to each sampled feature
        def add_date_info(feat):
            # Get the temperature value (from 'first' property created by reducer)
            # Use ee.Algorithms.If to provide default if missing
            temp_value = ee.Algorithms.If(
                feat.propertyNames().contains('first'),
                feat.get('first'),
                -9999
            )
            
            return feat.set({
                'era5_date': date,
                'era5_doy': doy,
                'temp_c': temp_value
            })
        
        return samples.map(add_date_info)
    
    # Process all daily images
    era5_features = era5_daily.map(sample_all_lakes).flatten()
    
    return era5_features

---
## Export Functions

In [80]:
def export_chunk_year(chunk_id, year, lakes_fc, region_bounds):
    """
    Process and export all data for one chunk and one year
    """
    print(f"\n{'='*60}")
    print(f"Processing Chunk {chunk_id}, Year {year}")
    print(f"{'='*60}")
    
    # Add lake geometries
    lakes_with_geom = add_lake_geometry_metrics(lakes_fc)
    n_lakes = lakes_with_geom.size().getInfo()
    print(f"Lakes in chunk: {n_lakes}")
    
    # Process S1
    print("\nProcessing Sentinel-1...")
    s1_features = process_sentinel1(lakes_with_geom, year, region_bounds)
    s1_count = s1_features.size().getInfo()
    print(f"  S1 observations: {s1_count}")
    
    # Process S2
    print("\nProcessing Sentinel-2...")
    s2_features = process_sentinel2(lakes_with_geom, year, region_bounds)
    s2_count = s2_features.size().getInfo()
    print(f"  S2 observations: {s2_count}")
    
    # Process ERA5
    print("\nProcessing ERA5 temperature...")
    era5_features = process_era5_temperature(lakes_with_geom, year)
    era5_count = era5_features.size().getInfo()
    print(f"  ERA5 observations: {era5_count}")
    
    # Export each dataset separately
    exports = []
    
    # S1 export
    s1_task = ee.batch.Export.table.toCloudStorage(
        collection=s1_features,
        description=f'S1_chunk{chunk_id:02d}_{year}',
        bucket=BUCKET,
        fileNamePrefix=f'{OUTPUT_PATH}/{year}/chunk_{chunk_id:02d}/s1_data',
        fileFormat='CSV'
    )
    
    # S2 export
    s2_task = ee.batch.Export.table.toCloudStorage(
        collection=s2_features,
        description=f'S2_chunk{chunk_id:02d}_{year}',
        bucket=BUCKET,
        fileNamePrefix=f'{OUTPUT_PATH}/{year}/chunk_{chunk_id:02d}/s2_data',
        fileFormat='CSV'
    )
    
    # ERA5 export
    era5_task = ee.batch.Export.table.toCloudStorage(
        collection=era5_features,
        description=f'ERA5_chunk{chunk_id:02d}_{year}',
        bucket=BUCKET,
        fileNamePrefix=f'{OUTPUT_PATH}/{year}/chunk_{chunk_id:02d}/era5_data',
        fileFormat='CSV'
    )
    
    exports = [
        {'task': s1_task, 'type': 'S1', 'count': s1_count},
        {'task': s2_task, 'type': 'S2', 'count': s2_count},
        {'task': era5_task, 'type': 'ERA5', 'count': era5_count}
    ]
    
    return exports

---
## Main Export Loop

In [81]:
# Load chunk statistics to know how many chunks we have
chunk_stats = pd.read_csv(f'gs://{BUCKET}/{BASE_PATH}/processed/chunk_statistics.csv')
n_chunks = len(chunk_stats)

print(f"Total chunks to process: {n_chunks}")
print(f"Years to process: {YEARS}")
print(f"Total exports: {n_chunks * len(YEARS) * 3} (chunks × years × 3 datasets)")
print("\nChunk statistics:")
print(chunk_stats[['chunk_id', 'n_lakes', 'lat_min', 'lat_max', 'lon_min', 'lon_max']])

Total chunks to process: 13
Years to process: [2019, 2020, 2021]
Total exports: 117 (chunks × years × 3 datasets)

Chunk statistics:
    chunk_id  n_lakes    lat_min    lat_max     lon_min     lon_max
0          0     2083  69.160813  70.902079 -153.130795 -151.727647
1          1     1452  69.006262  70.827977 -159.368810 -157.856992
2          2     1327  69.040671  70.501113 -150.347535 -148.888885
3          3     2518  69.006075  71.117631 -155.556146 -154.195153
4          4      780  69.027426  70.294397 -163.578774 -160.999169
5          5      223  69.061209  70.121377 -144.849551 -141.047786
6          6     1987  69.022434  71.334582 -157.932951 -156.619012
7          7      910  69.000834  70.388435 -149.010972 -147.408194
8          8     2044  69.016535  70.903127 -154.464362 -153.026821
9          9     1383  69.000146  70.478816 -151.904233 -150.237160
10        10      903  69.021021  70.839688 -161.140899 -159.325726
11        11     2148  69.298632  71.360948 -156.62

In [82]:
# Test with one chunk and one year first
TEST_CHUNK = 0
TEST_YEAR = 2019

print(f"\n{'#'*60}")
print(f"TEST RUN: Chunk {TEST_CHUNK}, Year {TEST_YEAR}")
print(f"{'#'*60}")

# Load chunk
test_fc, test_gdf = load_chunk_from_bucket(TEST_CHUNK)
test_bounds = ee.Geometry.Rectangle(test_gdf.total_bounds.tolist())

# Process and export
test_exports = export_chunk_year(TEST_CHUNK, TEST_YEAR, test_fc, test_bounds)

print(f"\n{'='*60}")
print("TEST EXPORTS PREPARED (NOT STARTED)")
print(f"{'='*60}")
for exp in test_exports:
    print(f"  {exp['type']}: {exp['count']} observations ready")
print("\nTo start exports, run the cells below.")


############################################################
TEST RUN: Chunk 0, Year 2019
############################################################
  Loaded chunk 0: 2083 lakes
    Bounds: [-153.14164553   69.15976659 -151.71727682   70.91525432]
  Simplifying geometries...
  Creating EE FeatureCollection...
  FeatureCollection created successfully

Processing Chunk 0, Year 2019
Lakes in chunk: 2083

Processing Sentinel-1...
  S1 observations: 245794

Processing Sentinel-2...
  S2 observations: 178567

Processing ERA5 temperature...
  Loading ERA5 hourly data for 2019...
  Pre-computing daily means...
  Sampling all lakes from daily means...
  ERA5 observations: 760295

TEST EXPORTS PREPARED (NOT STARTED)
  S1: 245794 observations ready
  S2: 178567 observations ready
  ERA5: 760295 observations ready

To start exports, run the cells below.


In [83]:
# TEST EXPORTS
# Use this to test the export on one chunk. Can skip it in future runs once working
SKIP_TEST = True  #True to skip this cell. 

if not SKIP_TEST:
    print("Starting test exports...")
    for exp in test_exports:
        exp['task'].start()
        print(f"  Started: {exp['task'].status()['description']}")
    
    print("\nTest exports started! Monitor at: https://code.earthengine.google.com/tasks")
    print("\nOnce test completes successfully, proceed to full export below.")
else:
    print("⏭️  Skipping test exports (SKIP_TEST = True)")
    print("Test exports already completed. Proceed to full export below.")

⏭️  Skipping test exports (SKIP_TEST = True)
Test exports already completed. Proceed to full export below.


---
## Full Export (All Chunks, All Years)

**WARNING:** This will start ~57 export tasks (19 chunks × 3 years × 3 datasets each)

Only run after test export completes successfully!

In [None]:
# Prepare all exports (run once test is good)
all_exports = []

for year in YEARS:
    for chunk_id in range(n_chunks):
        print(f"\nPreparing Chunk {chunk_id}, Year {year}...")
        
        # Load chunk
        chunk_fc, chunk_gdf = load_chunk_from_bucket(chunk_id)
        chunk_bounds = ee.Geometry.Rectangle(chunk_gdf.total_bounds.tolist())
        
        # Prepare exports
        exports = export_chunk_year(chunk_id, year, chunk_fc, chunk_bounds)
        
        for exp in exports:
            all_exports.append({
                'chunk_id': chunk_id,
                'year': year,
                'type': exp['type'],
                'task': exp['task'],
                'count': exp['count']
            })

print(f"\n{'='*60}")
print(f"ALL EXPORTS PREPARED: {len(all_exports)} tasks")
print(f"{'='*60}")
print("\nReady to start. Run next cell to begin all exports.")

In [None]:
# START ALL EXPORTS
# Only run after careful review!

print(f"Starting {len(all_exports)} export tasks...")
print("This may take a few minutes to submit all tasks.\n")

for i, exp in enumerate(all_exports):
    exp['task'].start()
    
    if (i+1) % 10 == 0:
        print(f"  Started {i+1}/{len(all_exports)} tasks...")
        time.sleep(2)  # Brief pause to avoid overwhelming GEE

print(f"\n{'='*60}")
print(f"ALL {len(all_exports)} EXPORTS STARTED!")
print(f"{'='*60}")
print("\nMonitor progress at: https://code.earthengine.google.com/tasks")
print(f"\nOutputs will be in: gs://{BUCKET}/{OUTPUT_PATH}/YEAR/chunk_XX/")

---
## Monitor Export Progress

In [None]:
# Check status of all exports
def check_export_status():
    status_summary = {
        'READY': 0,
        'RUNNING': 0,
        'COMPLETED': 0,
        'FAILED': 0,
        'CANCELLED': 0
    }
    
    for exp in all_exports:
        status = exp['task'].status()['state']
        status_summary[status] = status_summary.get(status, 0) + 1
    
    print(f"Export Status Summary:")
    print(f"  Total tasks: {len(all_exports)}")
    for state, count in status_summary.items():
        if count > 0:
            print(f"    {state}: {count}")
    
    return status_summary

# Run this cell periodically to check progress
check_export_status()

---
## Summary

This notebook exports:
- **Sentinel-1**: VV/VH backscatter for each lake interior
- **Sentinel-2**: NDSI-based ice fraction (with cloud filtering)
- **ERA5**: Daily mean temperature at lake locations

**For:**
- ~19 spatial chunks
- 3 years (2019, 2020, 2021)
- ~28,000 North Slope lakes

**Output structure:**
```
gs://wustl-eeps-geospatial/thermokarst_lakes/exports/
├── 2019/
│   ├── chunk_00/
│   │   ├── s1_data.csv
│   │   ├── s2_data.csv
│   │   └── era5_data.csv
│   ├── chunk_01/
│   └── ...
├── 2020/
└── 2021/
```

**Next step:** Combine CSVs and run ice detection algorithm (Notebook 03)