# Export the standardized anomalies and hotter drought fingerprint data to local, as well as normal PDSI data & forest mask

Run 02_hotter_drought.ipynb first to generate the hotter drought data and export to GEE asset

Note that you will need to put an activated Google Service account JSON file in the 'config/secrets/' directory.



In [2]:
import os
import shutil
import tempfile
import time
import random
import contextlib
from concurrent.futures import ThreadPoolExecutor, as_completed

import ee
import rioxarray as rxr
from rioxarray.merge import merge_arrays
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload

from find_set_root import find_set_project_root
PROJECT_ROOT = find_set_project_root()
print(f"Project root found at: {PROJECT_ROOT}")
import utils.general_functions as ugf


DIR_RAW = os.path.join(PROJECT_ROOT, 'data', 'raw')
DIR_DERIVED = os.path.join(PROJECT_ROOT, 'data', 'derived')
DIR_SECRETS = os.path.join(PROJECT_ROOT, 'config', 'secrets')
ugf.dir_ensure([DIR_RAW, DIR_DERIVED, DIR_SECRETS])

#Prepare to use Earth Engine
ee.Authenticate()
ee.Initialize(project = 'ee-tymc5571-multi-disturbance')



############################
# USER-SET SERVICE FILE, YEARS, and AOI
############################
SERVICE_FILE = os.path.join(DIR_SECRETS, 'ee-tymc5571-goodfire-72076a6632b5.json')

FIRST_YEAR = 2000
LAST_YEAR = 2020

states = ee.FeatureCollection('TIGER/2018/States')
western_states_names = [
    'Washington', 'Oregon', 'California', 'Idaho', 'Nevada', 'Montana',
    'Wyoming', 'Utah', 'Colorado', 'Arizona', 'New Mexico'
]
aoi = states.filter(ee.Filter.inList('NAME', western_states_names))

Project root found at: C:\Users\tymc5571\dev\forest-disturbance-stack-v3
✅ Directory already exists: C:\Users\tymc5571\dev\forest-disturbance-stack-v3\data\raw
✅ Directory already exists: C:\Users\tymc5571\dev\forest-disturbance-stack-v3\data\derived
✅ Directory already exists: C:\Users\tymc5571\dev\forest-disturbance-stack-v3\config\secrets


In [3]:
# FUNCTIONS

def remove_to_bands_append(image):
    """
    Renames bands in the input Earth Engine Image by removing the first part
    (before the first underscore) from each band name.

    Args:
        image (ee.Image): The input image.

    Returns:
        ee.Image: The image with renamed bands.
    """
    old_names = image.bandNames()
    new_names = old_names.map(lambda name: ee.String(name).split('_').slice(1).join('_'))
    return image.rename(new_names)

def toBands_with_projection(collection):
    collection = ee.ImageCollection(collection)
    image = collection.toBands()
    reference_img = ee.Image(collection.first())
    return image.setDefaultProjection(reference_img.projection())

# Define the functions to compute annual and summer terraclimate means
def annual_terraclimate_image(index, years):

    def annual_terraclimate_images(year):
        year = ee.Number(year)
        mean_image = terraclimate \
            .filter(ee.Filter.calendarRange(year, year, 'year')) \
            .select(index) \
            .mean() \
            .rename(ee.String(index).cat('_annual_').cat(year.format('%d')))
        return mean_image

    annual_images = years.map(annual_terraclimate_images)
    annual_collection = ee.ImageCollection(annual_images)
    #annual_image = annual_collection.toBands()
    annual_image = toBands_with_projection(annual_collection)
    annual_image = remove_to_bands_append(annual_image)
    return(annual_image)



