In [7]:
# -*- coding: utf-8 -*-
"""
@name: Batch Data Processing Script (Full Period Mode) - Python Version
@description:
This script processes and exports a Sentinel-2 SR composite for EACH FEATURE in a specified
area of interest (AOI) FeatureCollection and time range. It uses the COPERNICUS/S2_CLOUD_PROBABILITY
collection and the SCL band for accurate cloud and shadow masking (excluding water bodies),
and adds a 'pixel_count' band to indicate the number of clear observations per pixel.
Voids from masking are filled using the median of a specified reference year.

The workflow includes:
1. Filtering Sentinel-2 imagery by date, AOI, and cloud cover for each feature.
2. Joining with the cloud probability collection.
3. Applying cloud and shadow masking, using the SCL band to avoid masking water.
4. Calculating a 'pixel_count' band by summing valid observations.
5. Filling gaps in spectral bands using the median of a specific reference year.
6. Calculating spectral indices (NDVI, NDBI, etc.).
7. Generating a single composite image with temporal statistics and the pixel count for each feature.
8. Exporting the final processed raster(s) and a CSV list of source images to Google Drive for each feature.

@author: Adapted from PINV01-528 App by Carlos Giménez, converted to Python by Gemini
@version: 2.5.0 (Python Batch Processing Update)
"""

import ee
import geemap
import math

# --- Initialize the Earth Engine API ---
# Authenticate and initialize the Earth Engine library.
# This will require you to log in to your Google account and grant permissions.
try:
    ee.Initialize()
except Exception as e:
    ee.Authenticate()
    ee.Initialize()

In [8]:


# ====================================================================================
# ============================= 1. USER PARAMETERS ===================================
# ====================================================================================

# --- Collection Names ---
collection_name = 'COPERNICUS/S2_SR_HARMONIZED'
cloud_prob_collection_name = 'COPERNICUS/S2_CLOUD_PROBABILITY'

# Use MODIS Land Cover product to get native MODIS projection
modis_image = ee.Image("projects/ee-lc-monitoring/assets/proyecto_pinv530/modis_grid_py")

# --- Define the Area of Interest (AOI) Collection ---
# This is the full collection. The script will iterate over each feature.
aoi_asset_id = "projects/ee-lc-monitoring/assets/proyecto_pinv530/grid_py_25x25km_overlay_2km"

# The original script gets the first feature and then puts it back into a collection.
# This is likely for testing a single feature. The logic is preserved here.
#aoi_first = ee.FeatureCollection(aoi_asset_id)
aoiCollection = ee.FeatureCollection(aoi_asset_id)

print("AOI Feature Collection Name:", aoiCollection.get("id").getInfo())

# --- Time Period and Cloud Cover ---
startDate = '2023-10-01'
endDate = '2023-12-31'
cloudCoverMax = 10  # Can be higher since we are using per-pixel masking.

# --- Cloud & Shadow Masking Parameters ---
CLOUD_PROB_THRESHOLD = 50      # Cloud probability threshold (%). Higher is more strict.
NIR_DARK_THRESHOLD = 0.15      # NIR reflectance threshold for shadow detection.
CLOUD_PROJECTION_DIST = 50       # Max distance in meters to search for shadows from clouds.
cloud_cover_max_fill = 5       # Max cloud cover for filtering filling collection

# --- Gap Filling Parameter ---

# !!!!!!!!!!!!!!!!
REFERENCE_YEAR_FOR_FILLING = 2023 # The specific year to use for calculating the median for gap filling.
# !!!!!!!!!!!!!!!!

# --- Variable Selection ---
Available_bands = ['B2', 'B3', 'B4', 'B5', 'B6', 'B7', 'B8', 'B8A', 'B9', 'B11', 'B12']
Available_indices = ['NDVI', 'NDBI', 'NDWI', 'SAVI']
selectedVars = ['B2', 'B3', 'B4', 'B5', 'B6', 'B7', 'B8', 'B8A', 'B9', 'B11', 'B12', 'NDVI', 'NDBI', 'NDWI', 'SAVI']

# --- Export options ---
exportFolder = 'PINV01528_exp_modis_mean_25kmGrid_2023_q4' # Changed name to avoid overwriting JS exports
exportScale = 500
# crs_export is determined dynamically from the modis_image later in the script.

# ====================================================================================
# =========================== 2. REQUIRED MODULES & FUNCTIONS ========================
# ====================================================================================

def rescaleBands(image):
    """Rescales Sentinel-2 bands by a factor of 0.0001."""
    return image.multiply(0.0001).copyProperties(image, image.propertyNames())

