In [None]:
import ee
import geetools

In [None]:
# Presumably lists the Earth Engine credential profiles saved locally by geetools — confirm.
ee.Authenticate.geetools.list_user()

In [None]:
# Initialize Earth Engine with the saved credentials of user "pierrick" on the Cloud project
# "ee-geetools". NOTE(review): user and project are hardcoded; other users must edit this cell.
ee.Initialize.geetools.from_user(name="pierrick", project="ee-geetools")

In [None]:
# Display the Cloud project id Earth Engine is initialized with (presumably "ee-geetools"
# after the previous cell — confirm).
ee.Initialize.geetools.project_id()

In [None]:
# Smoke test: getInfo() sends this trivial computation to the Earth Engine servers, so the
# cell only succeeds when the session is initialized and authorized.
ee.Number(1).getInfo()

In [None]:
# Set up the Google Cloud Storage client used to store the generated GeoTIFF files.
# It reuses the project id and credentials that geetools exposes for Earth Engine.
from google.cloud.storage.client import Client
from google.cloud.storage.bucket import Bucket

client = Client(
    **{
        "project": ee.Initialize.geetools.project_id(),
        "credentials": ee.Initialize.geetools.get_credentials(),
    }
)
# Displayed result: the buckets these credentials can see (doubles as an access check).
[*client.list_buckets()]


In [None]:
# NOAA CPC Historical Weather Data for GEE

import re
import tempfile
from datetime import datetime as dtm
from datetime import timedelta
from pathlib import Path
from functools import partial
import argparse

import requests
import rioxarray as rioxarray
import xarray as xr
from tqdm import tqdm
from xarray.coding.times import CFDatetimeCoder


# Wall-clock start of the run, kept so the total processing time can be reported at the end.
now = dtm.now()

# Earth Engine image collection that receives one image per day ("~" is expanded later).
DEFAULT_ASSET_PATH = "~/assets/cpc_noaa/cpc_daily_raw"

# Per-variable URL prefixes on the NOAA PSL server; appending "<year>.nc" gives a yearly file.
# Precipitation and temperatures live in different dataset folders.
CPC_URLS = {
    variable: f"https://downloads.psl.noaa.gov/Datasets/{folder}/{variable}."
    for variable, folder in (
        ("precip", "cpc_global_precip"),
        ("tmin", "cpc_global_temp"),
        ("tmax", "cpc_global_temp"),
    )
}

print("=== Starting NOAA CPC Data Processing ===")

# Create the Earth Engine image collection that receives the daily images.
# `expanduser()` expands the leading "~" of DEFAULT_ASSET_PATH; exist_ok makes re-runs a no-op.
collection = ee.Asset(DEFAULT_ASSET_PATH).expanduser()
collection.mkdir(parents=True, exist_ok=True, image_collection=True)

# Refresh the collection description on every run with a fixed heading.
# Reading a README.md via `Path(__file__)` is deliberately not done here: notebook kernels
# do not define `__file__` (NameError), and that text was never used as the description.
collection.setProperties(description="# The CPC NOAA Daily Weather Data")

# Discover the years published on the NOAA CPC servers.
# A year is kept only if the precip, tmin AND tmax files all exist for it.
# The accumulator is reset explicitly: otherwise the first iteration raises NameError on a
# fresh kernel, and re-running the cell picks up the sorted list left by the previous run
# (`list &= set` raises TypeError).
available_years = None
for data_type, base_url in CPC_URLS.items():
    # Directory listing URL = the prefix up to and including its last "/",
    # e.g. ".../cpc_global_precip/precip." -> ".../cpc_global_precip/"
    listing_url = base_url[: base_url.rfind("/") + 1]
    response = requests.get(listing_url, timeout=30)
    response.raise_for_status()

    # Extract 4-digit years from NetCDF file names such as "precip.1979.nc" or "tmin.2023.nc"
    pattern = rf"{re.escape(data_type)}\.(\d{{4}})\.nc"
    years_for_datatype = {int(year) for year in re.findall(pattern, response.text)}

    # The first data type seeds the set; the following ones intersect with it
    if available_years is None:
        available_years = years_for_datatype
    else:
        available_years &= years_for_datatype

