In [1]:
from datetime import datetime
import json
# import logging
import os
import pprint
import base64


import ee
from google.cloud import storage
from pyproj import Transformer
import pystac
from pystac_client import Client
import rasterio
import rasterio.mask
import requests
import shapely
import shapely.ops

# import IPython.display
# import matplotlib.pyplot as plt

# %matplotlib inline


endpoint = 'https://fusion-stac.hydrosat.com/'


In [2]:
ee.Initialize(project='dri-hydrosat')

In [3]:
with open('creds.json') as f:
    creds = json.loads(f.read())

userpass = f"{creds['username']}:{creds['password']}"
b64 = base64.b64encode(userpass.encode()).decode()
headers = {'Authorization':'Basic ' + b64}

cat_url = 'https://stac.hydrosat.com'
catalog = Client.open(cat_url, headers)


In [4]:
# Caballo Reservoir, Blue Mesa Reservoir, Morrow Reservoir, Crystal Reservoir, 
# Elephant Butte Reservoir, Flaming Gorge Reservoir, Navajo Reservoir, 
# Great Salt Lake, Lake Mead, Lake Powell, Lake Mohave, Lake Tahoe, Pyramid Lake, Salton Sea
lake_geometries = {
    'tahoe': shapely.box(*[-120.21, 38.87, -119.88, 39.28]),
    'mead':  shapely.union(
        shapely.box(*[-114.94, 35.98, -114.06, 36.20]), 
        shapely.box(*[-114.48, 35.98, -114.29, 36.48]), 
        grid_size=0.01),
    # 'mead': shapely.box(*[-114.94, 35.96, -113.88, 36.48]),
    'mohave': shapely.box(*[-114.76, 35.15, -114.52, 36.00]),
    'powell': shapely.union(
        shapely.box(*[-111.62, 36.84, -110.98, 37.24]),
        shapely.box(*[-111.06, 37.02, -110.36, 37.92]),
        grid_size=0.01),
    # 'powell': [-111.60, 36.84, -110.26, 37.92],
    # # 2024 
    'bluemesa': shapely.box(*[-107.64, 38.37, -107.00, 38.55]),
    'caballo':  shapely.box(*[-107.33, 32.87, -107.24, 33.14]),
    'elephant': shapely.box(*[-107.24, 33.12, -107.04, 33.58]),
    'flaming':  shapely.box(*[-109.72, 40.87, -109.36, 41.50]),
    'great':    shapely.box(*[-113.18, 40.63, -111.88, 41.80]),
    'havasu':   shapely.box(*[-114.46, 34.24, -114.05, 34.66]),
    'navajo':   shapely.box(*[-107.64, 36.78, -107.30, 37.08]),
    'pyramid':  shapely.box(*[-119.74, 39.82, -119.38, 40.23]),
    'salton':   shapely.box(*[-116.13, 33.06, -115.55, 33.56]),
}
# pprint.pprint(lake_geometries)

# lake_tiles = {
#     'tahoe': ['10SGJ'],
#     'pyramid': ['10TGK'],
#     'mead': ['11SPV', '11SQV', '11SPA', '11SQA'],
#     'mohave': ['11SQV'],
#     'havasu': ['11SQU'],
#     'powell': ['12SVF', '12SVG', '12SWG', '12SWH'],
#     'salton': ['11SNS', '11SPT', '11SPS', '11SPT'],
#     'great': ['12TUL', '12TUM', '12TVL'],
# }


In [5]:
workspace = '/Users/Charles.Morton@dri.edu/Projects/hydrosat-assets/geotiffs/'
# workspace = os.getcwd()

project_id = 'dri-hydrosat'
bucket_name = 'hydrosat'

# Output band names
lst_band_name = 'lst'
qa_band_name = 'qa'