def _download_file_from_drive(file_id, file_name, temp_dir, service_account_file):
    try:
        # Authenticate service account
        SCOPES = ['https://www.googleapis.com/auth/drive.readonly']
        credentials = service_account.Credentials.from_service_account_file(
            os.path.abspath(service_account_file), scopes=SCOPES
        )
        service = build('drive', 'v3', credentials=credentials)

        request = service.files().get_media(fileId=file_id)
        local_path = os.path.join(temp_dir, file_name)
        with open(local_path, 'wb') as f:
            downloader = MediaIoBaseDownload(f, request)
            done = False
            while not done:
                status, done = downloader.next_chunk()
        time.sleep(random.uniform(0.5, 1.5))  # Random delay
        print(f"⬇️ Downloaded {file_name}")
        return local_path
    except Exception as e:
        print(f"⚠️ Error downloading {file_name}: {e}")
        return None


def download_merge_from_drive(
    description: str,
    local_filename: str,
    drive_folder: str,
    service_account_file: str,
    compress: str = "deflate",
    check_existing: bool = True,
    n_workers: int = 1
) -> str:
    local_filename = os.path.abspath(local_filename)
    if check_existing and os.path.exists(local_filename):
        print(f"✅ File already exists: {local_filename}")
        return local_filename

    temp_dir = tempfile.mkdtemp()
    datasets = []

    try:
        SCOPES = ['https://www.googleapis.com/auth/drive.readonly']
        credentials = service_account.Credentials.from_service_account_file(
            os.path.abspath(service_account_file), scopes=SCOPES
        )
        service = build('drive', 'v3', credentials=credentials)

        folder_results = service.files().list(
            q=f"name='{drive_folder}' and mimeType='application/vnd.google-apps.folder' and trashed=false",
            fields="files(id, name)"
        ).execute()
        folders = folder_results.get('files', [])
        if not folders:
            raise FileNotFoundError(f"❌ Folder '{drive_folder}' not found or not shared with service account.")
        folder_id = folders[0]['id']

        file_results = service.files().list(
            q=f"'{folder_id}' in parents and trashed=false and name contains '{description}' and name contains '.tif'",
            fields="files(id, name)"
        ).execute()
        files = file_results.get('files', [])
        if not files:
            raise FileNotFoundError(f"❌ No matching .tif files found for '{description}' in '{drive_folder}'.")

        print(f"📁 Found {len(files)} files. Starting download...")

        downloaded_paths = []
        if n_workers == 1:
            for file in files:
                path = _download_file_from_drive(file['id'], file['name'], temp_dir, service_account_file)
                if path:
                    downloaded_paths.append(path)
        else:
            with ThreadPoolExecutor(max_workers=n_workers) as executor:
                futures = [
                    executor.submit(_download_file_from_drive, file['id'], file['name'], temp_dir, service_account_file)
                    for file in files
                ]
                for future in as_completed(futures):
                    result = future.result()
                    if result:
                        downloaded_paths.append(result)

        for f in downloaded_paths:
            ds = rxr.open_rasterio(f, masked=True, chunks=True)
            datasets.append(ds)

        mosaic = merge_arrays(datasets)
        mosaic.rio.to_raster(local_filename, compress=compress)
        print(f"✅ Final merged GeoTIFF saved to: {local_filename}")
        return local_filename

    except KeyboardInterrupt:
        print("⚠️ Interrupted by user. Cleaning up and exiting.")
        raise

    finally:
        for ds in datasets:
            with contextlib.suppress(Exception):
                ds.close()
        try:
            shutil.rmtree(temp_dir)
            print(f"🧹 Cleaned up temporary directory: {temp_dir}")
        except Exception as cleanup_err:
            print(f"⚠️ Error during cleanup: {cleanup_err}")

