Data Collection Pipeline from Google Earth Engine

In [None]:
import ee
import time
from datetime import datetime

# Initialize Earth Engine
ee.Authenticate()
ee.Initialize(project = 'dm-sentinel-2')

In [None]:
# Define Phoenix ROI
def get_phoenix_roi():
    phoenix_coords = [
        [-112.3244, 33.2903],
        [-112.3244, 33.8153],
        [-111.9255, 33.8153],
        [-111.9255, 33.2903],
        [-112.3244, 33.2903]
    ]
    return ee.Geometry.Polygon([phoenix_coords])

In [None]:
# Split ROI into tiles (~51km² each)
def split_roi(roi, tile_size_m=5120, scale=10):
    bounds = roi.bounds().coordinates().getInfo()[0]
    minx, miny = bounds[0][0], bounds[0][1]
    maxx, maxy = bounds[2][0], bounds[2][1]

    lon_tiles = int((maxx - minx) * 111000 // tile_size_m) + 1
    lat_tiles = int((maxy - miny) * 111000 // tile_size_m) + 1

    tiles = []
    for i in range(lon_tiles):
        for j in range(lat_tiles):
            x0 = minx + i * (tile_size_m / 111000)
            x1 = minx + (i + 1) * (tile_size_m / 111000)
            y0 = miny + j * (tile_size_m / 111000)
            y1 = miny + (j + 1) * (tile_size_m / 111000)
            tile = ee.Geometry.Rectangle([x0, y0, x1, y1])
            tiles.append(tile)
    return tiles

In [None]:
# Quarter date mapping
quarters = {
    1: ("01-01", "03-31"),
    2: ("04-01", "06-30"),
    3: ("07-01", "09-30"),
    4: ("10-01", "12-31"),
}


In [None]:
# Cloud masking for Sentinel-2 using SCL
def mask_sentinel_clouds(img):
    scl = img.select("SCL")
    mask = scl.neq(3).And(scl.neq(8)).And(scl.neq(9)).And(scl.neq(10))  # exclude cloud, cirrus, shadows
    return img.updateMask(mask)

In [None]:
# Get quarterly image: Sentinel-2 for >= 2017, Landsat for < 2017
def get_quarterly_image(year, quarter, roi):
    start, end = quarters[quarter]
    start_date = f"{year}-{start}"
    end_date = f"{year}-{end}"

    if year >= 2017:
        # Sentinel-2 L2A
        collection = (
            ee.ImageCollection("COPERNICUS/S2_SR")
            .filterDate(start_date, end_date)
            .filterBounds(roi)
            .filter(ee.Filter.lt("CLOUDY_PIXEL_PERCENTAGE", 20))
            .map(mask_sentinel_clouds)
        )

        count = collection.size().getInfo()
        if count == 0:
            print(f"No Sentinel-2 images for {year} Q{quarter}")
            return None

        image = collection.median().clip(roi).select(["B4", "B3", "B2"]).divide(10000)  # RGB, scaled to 0–1

    else:
        # Landsat (merged from 5, 7, 8, 9)
        collection = (
            ee.ImageCollection("LANDSAT/LT05/C02/T1_L2")
            .merge(ee.ImageCollection("LANDSAT/LE07/C02/T1_L2"))
            .merge(ee.ImageCollection("LANDSAT/LC08/C02/T1_L2"))
            .merge(ee.ImageCollection("LANDSAT/LC09/C02/T1_L2"))
            .filterDate(start_date, end_date)
            .filterBounds(roi)
            .filter(ee.Filter.lt("CLOUD_COVER", 20))
        )

        count = collection.size().getInfo()
        if count == 0:
            print(f"No Landsat images for {year} Q{quarter}")
            return None

        image = collection.median().clip(roi).select(["SR_B4", "SR_B3", "SR_B2"])  # RGB
        image = image.multiply(0.0000275).add(-0.2)  # scale reflectance to 0–1

    return image

In [None]:
# Export image tile with retry
def export_image_tile(image, year, quarter, tile, tile_id, scale, retries=3):
    name = f"Phoenix_{year}_Q{quarter}_T{tile_id}"
    for attempt in range(retries):
        try:
            task = ee.batch.Export.image.toDrive(
                image=image,
                description=name,
                folder="Phoenix_Quarterly_Images",
                fileNamePrefix=name,
                region=tile.getInfo()['coordinates'],
                scale=scale,
                maxPixels=1e13
            )
            task.start()
            print(f"[{datetime.now()}] Export started: {name}")
            return task
        except Exception as e:
            print(f"Retry {attempt + 1}/{retries} for {name}: {e}")
            time.sleep(5)
    print(f"Failed all retries for {name}")
    return None

In [None]:
# Export image to Google Drive
def export_image(image, year, quarter, roi):
    name = f"Phoenix_{year}_Q{quarter}"
    task = ee.batch.Export.image.toDrive(
        image=image,
        description=name,
        folder="Phoenix_Quarterly_Images",
        fileNamePrefix=name,
        region=roi.coordinates().getInfo(),
        scale=10 if year >= 2017 else 30,  # Sentinel: 10m, Landsat: 30m
        maxPixels=1e13
    )
    task.start()
    print(f"[{datetime.now()}] Export started: {name}")
    return task

In [None]:
# Wait for all tasks to complete
def wait_for_tasks(tasks):
    print("\n🔄 Monitoring export tasks...")
    while any(t.status()['state'] in ['READY', 'RUNNING'] for t in tasks):
        for t in tasks:
            status = t.status()
            print(f"  {t.id} - {status['state']}")
        time.sleep(30)
    print("\n All export tasks completed.")

In [None]:
# Main orchestrator
def run_pipeline(start_year=1984, end_year=2024):
    roi = get_phoenix_roi()
    tiles = 9
    tasks = []

    for year in range(start_year, end_year + 1):
        for quarter in range(1, 5):
            image = get_quarterly_image(year, quarter, roi)
            if image is None:
                continue

            scale = 10 if year >= 2017 else 30

            for idx, tile in enumerate(tiles):
                task = export_image_tile(image, year, quarter, tile, idx + 1, scale)
                if task:
                    tasks.append(task)
                time.sleep(1.5)

    wait_for_tasks(tasks)
if __name__ == "__main__":
    run_pipeline()