def asset_ingest(collection, extent, start_date, end_date, overwrite_flag=False):
    print(f'Collection: {collection}')
    print(f'Extent:     {extent}')
    print(f'Start Date: {start_date}')
    print(f'End Date:   {end_date}\n')
    
    search = catalog.search(
        bbox = extent,
        #intersects = aoi,
        datetime = [start_date, end_date],
        collections = [collection],
        max_items = 3000,
    )
    #items = search.get_all_items()
    items = search.item_collection()
    itemjson = items.to_dict()
    print(f'Number of catalog items: {len(items)}\n')
    
    
    # Get the list of local file names (not paths)
    file_list = [
        item 
        for root, dirs, files in os.walk(workspace, topdown=False)
        for item in files 
        if item.endswith('.tif')
    ]

    # Connect to the bucket
    storage_client = storage.Client(project=project_id)

    
    # Check if the asset collection needs to be built
    asset_coll_id = f'projects/{project_id}/assets/{collection}'
    if not ee.data.getInfo(asset_coll_id):
        print(f'\nCollection does not exist and will be built\n  {asset_coll_id}')
        input('Press ENTER to continue')
        ee.data.createAsset({'type': 'IMAGE_COLLECTION'}, asset_coll_id)
    
    # Get the list of assets IDs instead of calling ee.data.getInfo in the loop
    asset_id_list = {
        f'{asset_coll_id}/{image_id}' 
        for image_id in ee.ImageCollection(asset_coll_id).aggregate_array('system:index').getInfo()
    }
    # pprint.pprint(asset_id_list)

    
    # Process each item
    for item in itemjson["features"]:
        # pprint.pprint(item['assets'])
        mgrs_tile = item["properties"]["mgrs_tile"]
        utm_zone = mgrs_tile[:2]
        
        lst_image_url = item["assets"]["lst"]["href"]
        qa_image_url = item["assets"]["combined_qa"]["href"]
    
        collection = lst_image_url.split('?', 1)[0].split('/')[-4]
        year = lst_image_url.split('?', 1)[0].split('/')[-3]
        #temp = lst_image_url.split('?', 1)[0].split('/')[-2]
        
        lst_file_name = lst_image_url.split('?', 1)[0].split('/')[-1]
        qa_file_name = qa_image_url.split('?', 1)[0].split('/')[-1]
        # print(f'{lst_file_name}')
        # print(f'{qa_file_name}')
    
        year_folder = os.path.join(workspace, collection, year)
        lst_local_path = os.path.join(year_folder, lst_file_name)
        qa_local_path = os.path.join(year_folder, qa_file_name)
        lst_bucket_path = f'gs://{bucket_name}/{collection}/{year}/{lst_file_name}'
        qa_bucket_path = f'gs://{bucket_name}/{collection}/{year}/{qa_file_name}'
    
        image_dt = datetime.fromisoformat(item['properties']['datetime'])
        if 'starfm' in collection:
            image_dt = image_dt.replace(hour=18, minute=0, second=1)
            item['properties']['datetime'] = image_dt.isoformat()
            # print(image_dt)
        image_id = f'{mgrs_tile}_{image_dt.strftime("%Y%m%d")}'
        asset_id = f'{asset_coll_id}/{image_id}'
        print(asset_id)

        # if ee.data.getInfo(asset_id):
        if asset_id in asset_id_list:
            if overwrite_flag:
                print(f'  Asset already exists, removing')
                try:
                    ee.data.deleteAsset(asset_id)
                except Exception as e:
                    logging.exception(f'unhandled exception: {e}')
                    continue
            else:
                print(f'  Asset already exists and overwrite is False')
                continue

        # Get the properties dictionary from the item
        # pprint.pprint(item['properties'])
        properties = item['properties']
        properties['file_name'] = lst_file_name
        properties['date_ingested'] = f'{datetime.today().strftime("%Y-%m-%d")}'
    
        # Converting properties to string
        # This is needed especially for the nested dictionary properties
        # TODO: Check if JSON dump would be better for this
        str_properties = [
            'processing:time_of_day_range', 'processing:nrt', 'processing:overwrite_outputs',
            'processing:public', 'processing:sr_only', 'processing:test_mode',
            'processing:qa_screen_opts', 'processing:starfm_opts', 
            'processing:pydms_common_opts', 'processing:pydms_dt_opts', 
            'processing:starfm_opts', 'hydrosat:fusion_inputs',
        ]
        for p in str_properties:
            if p in properties.keys():
                properties[p] = str(properties[p])
        
        # TODO: Check for a cleaner way to rename properties in a dictionary
        del_properties = [
            'processing:software', 'processing:pydms_nn_opts', 'processing:pydms_sknn_opts', 'processing:lineage', 
        ]
        new_properties = {}
        for k, v in properties.items():
            if ((":" in k) or ('-' in k)) and (k not in del_properties):
                new_properties[k.replace(':', '_').replace('-', '_')] = v
                del_properties.append(k)
        for k in del_properties:
            del properties[k] 
        properties.update(new_properties)
        #pprint.pprint(properties)
        #input('ENTER')
    
    
        # Download the image locally
        if overwrite_flag or (lst_file_name not in file_list):
            print('  Downloading LST image from API')
            if not os.path.isdir(year_folder):
                os.makedirs(year_folder)
                
            with requests.get(lst_image_url, stream=True) as result:
                result.raise_for_status()
                with open(lst_local_path, 'wb') as f:
                    for chunk in result.iter_content(chunk_size=10000000):
                        f.write(chunk)
                        
        if overwrite_flag or (qa_file_name not in file_list):
            print('  Downloading QA image from API')
            with requests.get(qa_image_url, stream=True) as result:
                result.raise_for_status()
                with open(qa_local_path, 'wb') as f:
                    for chunk in result.iter_content(chunk_size=10000000):
                        f.write(chunk)
    
        # Mask/clip images to reservoir bounding extents
        # Project all reservoir extents to the tile projection
        # TODO: Add some basic filtering to ignore geometries outside tile
        transformer = Transformer.from_crs("EPSG:4326", f"EPSG:326{utm_zone}", always_xy=True)
        projected_lake_masks = [
            shapely.ops.transform(transformer.transform, lake_geom)
            for lake_nane, lake_geom in lake_geometries.items()
        ]
        # pprint.pprint(projected_lake_masks)

        # Mask the LST and QA images
        # The changes to the metadata are only needed if crop=True
        #   but leaving for now
        with rasterio.open(lst_local_path) as src:
            out_image, out_transform = rasterio.mask.mask(src, projected_lake_masks, crop=False)
            out_meta = src.meta
            out_meta.update({
                "driver": "COG",
                # "height": out_image.shape[1],
                # "width": out_image.shape[2],
                # "transform": out_transform,
            })
        with rasterio.open(lst_local_path, "w", **out_meta) as dest:
            dest.write(out_image)

        with rasterio.open(qa_local_path) as src:
            out_image, out_transform = rasterio.mask.mask(src, projected_lake_masks, crop=False)
            out_meta = src.meta
            out_meta.update({"driver": "COG"})
        with rasterio.open(qa_local_path, "w", **out_meta) as dest:
            dest.write(out_image)
        
        
        # Upload the image to the bucket
        # print('  Uploading to bucket')
        bucket = storage_client.bucket(bucket_name)
        lst_blob = bucket.blob(lst_bucket_path.replace(f'gs://{bucket_name}/', ''))
        lst_blob.upload_from_filename(lst_local_path, timeout=120)
        qa_blob = bucket.blob(qa_bucket_path.replace(f'gs://{bucket_name}/', ''))
        qa_blob.upload_from_filename(qa_local_path, timeout=120)
    
        
        # Ingest into Earth Engine
        # print('  Ingesting into Earth Engine')
        params = {
            'name': asset_id,
            'bands': [
                {'id': lst_band_name, 'tilesetId': 'lst_image', 'tilesetBandIndex': 0},
                {'id': qa_band_name, 'tilesetId': 'qa_image', 'tilesetBandIndex': 0},
            ],
            'tilesets': [
                {'id': 'lst_image', 'sources': [{'uris': [lst_bucket_path]}]},
                {'id': 'qa_image', 'sources': [{'uris': [qa_bucket_path]}]},
            ],
            'properties': properties,
            'startTime': image_dt.isoformat(),
            # 'startTime': image_dt.isoformat() + '.000000000Z',
            # 'pyramiding_policy': 'MEAN',
            # 'missingData': {'values': [nodata_value]},
        }
        task_id = ee.data.newTaskId()[0]
        ee.data.startIngestion(task_id, params, allow_overwrite=True)

        # Cleanup
        try:
            os.remove(lst_local_path)
        except:
            pass
        try:
            os.remove(qa_local_path)
        except:
            pass

