In [18]:
import ee
print(ee.__version__)
import geemap
from pprint import pprint 
from google.cloud.bigquery import Client

# Cloud project used for Earth Engine requests in this notebook.
project = "g4g-eaas"

# Initialize the Earth Engine client against the high-volume endpoint for that project.
ee.Initialize(
    project=project,
    opt_url="https://earthengine-highvolume.googleapis.com",
)


1.5.14


In [19]:
# Load the Google EFM annual embedding collection, restricted to 2024:
# the EG embeddings are from 2024, so the EFM imagery is matched to the same year.
efm = (
    ee.ImageCollection("GOOGLE/SATELLITE_EMBEDDING/V1/ANNUAL")
    .filterDate('2024-01-01', '2024-12-31')
)

Load the intermediate tile-wise EG point-to-patch FeatureCollections; we'll process EFM to patches one tile at a time.

In [20]:
import subprocess

# Folder holding the intermediate per-tile FeatureCollection assets.
asset_root = "projects/g4g-eaas/assets/intmd/"

# List the assets with the `earthengine` CLI. The command is passed as an argument
# list (no shell involved), and check=True makes a failing CLI call raise
# CalledProcessError instead of silently producing an empty asset list.
ls_result = subprocess.run(
    ["earthengine", "ls", asset_root],
    capture_output=True,
    text=True,
    check=True,
)

# Keep only lines that are asset IDs under asset_root (drops blank lines and any other output).
tile_fcs = [line for line in ls_result.stdout.splitlines() if line.startswith(asset_root)]

In [21]:
# Let's look at one of the tile feature collections.
example_tile_path = "projects/g4g-eaas/assets/intmd/eg_pt_to_patch_fc_tile_48PVS"
tile_fc = ee.FeatureCollection(example_tile_path)

m = geemap.Map(basemap='HYBRID', center=[12.5, 102.1], zoom=6)

# Label the layer with the name of the asset that is actually displayed, rather than
# with whichever asset happens to be first in the CLI listing.
m.addLayer(tile_fc, {}, example_tile_path.split("/")[-1], True, 0.5)
m.addLayer(efm, {}, 'EFM Embeddings', True, 0.5)
m