def addDateProperties(image):
    """Adds date properties to an image for reporting and time-series analysis."""
    date = image.date()
    year = date.get('year')
    yearStr = date.format('YYYY')
    yearMonth = date.format('YYYY-MM')
    getQuarter = ee.Number(date.get('month')).divide(3).ceil()
    quarter_year = yearStr.cat('_').cat(getQuarter.format('%d'))
    return image.set({
        'year': year,
        'yearMonth': yearMonth,
        'quarter_year': quarter_year
    }).copyProperties(image, ['system:time_start'])

def maskCloudsAndShadows(image):
    """Masks clouds and shadows in a Sentinel-2 SR image using S2 cloud probability and SCL."""
    cloudProb = ee.Image(image.get('cloud_prob')).select('probability')
    scl = image.select('SCL')
    
    # Identify cloud pixels based on SCL and probability.
    validCloudSCL = scl.eq(3).Or(scl.eq(8)).Or(scl.eq(9)).Or(scl.eq(10))
    cloudMask = cloudProb.gt(CLOUD_PROB_THRESHOLD).Or(validCloudSCL)
    cloudMaskDilated = cloudMask.focal_max(1)
    
    # Identify shadow pixels.
    notWater = scl.neq(6)
    azimuth = ee.Number(image.get('MEAN_SOLAR_AZIMUTH_ANGLE'))
    
    # Project shadows from clouds.
    shadowProjection = cloudMaskDilated.directionalDistanceTransform(azimuth.subtract(90), CLOUD_PROJECTION_DIST) \
        .reproject(crs=image.select('B8').projection(), scale=10).select('distance')
        
    darkPixels = image.select('B8').lt(NIR_DARK_THRESHOLD).And(notWater)
    ndvi = image.normalizedDifference(['B8', 'B4'])
    shadowMask = shadowProjection.mask().And(darkPixels).And(ndvi.lt(0.4))
    
    # Combine masks and apply to the image.
    combinedMask = cloudMaskDilated.Or(shadowMask)
    binaryMaskBand = combinedMask.rename('cloud_shadow_mask').uint8()
    maskedImage = image.updateMask(combinedMask.Not())
    
    return maskedImage.addBands(binaryMaskBand).copyProperties(image, image.propertyNames())

def fillVoidsByReferenceYear(maskedCollection, filling_coll, bandsToFill):
    """Fills gaps in a masked collection using the median of a specific reference year."""
    referenceCollection = filling_coll.select(bandsToFill)
    referenceMedian = referenceCollection.median()
    
    def fill_image(image):
        return image.unmask(referenceMedian).copyProperties(image, image.propertyNames())
        
    filledCollection = maskedCollection.select(bandsToFill).map(fill_image)
    return filledCollection

def addIndices(image):
    """Adds common spectral indices to an image."""
    ndvi = image.normalizedDifference(['B8', 'B4']).rename('NDVI').toFloat()
    ndbi = image.normalizedDifference(['B11', 'B8']).rename('NDBI').toFloat()
    savi = image.expression(
        '((NIR - RED) / (NIR + RED + L)) * (1 + L)', {
            'NIR': image.select('B8'), 'RED': image.select('B4'), 'L': 0.5
        }).rename('SAVI').toFloat()
    ndwi = image.normalizedDifference(['B3', 'B8']).rename('NDWI').toFloat()
    return image.addBands([ndvi, ndbi, savi, ndwi])

def getDerivedBandNames(selectedBaseVars):
    """Creates a list of derived band names with statistical suffixes using a Python list comprehension."""
    stats = ['_mean', '_median', '_min', '_max', '_stdDev']
    return [f"{baseVar}{stat}" for baseVar in selectedBaseVars for stat in stats]

# ====================================================================================
# ==================== 3. MAIN PROCESSING FUNCTION FOR A SINGLE FEATURE ==============
# ====================================================================================