def export_image_to_drive_and_download(
    image: ee.Image,
    region: ee.Geometry,
    description: str,
    local_filename: str,
    drive_folder: str = "EarthEngineExports",
    service_account_file: str = "your-service-account.json",
    scale: int | float = 30,
    wait_interval: int = 30,
    compress: str = "deflate",
    check_existing: bool = True,
    n_workers: int = 1
) -> str:
    local_filename = os.path.abspath(local_filename)
    if check_existing and os.path.exists(local_filename):
        print(f"✅ File already exists: {local_filename}")
        return local_filename

    task = ee.batch.Export.image.toDrive(
        image=image.clip(region),
        description=description,
        folder=drive_folder,
        fileNamePrefix=description,
        region=region.bounds().getInfo()["coordinates"],
        scale=scale,
        maxPixels=1e13
    )
    task.start()
    print(f"🚀 Started Earth Engine export: {description}")

    while task.active():
        print("⏳ Waiting for Earth Engine export to finish...")
        time.sleep(wait_interval)

    status = task.status()
    if status["state"] != "COMPLETED":
        raise RuntimeError(f"❌ Export failed: {status}")

    print("✅ Earth Engine export complete. Downloading from Drive...")

    return download_merge_from_drive(
        description=description,
        local_filename=local_filename,
        drive_folder=drive_folder,
        service_account_file=service_account_file,
        compress=compress,
        check_existing=check_existing,
        n_workers=n_workers
    )

def filter_bands_by_year(image: ee.Image, first_year: int, last_year: int) -> ee.Image:
    band_names = image.bandNames()

    # Create list of allowed years as strings
    allowed_years = ee.List.sequence(first_year, last_year).map(
        lambda y: ee.Number(y).format('%04d')
    )

    # Map over band names and keep only those that match an allowed year suffix
    def keep_if_valid(band):
        band_str = ee.String(band)
        year_suffix = band_str.slice(-4)
        return ee.Algorithms.If(
            allowed_years.contains(year_suffix),
            band_str,
            None
        )

    # Map and filter out None
    valid_bands = band_names.map(keep_if_valid).removeAll([None])

    return image.select(valid_bands)


def reclassify_image_binary(value):
    def _reclassify(img):
        binary_img = img.eq(value).selfMask()
        return binary_img.copyProperties(img, ['system:time_start'])
    return _reclassify


In [5]:
lcms = ee.ImageCollection("USFS/GTAC/LCMS/v2024-10")
lcmap = ee.ImageCollection("projects/sat-io/open-datasets/LCMAP/LCPRI")

# Inclusive forest mask
lcmap_cover = lcmap.filterDate(str(FIRST_YEAR - 1), str(LAST_YEAR))
lcms_cover = lcms.filter(ee.Filter.eq('study_area', 'CONUS')).filterDate(str(FIRST_YEAR - 1), str(LAST_YEAR)).select('Land_Cover')

lcmap_for = lcmap_cover.map(reclassify_image_binary(4))
lcms_for = lcms_cover.map(reclassify_image_binary(1))

combined_forest_mask = lcmap_for.map(lambda img: img.reduce(ee.Reducer.max()).gt(0)).max().Or(
    lcms_for.map(lambda img: img.reduce(ee.Reducer.max()).gt(0)).max()
)
combined_forest_mask = combined_forest_mask.setDefaultProjection(lcmap_cover.first().projection())


In [None]:
#Prep drought

hd_fingerprint = ee.Image("projects/ee-tymc5571-multi-disturbance/assets/hd_warm_fingerprint")
terraclimate = ee.ImageCollection("IDAHO_EPSCOR/TERRACLIMATE")


pdsi_annual = annual_terraclimate_image('pdsi', ee.List.sequence(FIRST_YEAR, LAST_YEAR))

hd_fingerprint = filter_bands_by_year(hd_fingerprint, FIRST_YEAR, LAST_YEAR)

print(hd_fingerprint.bandNames().getInfo())
print(pdsi_annual.bandNames().getInfo())