# Exclude the current (presumably still incomplete) year
current_year = dtm.now().year
available_years = sorted(year for year in (available_years or ()) if year < current_year)
print(f"Processing noa_cpc data for years: {available_years}")

# Download and process the CPC data one year at a time to bound memory and disk usage.
# NOTE: the daily GeoTIFFs are written, but uploading them to GCS / Earth Engine is still a TODO.


def _download_cpc_file(url: str, local_path: Path, timeout: int = 300) -> None:
    """Download ``url`` to ``local_path``.

    The response is streamed to disk in 1 MiB chunks instead of being held whole in
    memory via ``response.content``.

    Raises:
        requests.HTTPError: if the server answers with an error status.
    """
    with requests.get(url, timeout=timeout, stream=True) as response:
        response.raise_for_status()
        with local_path.open("wb") as dst:
            for chunk in response.iter_content(chunk_size=1 << 20):
                dst.write(chunk)


def _merge_cpc_datasets(ds_precip, ds_tmin, ds_tmax):
    """Combine the three yearly CPC datasets into a single ``xr.Dataset``.

    The result holds the data variables ``tp`` (precipitation), ``tmin`` and ``tmax``.
    Its coordinates are renamed to ``longitude``/``latitude``, and longitudes are mapped
    from ``[0, 360)`` to ``[-180, 180)`` and sorted ascending.

    Raises:
        KeyError: if a dataset has no data variable with the expected name prefix.
    """

    def pick(ds, prefix):
        # Variable names vary across CPC products, so match on the lowercase prefix.
        name = next((v for v in ds.data_vars if v.lower().startswith(prefix)), None)
        if name is None:
            raise KeyError(f"no data variable starting with {prefix!r} in {list(ds.data_vars)}")
        return name

    merged = xr.merge(
        [
            ds_precip[pick(ds_precip, "precip")].rename("tp"),
            ds_tmin[pick(ds_tmin, "tmin")].rename("tmin"),
            ds_tmax[pick(ds_tmax, "tmax")].rename("tmax"),
        ]
    )

    # CPC files use "lon"/"lat"; rename only the coordinates that are actually present.
    renames = {"lon": "longitude", "lat": "latitude"}
    merged = merged.rename({k: v for k, v in renames.items() if k in merged.coords})

    # (lon - 180) % 360 - 180 maps [0, 360) onto [-180, 180); sorting restores a monotonic axis.
    adjusted_lon = (merged["longitude"].values - 180) % 360 - 180
    return merged.assign_coords(longitude=adjusted_lon).sortby("longitude")


# Open NetCDF files lazily with one chunk per time step, so a full year is never loaded at once.
# cftime decoding copes with the CPC time encoding. Named `open_nc` rather than `open` so the
# builtin `open` is not shadowed for every later cell of the kernel.
time_coder = CFDatetimeCoder(use_cftime=True)
open_nc = partial(
    xr.open_dataset,
    decode_times=time_coder,
    chunks={"time": 1, "lat": -1, "lon": -1},
    engine="netcdf4",
)

# Band order of the exported GeoTIFFs (and of the Earth Engine image bands).
desired_order = ["tp", "tmin", "tmax"]