def process_and_export_for_feature(feature):
    """
    This function encapsulates the entire processing and export workflow for one feature.
    Args:
        feature (ee.Feature): The single AOI feature to process.
    """
    aoi = ee.Feature(feature)
    aoi_name = ee.String(aoi.get('grid_id')).getInfo() # Use getInfo() for client-side naming
    regionToExport = aoi.geometry()

    print('====================================================')
    print(f'--- Processing AOI: {aoi_name} ---')

    modisProj = modis_image.clip(regionToExport).projection()
    print('MODIS projection:', modisProj.getInfo())

    # Define date range for gap filling
    start_filling = ee.Date.fromYMD(REFERENCE_YEAR_FOR_FILLING, 1, 1)
    end_filling = ee.Date.fromYMD(REFERENCE_YEAR_FOR_FILLING, 12, 31)

    # Get the initial Sentinel-2 SR image collection for the target period
    s2SrCollection = ee.ImageCollection(collection_name) \
        .filterDate(startDate, endDate) \
        .filterBounds(regionToExport) \
        .filter(ee.Filter.lte('CLOUDY_PIXEL_PERCENTAGE', cloudCoverMax)) \
        .map(rescaleBands) \
        .map(addDateProperties)

    # Get the collection for gap filling
    s2SrCollection_for_void_filling = ee.ImageCollection(collection_name) \
        .filterDate(start_filling, end_filling) \
        .filterBounds(regionToExport) \
        .filter(ee.Filter.lte('CLOUDY_PIXEL_PERCENTAGE', cloud_cover_max_fill)) \
        .map(rescaleBands) \
        .map(addDateProperties)

    # Get the corresponding cloud probability collection
    s2CloudProbCollection = ee.ImageCollection(cloud_prob_collection_name) \
        .filterDate(startDate, endDate) \
        .filterBounds(regionToExport)

    # Join the two collections
    s2WithCloudProb = ee.ImageCollection(ee.Join.inner().apply(
        primary=s2SrCollection,
        secondary=s2CloudProbCollection,
        condition=ee.Filter.equals(leftField='system:index', rightField='system:index')
    ))

    # Clean up join and add a 'cloud_prob' property
    def merge_collections(f):
        primary = ee.Image(f.get('primary'))
        secondary = ee.Image(f.get('secondary'))
        return primary.addBands(secondary).set('cloud_prob', secondary)

    joinedCollection = s2WithCloudProb.map(merge_collections)

    # Check if images were found for this feature
    imageCount = joinedCollection.size().getInfo()
    print(f'Images found for this AOI: {imageCount}')

    if imageCount == 0:
        print(f'No images found for AOI {aoi_name}. Skipping.')
        return  # Exit the function for this feature

    # Calculate initial pixel count before masking
    countImage = s2SrCollection.select('B4').reduce(ee.Reducer.count()).rename('pixel_count').toFloat()

    # Apply cloud and shadow masking
    maskedCollection = joinedCollection.map(maskCloudsAndShadows)

    # Count the number of cloud/shadow observations
    cloud_shadow_band = maskedCollection.select('cloud_shadow_mask').sum().rename('clouds_shadows_count').toFloat()

    # Fill gaps using the median of the reference year
    bandsToFill = Available_bands
    gapFilledCollection = fillVoidsByReferenceYear(maskedCollection, s2SrCollection_for_void_filling, bandsToFill)

    # Add spectral indices
    collectionWithIndices = gapFilledCollection.map(addIndices)

    # Calculate temporal statistics
    statBands = Available_bands + Available_indices
    meanImage = collectionWithIndices.select(statBands).mean().toFloat().rename([b + '_mean' for b in statBands])
    medianImage = collectionWithIndices.select(statBands).median().toFloat().rename([b + '_median' for b in statBands])
    minImage = collectionWithIndices.select(statBands).min().toFloat().rename([b + '_min' for b in statBands])
    maxImage = collectionWithIndices.select(statBands).max().toFloat().rename([b + '_max' for b in statBands])
    stdDevImage = collectionWithIndices.select(statBands).reduce(ee.Reducer.stdDev()).toFloat().rename([b + '_stdDev' for b in statBands])

    # Combine all statistical bands and the pixel count band
    combinedImage = ee.Image.cat([
        meanImage, medianImage, minImage, maxImage, stdDevImage, countImage, cloud_shadow_band
    ]).set({
        'start_date': startDate,
        'end_date': endDate,
        'image_count': imageCount,
        'aoi_id': aoi_name,
        'system:time_start': ee.Date(startDate).millis()
    })
    
    # --- Reducing resolution ---
    tagetProj = modisProj.getInfo()
    nativeProjection = joinedCollection.first().select('B2').projection()

    imageToExport = combinedImage#.setDefaultProjection(nativeProjection) \
      #  .reduceResolution(reducer=ee.Reducer.mean(), maxPixels=2600) \
       # .reproject(crs=tagetProj['crs'], crsTransform=tagetProj['transform'])
    
    # --- Rescaling ---
    APPLY_RESCALING = True
    SCALE_FACTOR = 10000

    def scaleAndCastImage(image, bandsToScale):
        nonScaledBands = image.select(ee.List(image.bandNames()).removeAll(bandsToScale)).toInt16()
        scaledBands = image.select(bandsToScale).multiply(SCALE_FACTOR).toInt16()
        return ee.Image.cat([scaledBands, nonScaledBands]).copyProperties(image, image.propertyNames())

    # --- Select bands for export ---
    baseBandsToExport = getDerivedBandNames(selectedVars)
    bandsToExportWithCounts = baseBandsToExport + ['pixel_count', 'clouds_shadows_count']
    finalImageForExport = imageToExport.select(bandsToExportWithCounts)

    # --- Apply Rescaling if enabled ---
    if APPLY_RESCALING:
        finalImageForExport = scaleAndCastImage(finalImageForExport, baseBandsToExport)
    
    # --- EXPORT 1: Image List CSV ---
   # def extract_metadata(image):
       # return ee.Feature(None, {
       #     'image_id': image.id(),
       #     'system_start_time': image.get('system:time_start'),
        #    'cloud_coverage': image.get('CLOUDY_PIXEL_PERCENTAGE'),
        #    'year': image.get('year'),
        #    'quarter_year': image.get('quarter_year'),
        #    'yearMonth': image.get('yearMonth')
        #})

   # imageMetadata = joinedCollection.map(extract_metadata)
   # csv_filename = f'ImageList_Full_period_CloudProb_{aoi_name}'
    
   # task_csv = ee.batch.Export.table.toDrive(
     #   collection=imageMetadata,
     #   description=csv_filename,
     #   fileFormat='CSV',
    #    folder=exportFolder
    #)
    #task_csv.start()
    #print(f"Started CSV export task: {csv_filename}")

    # --- EXPORT 2: Composite Image ---
    image_filename = f'Periodo_{startDate}_to_{endDate}_{aoi_name}'
    print(f'Initiating image export for: {image_filename}')
    
    task_image = ee.batch.Export.image.toDrive(
        image=ee.Image(finalImageForExport),  # <-- Explicitly cast to ee.Image
        description=image_filename,
        fileNamePrefix=image_filename,
        folder=exportFolder,
        region=regionToExport,
        scale=500,
        crs=tagetProj['crs'],
        maxPixels=1e13
    )
    task_image.start()
    print(f"Started Image export task: {image_filename}")



