# Land Use Change Detection - Preprocessing Pipeline

## Overview
Satellite image processing pipeline for land use change detection between two time periods (T1 and T2).

## Requirements
- Earth Engine Python API
- IPython Widgets
- NumPy
- DateTime

## Pipeline Steps

1. **AOI Selection**
   - Draw on Map
   - Upload JSON
   - Enter Coordinates

2. **Time Selection**
   - Start date (T1)
   - End date (T2)
   - Interval in months

3. **Processing**
   - Cloud masking
   - Gaussian smoothing
   - Dark Object Subtraction
   - Geometric registration

4. **Export**
   - GeoTIFF format
   - Google Drive output
   - T1 and T2 images

## Usage
1. Select AOI using preferred method
2. Enter dates in YYYY-MM-DD format
3. Wait for processing completion
4. Check "EarthEngine_Exports" in Google Drive

## Notes
- Large areas automatically tiled
- Supports Sentinel-2 and Landsat
- Forest area minimum: 30 hectares

In [1]:
import ee
import geemap
import folium
from folium import plugins

In [2]:
import geemap
import folium
from folium import plugins

In [3]:
ee.Authenticate(auth_mode='colab')

True

In [4]:
ee.Initialize(project='ee-mujtabanaqvi29')

In [9]:
import ipywidgets as widgets
import json
from IPython.display import display

# Initialize the map with SATELLITE basemap
Map = geemap.Map(center=(0, 0), zoom=2)
Map.add_basemap('SATELLITE')
Map.add_draw_control()
display(Map)

# Store AOI globally
AOI = None  

# Function to handle user input method
def handle_aoi_selection(choice):
    if choice == 'Draw on Map':
        display(draw_aoi_map())
    elif choice == 'Upload JSON':
        display(upload_json_aoi())
    elif choice == 'Enter Coordinates':
        display(enter_coordinates_aoi())

# Function to get AOI from drawn feature
def draw_aoi_map():
    global AOI
    drawn_feature = Map.user_roi  # Retrieve the drawn feature
    if drawn_feature is None:
        print("⚠️ No AOI selected! Please draw a polygon or rectangle on the map.")
    else:
        AOI = ee.Geometry.Polygon(drawn_feature.getInfo()['coordinates'])  # Convert to EE Geometry
        print("✅ AOI defined from drawn feature.")

# Function to upload JSON AOI
def upload_json_aoi():
    upload_widget = widgets.FileUpload(accept='.json', multiple=False)

    def process_json(change):
        global AOI  # Store AOI globally
        try:
            uploaded_file = list(upload_widget.value.values())[0]
            json_data = json.loads(uploaded_file['content'].decode('utf-8'))

            if 'coordinates' in json_data and isinstance(json_data['coordinates'], list):
                AOI = ee.Geometry.Polygon(json_data['coordinates'])
                print("✅ AOI successfully loaded from JSON!")
            else:
                print("⚠️ Invalid JSON format. Ensure it contains a 'coordinates' key with a valid polygon list.")

        except Exception as e:
            print(f"⚠️ Error processing JSON file: {e}")

    upload_widget.observe(process_json, names='value')
    return upload_widget

# Function to manually enter coordinates for AOI
def enter_coordinates_aoi():
    coord_input = widgets.Textarea(
        placeholder="Enter coordinates as [[lon1, lat1], [lon2, lat2], ...]",
        layout=widgets.Layout(width='100%', height='100px')
    )

    def process_coordinates(button):
        global AOI  # Store AOI globally
        try:
            coords = json.loads(coord_input.value)  # Convert input string to Python list

            if not isinstance(coords, list) or not all(isinstance(i, list) and len(i) == 2 for i in coords):
                print("⚠️ Invalid JSON format. Ensure input is a list of coordinate pairs like: [[lon1, lat1], [lon2, lat2], ...]")
                return

            AOI = ee.Geometry.Polygon([coords])  
            print("✅ AOI successfully created from coordinates!")

        except Exception as e:
            print(f"⚠️ Error processing coordinates: {e}")
            print("Ensure your input is in the correct format: [[lon1, lat1], [lon2, lat2], ...]")

    submit_button = widgets.Button(description="Submit Coordinates")
    submit_button.on_click(process_coordinates)
    
    return widgets.VBox([coord_input, submit_button])

