In [None]:
# !pip install git+https://github.com/boettiger-lab/cng-python

In [None]:

import ibis
from ibis import _
from cng.utils import *
from cng.h3 import * 
import os
# Open the persistent DuckDB database file "local.db" through ibis, loading the
# spatial and h3 extensions that geom_to_cell's hand-written SQL relies on.
con = ibis.duckdb.connect("local.db", extensions = ["spatial", "h3"])
# Star-imported helper from the cng package. NOTE(review): its exact effect is
# not visible here (presumably installs the h3 community extension); the
# TypeError recorded in this notebook's output shows it accepts no arguments.
install_h3()


# Must use scoped secrets with different names for the different endpoints
set_secrets(con, name = "minio") # read/write using AWS env var credentials
# Secret scoped to the public source.coop bucket on AWS (us-west-2); the empty
# key/secret strings presumably mean anonymous access — confirm against cng docs.
set_secrets(con, "", "", endpoint = "s3.amazonaws.com", region="us-west-2", name = "source", bucket = "us-west-2.opendata.source.coop")

def _quote_ident(name):
    """Return `name` as a double-quoted SQL identifier, escaping embedded quotes."""
    return '"' + str(name).replace('"', '""') + '"'


def geom_to_cell(df, zoom=8, keep_cols=None):
    """Explode a table's geometries into rows tagged with H3 cell ids.

    POLYGON geometries are first promoted to MULTIPOLYGON (via ``ST_Multi``).
    Every geometry is then split into its component parts with ``ST_Dump``,
    producing one output row per part. Each row carries ``keep_cols``, the
    part as ``geom``, and ``h3id``.

    Parameters
    ----------
    df : ibis.Table
        Input table with a geometry column named ``geom``. Must be bound to a
        DuckDB backend with the ``spatial`` and ``h3`` extensions loaded.
    zoom : int, default 8
        H3 resolution passed to ``h3_polygon_wkt_to_cells_string``.
    keep_cols : list of str, optional
        Columns to carry through to the output. Defaults to every column of
        ``df`` except ``geom``. A ``geom`` entry is ignored because the dumped
        geometry is always emitted under that name.

    Returns
    -------
    ibis.Table
        ``keep_cols``, ``geom`` (one row per dumped part) and ``h3id``, the
        list of H3 cell-id strings from ``h3_polygon_wkt_to_cells_string``.

    Raises
    ------
    ValueError
        If ``zoom`` is not convertible to an int in the H3 range [0, 15].
    """
    # Validate first: the resolution is interpolated directly into SQL text.
    resolution = int(zoom)
    if not 0 <= resolution <= 15:
        raise ValueError(f"zoom must be an H3 resolution in [0, 15], got {zoom!r}")

    con = df.get_backend()

    # Default to keeping all columns except geom. `geom` is always removed:
    # the dumped geometry is emitted as `geom`, and selecting the original too
    # would yield two columns with the same name.
    if keep_cols is None:
        keep_cols = df.columns
    keep_cols = [col for col in keep_cols if col != 'geom']

    # All types must be multi-polygons: promote POLYGON rows, leave the rest.
    as_multi = ibis.cases(
        (df.geom.geometry_type() == 'POLYGON', ST_Multi(df.geom)),
        else_=df.geom,
    )
    inner_sql = ibis.to_sql(df.mutate(geom=as_multi))

    # Quote identifiers so spaces / reserved words survive the hand-written SQL.
    # Building a list (instead of "col_list, ...") keeps the SELECT valid when
    # keep_cols is empty.
    select_items = [_quote_ident(col) for col in keep_cols]
    # UNNEST over ST_Dump's result turns each component geometry into its own row.
    select_items.append('UNNEST(ST_Dump(ST_GeomFromWKB(geom))).geom AS geom')
    select_list = ', '.join(select_items)

    expr = f'''
        WITH t1 AS (
            SELECT {select_list}
            FROM ({inner_sql})
        )
        SELECT *, h3_polygon_wkt_to_cells_string(geom, {resolution}) AS h3id FROM t1
    '''
    return con.sql(expr)







TypeError: install_h3() takes 0 positional arguments but 1 was given

In [None]:
# National Wetlands Inventory parquet files; the state code is parsed from
# file names matching "<XX>_Wetlands.parquet".
# Upstream location: s3://us-west-2.opendata.source.coop/giswqs/nwi/wetlands/**
# This notebook reads the mirror copy below.
SOURCE = "s3://public-nwi/aws/us-west-2.opendata.source.coop/giswqs/nwi/wetlands/**"