print('Done')


# asset_ingest(
#     collection="pydms_sharpened_landsat", 
#     extent=lake_geometries['powell'].bounds, 
#     start_date="2024-07-01T00:00:00Z", 
#     end_date="2024-08-01T00:00:00Z", 
#     overwrite_flag=False,
# )


Done


In [6]:
# July 2024 @ 
# Caballo Reservoir, Blue Mesa Reservoir, Morrow Reservoir, Crystal Reservoir, Elephant Butte Reservoir, Flaming Gorge Reservoir, Navajo Reservoir, Great Salt Lake, Lake Mead, Lake Powell, Lake Mohave, Lake Tahoe, Pyramid Lake, & Salton Sea
# April 2017 – April 2020 @ Lake Mead & Lake Mohave validation sites
# June 2021 – June 2024 @ Lake Powell validation sites
# June 2021 – June 2024 @ Lake Tahoe validation sites


# First process the overpass LST July 2024 images for all reservoirs
for lake_name, lake_geometry in lake_geometries.items():
    print(f'\n{lake_name}')
    if lake_name not in ['powell']:
        continue
        
    asset_ingest(
        collection="pydms_sharpened_landsat", 
        extent=lake_geometry.bounds, 
        start_date="2024-07-01T00:00:00Z", 
        end_date="2024-08-01T00:00:00Z", 
        overwrite_flag=True,
    )