AOI Feature Collection Name: None


In [9]:
# ====================================================================================
# ========================== 4. ITERATE AND RUN WORKFLOW (BATCHED) ===================
# ====================================================================================
if __name__ == "__main__":
    # --- Create a map for visualization ---
    Map = geemap.Map()
    Map.centerObject(aoiCollection.first().geometry(), 8)
    Map.addLayer(aoiCollection, {'color': 'BLUE'}, 'Full AOI Collection', False)
    Map.addLayer(modis_image, {}, 'Modis PY Grid')
    
    # --- BATCH PROCESSING SETUP ---
    batch_size = 852
    total_features = aoiCollection.size().getInfo()
    print(f'Total number of features to process: {total_features}')

    # !!! IMPORTANT: CHANGE THIS NUMBER FOR EACH BATCH !!!
    batch_number = 0  # Start with 0 for the first batch, 1 for the second, etc.

    # --- Calculate start and end index for the current batch ---
    start_index = batch_number * batch_size
    end_index = min(start_index + batch_size, total_features)

    if start_index >= total_features:
        print("All batches have been processed. You've entered a batch number that is out of range.")
    else:
        print(f'--- Processing Batch #{batch_number}: Features {start_index} to {end_index - 1} ---')
        
        aoi_list = aoiCollection.toList(total_features)

        # --- Loop through each feature IN THE CURRENT BATCH ---
        for i in range(start_index, end_index):
            feature = ee.Feature(aoi_list.get(i))
            # Call the main processing function for the current feature
            process_and_export_for_feature(feature)

        print('----------------------------------------------------')
        print(f'Batch #{batch_number} tasks initiated. Check the "Tasks" tab in the GEE Code Editor or use `ee.data.listOperations()` to monitor.')
        print(f'To run the next batch, change batch_number to {batch_number + 1} and run the script again.')

    # Display the map if you are in an interactive environment like Jupyter
    # Map

Total number of features to process: 852
--- Processing Batch #0: Features 0 to 851 ---
--- Processing AOI: 20K_01_01 ---
MODIS projection: {'type': 'Projection', 'crs': 'EPSG:4326', 'transform': [0.004491576420597608, 0, -62.64850791449543, 0, -0.004491576420597608, -19.28233757362553]}
Images found for this AOI: 4
Initiating image export for: Periodo_2023-10-01_to_2023-12-31_20K_01_01
Started Image export task: Periodo_2023-10-01_to_2023-12-31_20K_01_01
--- Processing AOI: 20K_01_02 ---
MODIS projection: {'type': 'Projection', 'crs': 'EPSG:4326', 'transform': [0.004491576420597608, 0, -62.64850791449543, 0, -0.004491576420597608, -19.28233757362553]}
Images found for this AOI: 4
Initiating image export for: Periodo_2023-10-01_to_2023-12-31_20K_01_02
Started Image export task: Periodo_2023-10-01_to_2023-12-31_20K_01_02
--- Processing AOI: 20K_01_03 ---
MODIS projection: {'type': 'Projection', 'crs': 'EPSG:4326', 'transform': [0.004491576420597608, 0, -62.64850791449543, 0, -0.00449157