['hd_fingerprint_yr_2000', 'hd_fingerprint_yr_2001', 'hd_fingerprint_yr_2002', 'hd_fingerprint_yr_2003', 'hd_fingerprint_yr_2004', 'hd_fingerprint_yr_2005', 'hd_fingerprint_yr_2006', 'hd_fingerprint_yr_2007', 'hd_fingerprint_yr_2008', 'hd_fingerprint_yr_2009', 'hd_fingerprint_yr_2010', 'hd_fingerprint_yr_2011', 'hd_fingerprint_yr_2012', 'hd_fingerprint_yr_2013', 'hd_fingerprint_yr_2014', 'hd_fingerprint_yr_2015', 'hd_fingerprint_yr_2016', 'hd_fingerprint_yr_2017', 'hd_fingerprint_yr_2018', 'hd_fingerprint_yr_2019', 'hd_fingerprint_yr_2020']
['pdsi_annual_2000', 'pdsi_annual_2001', 'pdsi_annual_2002', 'pdsi_annual_2003', 'pdsi_annual_2004', 'pdsi_annual_2005', 'pdsi_annual_2006', 'pdsi_annual_2007', 'pdsi_annual_2008', 'pdsi_annual_2009', 'pdsi_annual_2010', 'pdsi_annual_2011', 'pdsi_annual_2012', 'pdsi_annual_2013', 'pdsi_annual_2014', 'pdsi_annual_2015', 'pdsi_annual_2016', 'pdsi_annual_2017', 'pdsi_annual_2018', 'pdsi_annual_2019', 'pdsi_annual_2020']


In [None]:
export_image_to_drive_and_download(
    image=pdsi_annual,
    region=aoi,
    description='pdsi_annual_test',
    local_filename=os.path.join(DIR_DERIVED, "pdsi_annual.tif"),
    drive_folder="EarthEngineExports",
    service_account_file=SERVICE_FILE,
    scale=4638.3,
    wait_interval=30,
    compress="deflate",
    check_existing= True,
    n_workers=4
)

export_image_to_drive_and_download(
    image=hd_fingerprint,
    region=aoi,
    description='hd_fingerprint',
    local_filename=os.path.join(DIR_DERIVED, "hd_fingerprint.tif"),
    drive_folder="EarthEngineExports",
    service_account_file=SERVICE_FILE,
    scale=4638.3,
    wait_interval=30,
    compress="deflate",
    check_existing= True,
    n_workers=4
)

export_image_to_drive_and_download(
    image=combined_forest_mask,
    region=aoi,
    description='relaxed_forest_mask',
    local_filename=os.path.join(DIR_DERIVED, "relaxed_forest_mask.tif"),
    drive_folder="EarthEngineExports",
    service_account_file=SERVICE_FILE,
    scale=30,
    wait_interval=30,
    compress="deflate",
    check_existing= True,
    n_workers=4
)

🚀 Started Earth Engine export: pdsi_annual_test
⏳ Waiting for Earth Engine export to finish...
⏳ Waiting for Earth Engine export to finish...
⏳ Waiting for Earth Engine export to finish...
⏳ Waiting for Earth Engine export to finish...
✅ Earth Engine export complete. Downloading from Drive...
📁 Found 2 files. Starting download...
⬇️ Downloaded pdsi_annual_test.tif
⬇️ Downloaded pdsi_annual_test.tif
✅ Final merged GeoTIFF saved to: C:\Users\tymc5571\dev\forest-disturbance-stack-v3\data\derived\pdsi_annual.tif
🧹 Cleaned up temporary directory: C:\Users\tymc5571\AppData\Local\Temp\tmp93f3p6jo
🚀 Started Earth Engine export: hd_fingerprint
⏳ Waiting for Earth Engine export to finish...
⏳ Waiting for Earth Engine export to finish...
⏳ Waiting for Earth Engine export to finish...
✅ Earth Engine export complete. Downloading from Drive...
📁 Found 2 files. Starting download...
⬇️ Downloaded hd_fingerprint.tif
⬇️ Downloaded hd_fingerprint.tif
✅ Final merged GeoTIFF saved to: C:\Users\tymc5571\dev

  has_raised = await self.run_ast_nodes(code_ast.body, cell_name,


'C:\\Users\\tymc5571\\dev\\forest-disturbance-stack-v3\\data\\derived\\hd_fingerprint.tif'