Map(center=[12.5, 102.1], controls=(WidgetControl(options=['position', 'transparent_bg'], widget=SearchDataGUI…

Set up some automation

In [22]:
# Exact per-tile record counts are needed to chunk the exports precisely.
# Instead of calling fc.size().getInfo() on every intermediate FC, this borrows
# from the step1 notebook: the original EG embeddings BQ table is queried for
# tile-wise row counts.
PROJECT_ID = "g4g-eaas"
DATASET_ID = "embeddings_sea"
TABLE_ID = "earthgenome_cambodia_v1"

query = f"""
SELECT 
    tile, 
    COUNT(*) AS row_count
FROM 
    `{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}`
GROUP BY 
    tile
ORDER BY 
    row_count DESC
"""

client = Client(project=PROJECT_ID)
query_job = client.query(query)

# Map each tile id to its row count, in the order the rows are returned by the query.
records_tile = {row['tile']: row['row_count'] for row in query_job}

print(f"Total tiles in BQ table: {len(records_tile)}")
print("record count by tile:")
pprint(records_tile)

Total tiles in BQ table: 36
record count by tile:
{'47PRP': 234973,
 '47PRQ': 234942,
 '47PRR': 234927,
 '48PTA': 235319,
 '48PTR': 235437,
 '48PTS': 235396,
 '48PTT': 235393,
 '48PTU': 235383,
 '48PTV': 235335,
 '48PUA': 235520,
 '48PUB': 235520,
 '48PUS': 235644,
 '48PUT': 235638,
 '48PUU': 235624,
 '48PUV': 235520,
 '48PVA': 235698,
 '48PVB': 235674,
 '48PVS': 235758,
 '48PVT': 235748,
 '48PVU': 235720,
 '48PVV': 235709,
 '48PWA': 235708,
 '48PWS': 235737,
 '48PWT': 235712,
 '48PWU': 235727,
 '48PWV': 235699,
 '48PXA': 235520,
 '48PXB': 235516,
 '48PXS': 235606,
 '48PXT': 235520,
 '48PXU': 235520,
 '48PXV': 235520,
 '48PYA': 235296,
 '48PYB': 235267,
 '48PYU': 235331,
 '48PYV': 235318}


A few processing functions defined up front

In [23]:
# Aggregate EFM embedding imagery to each EarthGenome embedding patch location
def reduce_nested(
    img,
    fc,
    reducer: ee.Reducer,
    scale: int,
    crs: str,
    crs_transform: ee.List,
    best_effort: bool,
    maxPixels: int,
    tileScale: int,
):
    """Run ``img.reduceRegion`` over every feature of ``fc`` and store the result on the feature.

    Args:
        img: Image to reduce.
        fc: Collection whose features supply the reduction geometries.
        reducer: Reducer handed to ``reduceRegion``.
        scale: Forwarded to ``reduceRegion`` as ``scale``.
        crs: Forwarded to ``reduceRegion`` as ``crs``.
        crs_transform: Forwarded to ``reduceRegion`` as ``crsTransform``.
        best_effort: Forwarded to ``reduceRegion`` as ``bestEffort``.
        maxPixels: Forwarded to ``reduceRegion`` as ``maxPixels``.
        tileScale: Forwarded to ``reduceRegion`` as ``tileScale``.

    Returns:
        The result of mapping over ``fc``: each feature with the ``reduceRegion``
        output set on it as properties.
    """

    def _reduce_feature(feature):
        # Reduce the image over this feature's geometry only.
        stats = img.reduceRegion(
            reducer=reducer,
            geometry=feature.geometry(),
            scale=scale,
            crs=crs,
            crsTransform=crs_transform,
            bestEffort=best_effort,
            maxPixels=maxPixels,
            tileScale=tileScale,
        )
        # Copy the reduction output onto the feature.
        return feature.set(stats)

    return fc.map(_reduce_feature)


In [24]:
# convert band-wise embedding properties returned from reduceRegion() to one embedding array property of type list[float]
def embedding_array_prop(feature):
    """Collect all properties whose name starts with "A" into one 'embedding' list property.

    The matching property names are sorted before their values are read, so the
    position of each value in the list is the same for every feature instead of
    depending on the order in which ``propertyNames()`` returns the names.

    Note: every property whose name starts with "A" is included.
    NOTE(review): presumably only the band-wise embedding properties start with
    "A" on these features - confirm against the input collection.

    Args:
        feature: Feature carrying the band-wise properties.

    Returns:
        The feature with an added 'embedding' property holding the list of values.
    """
    # Names of the properties that start with "A", in ascending name order.
    band_props = (
        feature.propertyNames()
        .filter(ee.Filter.stringStartsWith('item', 'A'))
        .sort()
    )

    # Look up the value of each matching property, preserving the sorted order.
    band_values = band_props.map(lambda name: feature.get(name))

    return feature.set('embedding', band_values)

In [25]:
# First gotcha: the 'embedding' property can't be exported as list[float],
# so (following the suggestion in the export error) it is encoded as a JSON string.
# It gets converted back in BQ.
def convert_embedding_to_string(feature):
    """Replace the feature's 'embedding' property with its JSON-string encoding.

    Args:
        feature: Feature that has an 'embedding' property.

    Returns:
        The feature with 'embedding' overwritten by ``ee.String.encodeJSON`` of its value.
    """
    return feature.set('embedding', ee.String.encodeJSON(feature.get('embedding')))

#### For each EG tile (an intermediate FeatureCollection in which the Point geometries have been converted to 320 x 320 m Polygon geometries):
* do some processing to return EFM average embedding for each patch
* export resulting FC to BQ in chunks

In [38]:
import math

chunk_size = 5000 # approximate record chunk per BQ export

asset_pattern = "projects/g4g-eaas/assets/intmd/eg_pt_to_patch_fc_tile_"

# Tiles that are not processed in this run.
skip_tiles = {"48PWS", "48PVB", "48PXS", "48PUB", "48PTR", "48PTS", "48PYB", "47PRP"}

# First chunk index to export, per tile; tiles not listed start at chunk 0.
# NOTE(review): the previous version skipped chunks 0-4 of EVERY tile. The recorded
# run shows that skip being applied to 48PVS, so it is scoped to that tile here on
# the assumption that it was a resume marker - confirm before running.
resume_from_chunk = {"48PVS": 5}

# Which processing stages to export. Each name is used as the suffix of both the
# task description and the destination BigQuery table.
#   "testAllSteps"      -> embedding list + JSON string + select(['id','tile','embedding'])
#   "testNoSelect"      -> embedding list + JSON string, all properties kept
#   "testNoPostProcess" -> raw reduceRegion output
export_variants = ("testNoPostProcess",)

# The mosaic does not depend on the tile, so build it once.
efm_mosaic = efm.mosaic()

for tile_idx, (tile, record_count) in enumerate(records_tile.items()):
    if tile in skip_tiles:
        continue
    print(tile_idx)
    print(f"Processing tile: {tile}")
    if record_count <= 0:
        # Nothing to export, and the chunk arithmetic below would divide by zero.
        continue
    chunks = math.ceil(record_count / chunk_size)
    step = math.ceil(record_count / chunks)
    print(f"record count: {record_count}, chunks: {chunks}, step:{step}")
    tile_fc_path = f"{asset_pattern}{tile}"
    print(tile_fc_path)

    tile_fc = ee.FeatureCollection(tile_fc_path)

    # average EFM to eg patches
    efm_patch_embed = reduce_nested(efm_mosaic,
                           tile_fc,
                           reducer=ee.Reducer.mean(),
                           scale=10,
                           crs='EPSG:4326',
                           crs_transform=None,
                           best_effort=True,
                           maxPixels=1e13,
                           tileScale=16
                          )

    # convert the band-wise embedding properties to a single embedding array property,
    # then to a JSON string for export
    updated_fc = efm_patch_embed.map(embedding_array_prop).map(convert_embedding_to_string)
    select_fc = updated_fc.select(['id','tile', 'embedding'])

    variant_fcs = {
        "testAllSteps": select_fc,
        "testNoSelect": updated_fc,
        "testNoPostProcess": efm_patch_embed,
    }

    # begin chunking out records for BQ export
    # NOTE(review): every chunk is a slice of toList(record_count) over the whole tile
    # collection; this likely makes each export task re-evaluate the full list - verify,
    # and consider a cheaper chunking scheme if exports are slow.
    variant_lists = {
        variant: variant_fcs[variant].toList(record_count)
        for variant in export_variants
    }

    for chunk_idx in range(resume_from_chunk.get(tile, 0), chunks):
        start = chunk_idx * step
        # Clamp so the final chunk's logged/described range stops at the last record.
        end = min(start + step, record_count)

        print(f"Exporting records {start} to {end}")

        for variant in export_variants:
            record_chunk = ee.FeatureCollection(variant_lists[variant].slice(start, end))
            task = ee.batch.Export.table.toBigQuery(
                collection=record_chunk,
                description=f"earthgenome_efm_patch_{tile}_chunked_{start}_{end}_BQ_{variant}",
                table=f"g4g-eaas.embeddings_sea.google_efm_cambodia_{variant}",
                append=True,
            )
            task.start()

0
Processing tile: 48PVS
record count: 235758, chunks: 48, step:4912
projects/g4g-eaas/assets/intmd/eg_pt_to_patch_fc_tile_48PVS
Exporting records 24560 to 29472
Exporting records 29472 to 34384
Exporting records 34384 to 39296
Exporting records 39296 to 44208
Exporting records 44208 to 49120
Exporting records 49120 to 54032
Exporting records 54032 to 58944
Exporting records 58944 to 63856
Exporting records 63856 to 68768
Exporting records 68768 to 73680
Exporting records 73680 to 78592
Exporting records 78592 to 83504
Exporting records 83504 to 88416
Exporting records 88416 to 93328
Exporting records 93328 to 98240
Exporting records 98240 to 103152
Exporting records 103152 to 108064
Exporting records 108064 to 112976
Exporting records 112976 to 117888
Exporting records 117888 to 122800
Exporting records 122800 to 127712
Exporting records 127712 to 132624
Exporting records 132624 to 137536
Exporting records 137536 to 142448
Exporting records 142448 to 147360
Exporting records 147360 to

# DEBUGGING

In [27]:
# EG patch embeddings are produced from 32x32 px patches of Sentinel-2 (so 320 m x 320 m image footprints).
# efm_patch_embed = reduce_nested(efm.mosaic(),
#                            eg_patches_asset,
#                            reducer=ee.Reducer.mean(),
#                            scale=10,
#                            crs='EPSG:4326',
#                            crs_transform=None,
#                            best_effort=True,
#                            maxPixels=1e13,
#                            tileScale=16
#                           )

# pprint(efm_patch_embed.first().getInfo()['properties'])

# # Apply the function to the feature collection
# updated_fc = efm_patch_embed.map(embedding_array_prop)
# updated_fc = updated_fc.select(['id','tile','geometry', 'embedding_efm','random'])
# # Print the first feature to verify
# pprint(updated_fc.first().getInfo())


# updated_fc = updated_fc.map(convert_embedding_to_string)
# pprint(updated_fc.first().get('embedding_efm').getInfo())

# m = geemap.Map()
# m.addLayer(efm, {}, "EFM",False)
# m.addLayer(eg_patches_asset, {}, "EG patches asset",False)
# m.addLayer(efm_patch_embed.first(), {}, "EFM patch embedding first")
# m.centerObject(efm_patch_embed.first(), 12)
# m

In [28]:
# is it the slicing/chunking of the FC itself that is inefficient? 
# task = ee.batch.Export.table.toAsset(
#     collection=updated_fc.limit(50,'random',True),
#     description='efm_patch_embed_from_eg_patchasset_Asset_lim50',
#     assetId='projects/g4g-eaas/assets/efm_patch_embed_from_eg_patchasset_lim50'
# )
# task.start()

# is exporting to BQ less efficient than exporting to asset?
# task = ee.batch.Export.table.toBigQuery(
#             collection=updated_fc.limit(50,'random',True),
#             description="efm_patch_embed_from_eg_patchasset_BQ_lim50",
#             table='g4g-eaas.embeddings_sea.efm_patch_embed_from_eg_patchasset_BQ_lim50',
#             append=True,
#             selectors=['id', 'tile', 'geometry', 'embedding_efm']
#         )
# task.start()

In [29]:
# using .random() and then .filter() to chunk the feature collection into smaller export chunks
# import math
# chunks = 1000
# for tile in records_tile.keys():
#     record_count = records_tile[tile]
#     step = 1/chunks
#     chunk_size = math.ceil(record_count/chunks)
#     print(f"record count: {record_count}, chunks: {chunks}, chunk_size: {chunk_size}, step:{step}")
    
#     this_tile = (
#         updated_fc.filter(ee.Filter.eq('tile', tile))
#         .randomColumn('random', 0,'uniform',['id'])
#                  )
    
#     for i in range(chunks):
#         start = (i * step)
#         end = ((i + 1) * step)
#         print(start, end)
#         filtered_fc = this_tile.filter(ee.Filter.rangeContains('random', start, end))
#         task = ee.batch.Export.table.toBigQuery(
#             collection=filtered_fc,
#             description=f"EG_embeddings_efm_{start}_{end}",
#             table='g4g-eaas.embeddings_sea.earthgenome_efm_fc_chunked',
#             append=True,
#             selectors=['id', 'tile', 'geometry', 'embedding_efm'],
#         )
#         task.start()
#         if start > 0.0:
#             break
    
    
#     break

In [30]:
# why are fc chunks empty upon export?...

# this_tile = updated_fc.filter(ee.Filter.eq('tile', tile)).randomColumn('random', 0,'uniform')
# print(this_tile.limit(1000,'random',True).size().getInfo())
# task = ee.batch.Export.table.toAsset(
#             collection=this_tile.limit(1000, 'random',True),
#             description=f"eg_embeddings_efm_randFC_lim1000",
#             assetId='projects/g4g-eaas/assets/eg_embeddings_efm_randFC_lim1000',
#             # table='g4g-eaas.embeddings_sea.earthgenome_efm_fc_chunked',
#             # append=True,
#             selectors=['id', 'tile', 'geometry', 'embedding_efm']
#         )
# task.start()


In [31]:
# bq_task = ee.batch.Export.table.toBigQuery(
#     updated_fc,
#     description='efm_agg_embed_on_eg_patches_ones2tile',
#     table='g4g-eaas.embeddings_sea.efm_agg_embed_on_eg_patches_ones2tile',
#     append=True,
#     selectors=['id', 'tile', 'geo', 'embedding_efm'],)
# bq_task.start()