task_list, failed_tasks = [], []
for year in available_years:
    with tempfile.TemporaryDirectory() as temp_dir:
        # Download the three yearly files (precip, tmin, tmax) into the scratch directory.
        nc_files = {}
        for var in ("precip", "tmin", "tmax"):
            nc_files[var] = Path(temp_dir) / f"{var}.{year}.nc"
            if not nc_files[var].exists():
                _download_cpc_file(f"{CPC_URLS[var]}{year}.nc", nc_files[var])

        # Track every opened Dataset so all of them are closed before the temporary directory
        # is removed: leaked handles block that cleanup on Windows.
        open_datasets = []
        try:
            for var in ("precip", "tmin", "tmax"):
                open_datasets.append(open_nc(nc_files[var]))
            ds_merged = _merge_cpc_datasets(*open_datasets)
            open_datasets.append(ds_merged)

            # Read the time axis once. cftime values are converted by hand because
            # pd.to_datetime cannot safely handle them.
            time_values = ds_merged.time.values
            days = tqdm(time_values, desc=f"Processing {year} days", unit="day")
            for day_index, cftime_date in enumerate(days):
                start_time = dtm(cftime_date.year, cftime_date.month, cftime_date.day)

                # One asset per day; skip days already ingested. Built from the expanded
                # `collection` so the check targets the path the collection was created at,
                # not an unexpanded "~/..." id.
                filename = f"cpc_daily_root_{start_time:%Y%m%d}"
                asset_id = collection / filename
                if asset_id.exists():
                    continue

                # Select this single day and stack the variables as bands of one image.
                day_slice = ds_merged.isel(time=day_index)
                ds_merged_gee = day_slice[desired_order].to_array().rio.write_crs("epsg:4326")

                tiff_file = Path(temp_dir) / f"cpc_daily_{start_time:%Y%m%d}.tiff"
                ds_merged_gee.rio.to_raster(tiff_file, driver="GTiff", dtype="float32", compress="lzw")

                # Metadata for the Earth Engine asset (upload time in epoch milliseconds).
                properties = {"upload_time": int(dtm.now().timestamp() * 1000)}

                # TODO: send `tiff_file` to GCS and submit the Earth Engine ingestion task
                # (draft kept commented out below); until then the GeoTIFF only lives in temp_dir.
        finally:
            for ds in open_datasets:
                ds.close()


#             # Submit the upload task to Earth Engine
#             try:
#                 task = ee.batch.Export.ldc.geotiff.toAsset(
#                     filename=str(tiff_file),
#                     assetId=asset_id,
#                     start_time=start_time,
#                     end_time=start_time + timedelta(days=1),
#                     properties=properties,
#                     band_names=desired_order
#                 )
#                 task.start()
#                 task_list.append(task)
#             except Exception as e:
#                 logger.error(f"Error occurred while submitting task for {start_time}: {e}")
#                 failed_tasks.append(asset_id)
#                 continue # Exit the loop on error to avoid further issues

#             # Clean up the temporary DataArray
#             ds_merged_gee.close()

#         # Close the merged dataset after processing all days for this year
#         # This is crucial for Windows file handle management
#         ds_merged.close()
#         logger.info(f"Completed generating tasks for year {year}, all datasets closed")

# # Monitor Tasks
# logger.warning("Do not shut down the process")
# logger.info(f"Waiting for {len(task_list)} tasks to complete...")
# start_wait = dtm.now()
# TM = LDCGEETools.TaskMonitor(task_list)
# TM.run()
# elapsed_wait_time = dtm.now() - start_wait

# # update the collection start and end time
# ic = ee.ImageCollection(collection.as_posix())
# collection.setProperties(**{
#     "system:time_start": ic.aggregate_min("system:time_start").getInfo(),
#     "system:time_end": ic.aggregate_max("system:time_start").getInfo(),
# })

# # log the computation total time including the time spent waiting for Google
# logger.info("Processing completed. All tasks have been submitted for export to GEE.")
# elapsed_time = dtm.now() - now
# logger.info(
#     f"Total processing time: {elapsed_time}"
#     f" including {elapsed_wait_time} waiting for GEE tasks to complete."
# )

# # log is some task could not be send to GEE
# if len(failed_tasks) > 0:
#     logger.error("Some tasks failed to submit to GEE:")
#     log = "Task ({}) failed with error: {}"
#     [logger.error(log.format(task, "Failed to submit to GEE"))  for task in failed_tasks]

# # log if the data are all there
# not_completed = [t for t in task_list if t.state() not in  ["COMPLETED", "SUCCEEDED"]]
# if len(not_completed) > 0:
#     logger.error("Some tasks did not complete successfully:")
#     log = "Task ({}) failed with status: {}"
#     [logger.error(log.format(task.task_id, task.state()))  for task in not_completed]

# if len(not_completed) > 0 or len(failed_tasks) > 0:
#     exit(1) # return falsy exit