# Then process the interpolated daily LST July 2024 images for all reservoirs
for lake_name, lake_geometry in lake_geometries.items():
    print(f'\n{lake_name}')
    asset_ingest(
        collection="starfm_predictions_modis_landsat", 
        extent=lake_geometry.bounds, 
        start_date="2024-07-01T00:00:00Z", 
        end_date="2024-08-01T00:00:00Z", 
        overwrite_flag=False,
    )


# Then process the overpass LST for the full timeseries for the main reservoirs
for year in [2024, 2023, 2022, 2021]:
    print('\nTahoe')
    asset_ingest(
        collection="pydms_sharpened_landsat", 
        extent=lake_geometries['tahoe'].bounds, 
        start_date=f"{year}-01-01T00:00:00Z", 
        end_date=f"{year+1}-01-01T00:00:00Z", 
        overwrite_flag=False,
    )
    print('\nPowell')
    asset_ingest(
        collection="pydms_sharpened_landsat", 
        extent=lake_geometries['powell'].bounds, 
        start_date=f"{year}-01-01T00:00:00Z", 
        end_date=f"{year+1}-01-01T00:00:00Z", 
        overwrite_flag=False,
    )
for year in [2020, 2019, 2018, 2017]:
    print('\nMead & Mohave')
    asset_ingest(
        collection="pydms_sharpened_landsat", 
        extent=lake_geometries['mead'].bounds, 
        start_date=f"{year}-01-01T00:00:00Z", 
        end_date=f"{year+1}-01-01T00:00:00Z", 
        overwrite_flag=False,
    )


# Then process the interpolated daily LST for the full timeseries for the main reservoirs
for year in [2024, 2023, 2022, 2021]:
    print('\nTahoe')
    asset_ingest(
        collection="starfm_predictions_modis_landsat", 
        extent=lake_geometries['tahoe'].bounds, 
        start_date=f"{year}-01-01T00:00:00Z", 
        end_date=f"{year+1}-01-01T00:00:00Z", 
        overwrite_flag=False,
    )
    print('\nPowell')
    asset_ingest(
        collection="starfm_predictions_modis_landsat", 
        extent=lake_geometries['powell'].bounds, 
        start_date=f"{year}-01-01T00:00:00Z", 
        end_date=f"{year+1}-01-01T00:00:00Z", 
        overwrite_flag=False,
    )
for year in [2020, 2019, 2018, 2017]:
    print('\nMead & Mohave')
    asset_ingest(
        collection="starfm_predictions_modis_landsat", 
        extent=lake_geometries['mead'].bounds, 
        start_date=f"{year}-01-01T00:00:00Z", 
        end_date=f"{year+1}-01-01T00:00:00Z", 
        overwrite_flag=False,
    )

print('\nDone')


Tahoe
Collection: starfm_predictions_modis_landsat
Extent:     (-120.21, 38.87, -119.88, 39.28)
Start Date: 2024-01-01T00:00:00Z
End Date:   2025-01-01T00:00:00Z

Number of catalog items: 174

projects/dri-hydrosat/assets/starfm_predictions_modis_landsat/10SGJ_20240731
  Asset already exists and overwrite is False
projects/dri-hydrosat/assets/starfm_predictions_modis_landsat/10SGJ_20240730
  Asset already exists and overwrite is False
projects/dri-hydrosat/assets/starfm_predictions_modis_landsat/10SGJ_20240729
  Asset already exists and overwrite is False
projects/dri-hydrosat/assets/starfm_predictions_modis_landsat/10SGJ_20240727
  Asset already exists and overwrite is False
projects/dri-hydrosat/assets/starfm_predictions_modis_landsat/10SGJ_20240726
  Asset already exists and overwrite is False
projects/dri-hydrosat/assets/starfm_predictions_modis_landsat/10SGJ_20240725
  Asset already exists and overwrite is False
projects/dri-hydrosat/assets/starfm_predictions_modis_landsat/10SGJ_