# Create a dropdown for user to select AOI input method
aoi_choice = widgets.Dropdown(
    options=['Draw on Map', 'Upload JSON', 'Enter Coordinates'],
    description='AOI Method:',
)

# Button to proceed with selected method
proceed_button = widgets.Button(description="Proceed")
proceed_button.on_click(lambda x: handle_aoi_selection(aoi_choice.value))

# Display selection widgets
display(widgets.VBox([aoi_choice, proceed_button]))


Map(center=[0, 0], controls=(WidgetControl(options=['position', 'transparent_bg'], widget=SearchDataGUI(childr…

VBox(children=(Dropdown(description='AOI Method:', options=('Draw on Map', 'Upload JSON', 'Enter Coordinates')…

⚠️ No AOI selected! Please draw a polygon or rectangle on the map.


None

✅ AOI defined from drawn feature.


None

In [11]:
# Function to display AOI on the map
def show_aoi_on_map(aoi):
    Map.layers = Map.layers[:1]  # Clear previous AOI layers, keeping the basemap
    Map.add_basemap('SATELLITE')  # Ensure satellite imagery is used

    # Add AOI as a highlighted layer
    Map.addLayer(aoi, {"color": "red", "width": 2}, "AOI")  
    
    # Get AOI bounds and zoom in
    bounds = aoi.bounds().getInfo()['coordinates'][0]
    lon_min, lat_min = bounds[0]
    lon_max, lat_max = bounds[2]
    center_lon, center_lat = (lon_min + lon_max) / 2, (lat_min + lat_max) / 2

    Map.setCenter(center_lon, center_lat, zoom=10)  # Adjust zoom level

    print("✅ AOI displayed on map.")

# Ensure AOI is globally stored from previous cell
try:
    if AOI is not None:  # If AOI was set
        show_aoi_on_map(AOI)
        print("✅ AOI successfully displayed from previous selection.")
    else:
        print("⚠️ No AOI found. Ensure you selected AOI using JSON, Draw, or Coordinates before running this cell.")
except Exception as e:
    print(f"⚠️ Error displaying AOI: {e}")


✅ AOI displayed on map.
✅ AOI successfully displayed from previous selection.


In [12]:
# Input start and end dates
start_date = input("Enter the start date for analysis (YYYY-MM-DD): ")
end_date = input("Enter the end date for analysis (YYYY-MM-DD): ")

# Input the duration for change detection
interval_months = int(input("Enter the duration of change detection in months (e.g., 6): "))
print(f"Start Date: {start_date}, End Date: {end_date}, Duration: {interval_months} months.")

Start Date: 2022-01-01, End Date: 2022-12-25, Duration: 6 months.


In [13]:
# Load Sentinel-2, Landsat 8, or Landsat 5 based on availability
def get_best_satellite_collection(start_date, end_date, AOI):
    """
    Selects the best satellite collection based on data availability.
    Priority: Sentinel-2 > Landsat 8 > Landsat 5.
    """
    print("\n🔍 Checking available satellite imagery...")

    # Sentinel-2 Availability Check (2015-Present)
    sentinel2_count = ee.ImageCollection('COPERNICUS/S2_SR') \
        .filterDate(start_date, end_date) \
        .filterBounds(AOI).size().getInfo()

    if sentinel2_count > 0:
        print("✅ Using Sentinel-2 (2015-Present)")
        return 'COPERNICUS/S2_SR', ['B8', 'B4'], 10  # Near-Infrared (B8) & Red (B4), 10m resolution

    # Landsat 8 Availability Check (2013-Present)
    landsat8_count = ee.ImageCollection('LANDSAT/LC08/C02/T1_L2') \
        .filterDate(start_date, end_date) \
        .filterBounds(AOI).size().getInfo()

    if landsat8_count > 0:
        print("✅ Using Landsat 8 (2013-Present)")
        return 'LANDSAT/LC08/C02/T1_L2', ['SR_B5', 'SR_B4'], 30  # NIR (B5) & Red (B4), 30m resolution

    # Landsat 5 Availability Check (1984-2013)
    landsat5_count = ee.ImageCollection('LANDSAT/LT05/C02/T1_L2') \
        .filterDate(start_date, end_date) \
        .filterBounds(AOI).size().getInfo()

    if landsat5_count > 0:
        print("✅ Using Landsat 5 (1984-2013)")
        return 'LANDSAT/LT05/C02/T1_L2', ['SR_B4', 'SR_B3'], 30  # NIR (B4) & Red (B3), 30m resolution

    print("⚠️ No available imagery for the selected time range!")
    return None, None, None

# Determine the best dataset
best_collection, ndvi_bands, scale_resolution = get_best_satellite_collection(start_date, ee.Date(start_date).advance(30, 'day'), AOI)

if best_collection is not None:
    # Load the selected ImageCollection
    image_collection = ee.ImageCollection(best_collection) \
        .filterDate(start_date, ee.Date(start_date).advance(30, 'day')) \
        .filterBounds(AOI)

    # Cloud Masking Function (Sentinel-2, Landsat 8 & 5)
    def mask_clouds(image):
        if best_collection == 'COPERNICUS/S2_SR':  # Sentinel-2
            qa = image.select('QA60')
            cloud_bit_mask = 1 << 10  # Bit 10 represents clouds
            cirrus_bit_mask = 1 << 11  # Bit 11 represents cirrus clouds
            mask = qa.bitwiseAnd(cloud_bit_mask).eq(0).And(qa.bitwiseAnd(cirrus_bit_mask).eq(0))
        else:  # Landsat 8 & 5
            qa = image.select('QA_PIXEL')
            cloud_mask = 1 << 4  # Bit 4 represents cloud presence
            mask = qa.bitwiseAnd(cloud_mask).eq(0)
        
        return image.updateMask(mask)

    # Apply Cloud Masking and Create a Median Composite
    composite_image = image_collection.map(mask_clouds).median().clip(AOI)

    # Compute NDVI
    ndvi_image = composite_image.normalizedDifference(ndvi_bands).rename('NDVI')

    # Define Forest Suitability Function
    def assess_forest_suitability(ndvi_image, aoi_geometry, Map, satellite_used, scale_resolution):
        """
        Assess forest suitability using dual thresholds: percentage and absolute area.
        """
        try:
            print(f"\n📡 Assessing forest suitability using {satellite_used}...")

            # Calculate total AOI area in hectares
            aoi_area_ha = aoi_geometry.area().divide(10000).getInfo()
            print(f"📍 Total AOI area: {aoi_area_ha:.2f} hectares")

            # Create forest mask based on NDVI threshold
            NDVI_FOREST_THRESHOLD = 0.3
            forest_mask = ndvi_image.gte(NDVI_FOREST_THRESHOLD)
            print("🌱 Created forest mask based on NDVI threshold (NDVI ≥ 0.3)")

            # Debug: Show the forest mask layer on the map
            Map.addLayer(
                forest_mask,
                {'min': 0, 'max': 1, 'palette': ['white', 'green']},
                f'Forest Mask (NDVI ≥ 0.3) [{satellite_used}]'
            )

            # Calculate forest area correctly
            forest_stats = forest_mask.multiply(ee.Image.pixelArea()).reduceRegion(
                reducer=ee.Reducer.sum(),
                geometry=aoi_geometry,
                scale=scale_resolution,
                maxPixels=1e9
            ).getInfo()
            print(f"🌍 Raw forest stats: {forest_stats}")

            # Extract forest area correctly
            forest_area_m2 = forest_stats.get('NDVI', 0)  # Ensure correct key
            forest_area_ha = forest_area_m2 / 10000  # Convert m² to hectares
            print(f"🌳 Forest area: {forest_area_ha:.2f} hectares")

            # Calculate forest percentage
            forest_percentage = (forest_area_ha / aoi_area_ha) * 100
            print(f"📊 Forest percentage: {forest_percentage:.2f}%")

            # Define thresholds
            MIN_FOREST_AREA_HA = 30
            MIN_FOREST_PERCENTAGE = 15

            # Assess suitability
            is_suitable = (forest_area_ha >= MIN_FOREST_AREA_HA and 
                          forest_percentage >= MIN_FOREST_PERCENTAGE)

            # Print results
            print("\n🌎 Forest Assessment Results:")
            print(f"📏 Total AOI Area: {aoi_area_ha:.2f} hectares")
            print(f"🌲 Forest Area: {forest_area_ha:.2f} hectares")
            print(f"🟩 Forest Coverage: {forest_percentage:.2f}%")
            print(f"🔍 Minimum Required: {MIN_FOREST_AREA_HA} hectares and {MIN_FOREST_PERCENTAGE}%")

            if is_suitable:
                print("\n✅ Area is suitable for deforestation analysis")
            else:
                print("\n🚫 Area is NOT suitable for deforestation analysis")
                if forest_area_ha < MIN_FOREST_AREA_HA:
                    print(f"  - Insufficient forest area ({forest_area_ha:.2f} < {MIN_FOREST_AREA_HA} ha)")
                if forest_percentage < MIN_FOREST_PERCENTAGE:
                    print(f"  - Insufficient coverage ({forest_percentage:.2f}% < {MIN_FOREST_PERCENTAGE}%)")

            return is_suitable, forest_area_ha, forest_percentage

        except Exception as e:
            print(f"❌ Error in forest assessment: {str(e)}")
            import traceback
            traceback.print_exc()
            return False, 0, 0

    # Run Forest Suitability Analysis
    print("\n🚀 Starting forest suitability assessment...")
    is_suitable, forest_area, forest_percentage = assess_forest_suitability(ndvi_image, AOI, Map, best_collection, scale_resolution)

    # Display Final Map
    print("\n🛰️ Sentinel-2/Landsat NDVI Map for the First 30 Days:")
    Map
else:
    print("⚠️ No valid satellite images found for the selected date range. Try a different time period.")



🔍 Checking available satellite imagery...
✅ Using Sentinel-2 (2015-Present)

🚀 Starting forest suitability assessment...

📡 Assessing forest suitability using COPERNICUS/S2_SR...
📍 Total AOI area: 283.61 hectares
🌱 Created forest mask based on NDVI threshold (NDVI ≥ 0.3)
🌍 Raw forest stats: {'NDVI': 2339800.7391093043}
🌳 Forest area: 233.98 hectares
📊 Forest percentage: 82.50%

🌎 Forest Assessment Results:
📏 Total AOI Area: 283.61 hectares
🌲 Forest Area: 233.98 hectares
🟩 Forest Coverage: 82.50%
🔍 Minimum Required: 30 hectares and 15%

✅ Area is suitable for deforestation analysis

🛰️ Sentinel-2/Landsat NDVI Map for the First 30 Days:


Following cell calculated the dynamic Deforesation check (based on selected area size)

In [14]:
# Import necessary libraries
from datetime import datetime, timedelta
import math

# Function to generate robust time intervals
def generate_time_intervals(start_date, end_date, duration_months):
    intervals = []
    start_date = datetime.strptime(start_date, "%Y-%m-%d")
    end_date = datetime.strptime(end_date, "%Y-%m-%d")

    # Loop through and create intervals
    current_date = start_date
    while current_date < end_date:
        next_date = current_date + timedelta(days=duration_months * 30)  # Approximate 1 month = 30 days
        # Ensure last interval ends exactly at the end_date
        if next_date > end_date:
            next_date = end_date
        intervals.append((current_date.strftime("%Y-%m-%d"), next_date.strftime("%Y-%m-%d")))
        current_date = next_date

    return intervals

# Generate robust intervals
time_intervals = generate_time_intervals(start_date, end_date, interval_months)

# Print the generated intervals
print("📅 Generated Time Intervals:")
for interval in time_intervals:
    print(f"  - {interval[0]} to {interval[1]}")

# Function to check satellite image availability
def check_satellite_availability(start, end, AOI):
    """
    Checks the availability of Sentinel-2, Landsat 8, and Landsat 5.
    Priority: Sentinel-2 > Landsat 8 > Landsat 5.
    """
    satellite_data = [
        ("COPERNICUS/S2_SR", "Sentinel-2"),  # Sentinel-2 (2015-Present)
        ("LANDSAT/LC08/C02/T1_L2", "Landsat 8"),  # Landsat 8 (2013-Present)
        ("LANDSAT/LT05/C02/T1_L2", "Landsat 5")   # Landsat 5 (1984-2013)
    ]

    for satellite, name in satellite_data:
        image_collection = ee.ImageCollection(satellite) \
            .filterDate(start, end) \
            .filterBounds(AOI)
        image_count = image_collection.size().getInfo()

        if image_count > 0:
            print(f"✅ Interval: {start} to {end} - {name} Available ({image_count} images)")
            return satellite, name, image_count  # Return the first available satellite

    print(f"⚠️ Interval: {start} to {end} - No satellite data available!")
    return None, None, 0  # No available data

# Check satellite image availability for each interval
selected_satellites = []

for interval in time_intervals:
    start, end = interval
    satellite, name, image_count = check_satellite_availability(start, end, AOI)

    if satellite is not None:
        selected_satellites.append((start, end, satellite, name, image_count))

# Print the selected satellite sources
print("\n📡 **Final Selected Satellites Per Interval:**")
for entry in selected_satellites:
    print(f"  - {entry[0]} to {entry[1]} | 📡 {entry[3]} ({entry[4]} images)")

# If no satellite images are found at all
if len(selected_satellites) == 0:
    print("\n⚠️ No satellite data found for any interval. Try adjusting the date range or AOI.")


📅 Generated Time Intervals:
  - 2022-01-01 to 2022-06-30
  - 2022-06-30 to 2022-12-25
✅ Interval: 2022-01-01 to 2022-06-30 - Sentinel-2 Available (71 images)
✅ Interval: 2022-06-30 to 2022-12-25 - Sentinel-2 Available (69 images)

📡 **Final Selected Satellites Per Interval:**
  - 2022-01-01 to 2022-06-30 | 📡 Sentinel-2 (71 images)
  - 2022-06-30 to 2022-12-25 | 📡 Sentinel-2 (69 images)


In [17]:
# Define a cloud masking function for different satellites
def mask_clouds(image, satellite):
    """
    Applies cloud masking dynamically based on the satellite sensor.
    """
    qa_bands = image.bandNames()

    if satellite == 'COPERNICUS/S2_SR':  # Sentinel-2
        has_qa60 = qa_bands.contains('QA60')
        return ee.Algorithms.If(
            has_qa60,
            image.updateMask(
                image.select('QA60').bitwiseAnd(1 << 10).eq(0).And(
                    image.select('QA60').bitwiseAnd(1 << 11).eq(0)
                )
            ),
            image
        )
    
    elif satellite in ['LANDSAT/LC08/C02/T1_L2', 'LANDSAT/LT05/C02/T1_L2']:  # Landsat 8 & Landsat 5
        has_qa_pixel = qa_bands.contains('QA_PIXEL')
        return ee.Algorithms.If(
            has_qa_pixel,
            image.updateMask(
                image.select('QA_PIXEL').bitwiseAnd(1 << 4).eq(0)  # Bit 4 represents cloud mask
            ),
            image
        )

    return image  # If no QA band is present, return the original image

# Define RGB Bands for Visualization Based on Satellite
RGB_BANDS = {
    'COPERNICUS/S2_SR': ['B4', 'B3', 'B2'],  # Sentinel-2 (Red, Green, Blue)
    'LANDSAT/LC08/C02/T1_L2': ['SR_B4', 'SR_B3', 'SR_B2'],  # Landsat 8 (Red, Green, Blue)
    'LANDSAT/LT05/C02/T1_L2': ['SR_B3', 'SR_B2', 'SR_B1']  # Landsat 5 (Red, Green, Blue)
}

# Define visualization parameters for different satellites
VIS_PARAMS = {
    'COPERNICUS/S2_SR': {'min': 0, 'max': 3000, 'gamma': 1.4},  # Sentinel-2
    'LANDSAT/LC08/C02/T1_L2': {'min': 0, 'max': 15000, 'gamma': 1.3},  # Landsat 8
    'LANDSAT/LT05/C02/T1_L2': {'min': 0, 'max': 10000, 'gamma': 1.3}  # Landsat 5
}

# Process time intervals for all selected satellites
for interval in selected_satellites:
    start, end, satellite, name, image_count = interval  # Get interval details

    print(f"\n📡 Processing {name} imagery for interval: {start} to {end} ({image_count} images)")

    # Load the selected ImageCollection
    image_collection = ee.ImageCollection(satellite) \
        .filterDate(start, end) \
        .filterBounds(AOI) \
        .map(lambda img: mask_clouds(img, satellite))  # Apply cloud masking dynamically

    # Check if images are available
    image_count = image_collection.size().getInfo()
    if image_count == 0:
        print(f"⚠️ No images available for {name} during {start} to {end}. Skipping.")
        continue

    # Create a median composite
    composite = image_collection.median().clip(AOI)

    # Ensure composite is not empty
    composite_check = composite.reduceRegion(
        reducer=ee.Reducer.mean(),
        geometry=AOI,
        scale=30,
        maxPixels=1e9
    ).getInfo()

    if not composite_check:
        print(f"⚠️ Composite image for {name} ({start} to {end}) is empty. Skipping.")
        continue

    # Apply Gaussian smoothing to reduce noise
    smoothed_composite = composite.convolve(ee.Kernel.gaussian(radius=3, sigma=1, units='pixels'))

    # Use a single band to extract CRS and ensure consistent projection
    single_band = composite.select(RGB_BANDS[satellite][0])  # Select the red band
    crs_string = single_band.projection().crs().getInfo()

    # Reproject the smoothed composite using the extracted CRS
    aligned_composite = smoothed_composite.reproject(
        crs=crs_string,  # CRS string
        scale=10 if satellite == 'COPERNICUS/S2_SR' else 30  # Sentinel-2 (10m) vs Landsat (30m)
    )

    # Visualize the composite for this interval
    Map.addLayer(
        aligned_composite.select(RGB_BANDS[satellite]),  # Dynamically selects RGB bands
        VIS_PARAMS[satellite],  # Use correct visualization parameters
        f"{name} Composite {start} to {end}"
    )

    # Add text labels for T1 and T2 on the map
    text_feature_1 = ee.Feature(ee.Geometry.Point(AOI.centroid().getInfo()['coordinates']),
                                {'label': f"T1: {start}"})
    text_feature_2 = ee.Feature(ee.Geometry.Point(AOI.centroid().getInfo()['coordinates']),
                                {'label': f"T2: {end}"})

    text_layer = ee.FeatureCollection([text_feature_1, text_feature_2])

    Map.addLayer(text_layer, {}, f"Time Labels {start} to {end}")

print("\n✅ All available composites have been processed and displayed on the map.")
Map



📡 Processing Sentinel-2 imagery for interval: 2022-01-01 to 2022-06-30 (71 images)

📡 Processing Sentinel-2 imagery for interval: 2022-06-30 to 2022-12-25 (69 images)

✅ All available composites have been processed and displayed on the map.


Map(bottom=2482240.0, center=[-31.336337255279147, -64.11982416000096], controls=(WidgetControl(options=['posi…

In [18]:
# Define tile size dynamically based on satellite resolution
tile_size_meters = 256 * (10 if best_collection == 'COPERNICUS/S2_SR' else 30)  # Sentinel-2 (10m), Landsat (30m)

# 🔹 Ensure AOI is a valid rectangular grid by using a bounding box
expanded_AOI = AOI.bounds()  # Create a bounding box that fully covers the AOI

# 🔹 Check if the AOI is smaller than a single tile and expand if necessary
aoi_area_m2 = AOI.area().getInfo()  # Get AOI area in square meters
min_tile_area = tile_size_meters ** 2  # Minimum tile size area (256x256 pixels)

if aoi_area_m2 < min_tile_area:
    print("⚠️ AOI is too small for a full tile. Expanding to a single tile-sized bounding box...")

    # Expand AOI to nearest 256x256 tile size
    centroid = AOI.centroid()
    expanded_AOI = centroid.buffer(tile_size_meters / 2).bounds()  # Create a single tile

    # ✅ Do not display any tiles on the map for small AOIs
    print("✅ AOI is now covered by a single tile. No need to display a grid.")
    
else:
    # 🔹 Generate the grid based on the expanded AOI for larger AOIs
    grid = expanded_AOI.coveringGrid(
        proj=ee.Projection(crs_string),  # Use the extracted CRS
        scale=tile_size_meters  # Define the scale for each tile
    ).map(lambda feature: feature.intersection(expanded_AOI, ee.ErrorMargin(1)))  # Clip to expanded AOI with error margin

    # 🔹 Process tiles for each time interval
    for interval in selected_satellites:  
        start, end, satellite, name, image_count = interval  

        print(f"\n📡 Processing tiles for {name} from {start} to {end} ({image_count} images)")

        # Iterate through each tile in the FeatureCollection
        grid_list = grid.toList(grid.size())  # Convert FeatureCollection to a list
        for i in range(grid.size().getInfo()):
            tile = ee.Feature(grid_list.get(i))  # Get tile as Feature
            tile_geom = tile.geometry()  # Extract geometry

            # ✅ Only display tiles if AOI is large enough
            Map.addLayer(
                tile_geom,
                {'color': 'blue'},  # Display individual tiles in blue
                f"Tile {i + 1} ({start} to {end})"
            )

    # 🔹 Add the entire grid clipped to expanded AOI for visualization
    Map.addLayer(
        grid,
        {'color': 'red'},  # Display the full grid in red
        "256x256 Tiles (Expanded & Clipped to AOI)"
    )

    print("\n✅ The grid of 256x256 tiles, expanded for full AOI coverage, is displayed for all time windows.")

# **Display the map**
Map


⚠️ AOI is too small for a full tile. Expanding to a single tile-sized bounding box...
✅ AOI is now covered by a single tile. No need to display a grid.


Map(bottom=2482240.0, center=[-31.336337255279147, -64.11982416000096], controls=(WidgetControl(options=['posi…

Download tiles one by one.

In [36]:
# Define export folder and format
export_folder = "EarthEngine_Exports"
export_format = "GeoTIFF"

try:
    # Function for Gaussian blur
    def apply_gaussian_blur(image):
        """Apply Gaussian blur to reduce noise"""
        return image.convolve(ee.Kernel.gaussian(radius=1.5, sigma=1, units='pixels'))
    
    # Function for Dark Object Subtraction (DOS)
    def apply_dos(image):
        """Apply dark object subtraction for atmospheric correction"""
        dark_stats = image.reduceRegion(
            reducer=ee.Reducer.percentile([1]),
            geometry=AOI,
            scale=10,
            maxPixels=1e9
        )
        return image.subtract(ee.Image.constant(dark_stats.values()))

    # Function to process image collection
    def process_collection(start_date, collection_name):
        """Process image collection with all preprocessing steps"""
        collection = ee.ImageCollection(collection_name) \
            .filterDate(start_date, ee.Date(start_date).advance(1, 'month')) \
            .filterBounds(AOI) \
            .map(mask_clouds)
        
        composite = collection.median() \
            .clip(AOI) \
            .reproject('EPSG:4326', None, 30)
        
        return apply_dos(apply_gaussian_blur(composite))

    # Process both time periods
    print("Processing T1 images...")
    t1_processed = process_collection(start_date, best_collection)
    
    print("Processing T2 images...")
    t2_processed = process_collection(end_date, best_collection)
    
    # Register T2 to T1
    t2_registered = ee.Image.register(t2_processed, t1_processed, 50)
    
    # Function to create export task
    def create_export_task(image, description, region):
        """Create and start an export task"""
        task = ee.batch.Export.image.toDrive(
            image=image,
            description=description,
            folder=export_folder,
            fileFormat=export_format,
            region=region,
            scale=10 if best_collection == 'COPERNICUS/S2_SR_HARMONIZED' else 30,
            maxPixels=1e9
        )
        task.start()
        return task

    # Export logic
    if aoi_area_m2 >= min_tile_area:
        print("🔹 Exporting tiled images for T1 and T2...")
        grid_list = grid.toList(grid.size())
        grid_size = grid.size().getInfo()
        
        for i in range(grid_size):
            tile_geom = ee.Feature(grid_list.get(i)).geometry()
            
            # Export both time periods
            create_export_task(t1_processed, f"T1_Tile_{i+1}_{start_date}_processed", tile_geom)
            create_export_task(t2_registered, f"T2_Tile_{i+1}_{end_date}_processed", tile_geom)
            print(f"✅ Started exports for tile {i+1}")
            
    else:
        print("🔹 Exporting single images for T1 and T2...")
        create_export_task(t1_processed, f"T1_Full_AOI_{start_date}_processed", AOI)
        create_export_task(t2_registered, f"T2_Full_AOI_{end_date}_processed", AOI)
        print("✅ Exports started")

except Exception as e:
    print(f"❌ Error: {str(e)}")

Processing T1 images...
Processing T2 images...
🔹 Exporting single images for T1 and T2...
✅ Exports started