nwi = (con
    .read_parquet(SOURCE, filename = True)
    .select('geometry', 'ATTRIBUTE', 'WETLAND_TYPE', 'filename')
    .rename(geom = "geometry")
    # Escape the "." so only a literal ".parquet" suffix matches.
    .mutate(state_code=_.filename.re_extract(r"([A-Z]{2})_Wetlands\.parquet", 1))
    # NOTE(review): DuckDB spatial's ST_Transform follows the target CRS axis
    # order, which for EPSG:4326 is (lat, lon) unless always_xy is set. Verify
    # the output is (lon, lat) before feeding it to the H3 polygon functions.
    .mutate(geom =  _.geom.convert('EPSG:5070','EPSG:4326'))
    .drop('filename')
)


x = nwi.head().execute()

In [None]:
# Batch-processing settings for the H3 conversion.
MEMORY_LIMIT = '20GB'
CHUNK_SIZE = 10000
# Trailing "/" matters: the chunk loop appends "chunks/..." directly.
OUTPUT_PATH = "s3://public-wetlands/nwi/"

con.raw_sql(f"SET memory_limit='{MEMORY_LIMIT}';")

# The (lazy) NWI table is processed in fixed-size row chunks; the last chunk
# may be partial, so round the chunk count up.
table = nwi
total_rows = table.count().execute()
num_chunks, leftover = divmod(total_rows, CHUNK_SIZE)
if leftover:
    num_chunks += 1

print(f"Total rows: {total_rows:,}")
print(f"Chunk size: {CHUNK_SIZE:,}")
print(f"Number of chunks: {num_chunks}")


In [None]:

# Smoke test: convert only the first chunk and write it to a local file.
chunk_id = 0
offset = chunk_id * CHUNK_SIZE
print(f"\nProcessing chunk {chunk_id + 1}/{num_chunks} (rows {offset:,} to {min(offset + CHUNK_SIZE, total_rows):,})")

chunk = table.limit(CHUNK_SIZE, offset=offset)
cells = geom_to_cell(chunk, zoom=8)
# One row per H3 cell; the list column is no longer needed once unnested.
result = cells.mutate(h8=_.h3id.unnest())
result = result.drop('h3id')

result.to_parquet("test_10.parquet")

In [None]:
con.read_parquet("test_10.parquet").head(5).execute()  # peek at the smoke-test output

In [None]:

# Process each chunk and write it to its own parquet file under OUTPUT_PATH/chunks/.
# NOTE(review): limit/offset without order_by relies on the backend returning
# rows in the same order on every query. Consider ordering by a unique key so
# chunks cannot overlap or skip rows. Each chunk also likely re-scans the
# source up to its offset.
for chunk_id in range(num_chunks):
    offset = chunk_id * CHUNK_SIZE
    print(f"\nProcessing chunk {chunk_id + 1}/{num_chunks} (rows {offset:,} to {min(offset + CHUNK_SIZE, total_rows):,})")

    # Get chunk with row filtering
    chunk = table.limit(CHUNK_SIZE, offset=offset)
    # One row per H3 cell. Drop the h3id list once unnested, as the
    # single-chunk test does; otherwise every row would also carry the
    # polygon's full cell list.
    result = geom_to_cell(chunk, zoom=8).mutate(h8=_.h3id.unnest()).drop('h3id')

    # Write to parquet
    output_file = f"{OUTPUT_PATH}chunks/chunk_{chunk_id:04d}.parquet"
    result.to_parquet(output_file)

    print(f"  ✓ Chunk {chunk_id} written")



In [None]:
print("\n✅ All chunks processed!")

# Combine all chunks into a single parquet file.
# OUTPUT_PATH ends in "/", so strip it before joining. A doubled slash
# ("nwi//chunks") is a different S3 key prefix from the one the chunk loop
# wrote to, and the glob would not find the chunk files.
print("\nCombining chunks...")
output_root = OUTPUT_PATH.rstrip("/")
combined = con.read_parquet(f"{output_root}/chunks/chunk_*.parquet")
combined.to_parquet(f"{output_root}/hex/combined_results.parquet")
print("✅ Combined file created!")

con.disconnect()