diff --git a/deploy/coiled/deploy.py b/deploy/coiled/deploy.py index 60357821..f8a1bf12 100644 --- a/deploy/coiled/deploy.py +++ b/deploy/coiled/deploy.py @@ -38,7 +38,7 @@ def wait_for_completion(self, exit_on_failure=False): if state == 'done': print(f'{job_id} success') completed.add(job_id) - elif state in ('failed', 'error'): + elif state in ('failed', 'error', 'done (errors)'): print(f'{job_id} failed') failed.add(job_id) if exit_on_failure: @@ -139,16 +139,6 @@ def main( # # this is a monitoring / blocking func. We should be able to block with this, then run 02, 03 etc. batch_manager_01.wait_for_completion() - # ----------- 02 Pyramid ------------- - # NOTE: We need to do more work on pyramiding - currently excluded - # This is non-blocking, since there are no post-pyramid dependent operations, so no wait_for_completion (I think) - # if pyramid: - # batch_manager_pyarmid_02 = CoiledBatchManager(debug=debug) - # batch_manager_pyarmid_02.submit_job( - # command=f'python ../../ocr/pipeline/02_Pyramid.py -b {branch}', - # name=f'create-pyramid-{branch}', - # ) - # ----------- 02 Aggregate ------------- batch_manager_aggregate_02 = CoiledBatchManager(debug=debug) batch_manager_aggregate_02.submit_job( @@ -161,24 +151,25 @@ def main( if summary_stats: batch_manager_county_aggregation_01 = CoiledBatchManager(debug=debug) batch_manager_county_aggregation_01.submit_job( - command=f'python ../../ocr/pipeline/02_county_summary_stats.py -b {branch}', + command=f'python ../../ocr/pipeline/02_aggregated_region_summary_stats.py -b {branch}', name=f'create-county-summary-stats-{branch}', - kwargs={**shared_coiled_kwargs, 'vm_type': 'm8g.xlarge'}, + kwargs={**shared_coiled_kwargs, 'vm_type': 'm8g.6xlarge'}, ) batch_manager_county_aggregation_01.wait_for_completion() - # create county summary stats PMTiles layer + + # create summary stats PMTiles layer batch_manager_county_tiles_02 = CoiledBatchManager(debug=debug) batch_manager_county_tiles_02.submit_job( - 
command=f'../../ocr/pipeline/03_county_pmtiles.sh {branch}', + command=f'../../ocr/pipeline/03_aggregated_region_pmtiles.sh {branch}', name=f'create-county-pmtiles-{branch}', kwargs={ **shared_coiled_kwargs, - 'vm_type': 'c7a.xlarge', + 'vm_type': 'c7a.4xlarge', 'container': 'quay.io/carbonplan/ocr:latest', }, ) - # # ------------- 03 Tiles --------------- + # ------------- 03 Tiles --------------- batch_manager_03 = CoiledBatchManager(debug=debug) batch_manager_03.submit_job( command=f'../../ocr/pipeline/03_Tiles.sh {branch}', diff --git a/input-data/vector/aggregated_regions/counties/01_tiger_zipped_to_geoparuqet.py b/input-data/vector/aggregated_regions/county/01_tiger_zipped_to_geoparquet.py similarity index 67% rename from input-data/vector/aggregated_regions/counties/01_tiger_zipped_to_geoparuqet.py rename to input-data/vector/aggregated_regions/county/01_tiger_zipped_to_geoparquet.py index 4cc666a0..06595bde 100644 --- a/input-data/vector/aggregated_regions/counties/01_tiger_zipped_to_geoparuqet.py +++ b/input-data/vector/aggregated_regions/county/01_tiger_zipped_to_geoparquet.py @@ -1,3 +1,10 @@ +# COILED n-tasks 1 +# COILED --region us-west-2 +# COILED --forward-aws-credentials +# COILED --vm-type m8g.large +# COILED --tag project=OCR + + import geopandas as gpd gdf = gpd.read_file( @@ -5,9 +12,8 @@ columns=['NAME', 'geometry'], ) - gdf.to_parquet( - 's3://carbonplan-ocr/input/fire-risk/vector/aggregated_regions/counties.parquet', + 's3://carbonplan-ocr/input/fire-risk/vector/aggregated_regions/counties/counties.parquet', compression='zstd', geometry_encoding='WKB', write_covering_bbox=True, diff --git a/input-data/vector/aggregated_regions/tract/01_tiger_zipped_to_geoparquet.py b/input-data/vector/aggregated_regions/tract/01_tiger_zipped_to_geoparquet.py new file mode 100644 index 00000000..c68a1988 --- /dev/null +++ b/input-data/vector/aggregated_regions/tract/01_tiger_zipped_to_geoparquet.py @@ -0,0 +1,75 @@ +# COILED n-tasks 1 +# COILED --region 
us-west-2 +# COILED --forward-aws-credentials +# COILED --vm-type m8g.large +# COILED --tag project=OCR + + +import geopandas as gpd +from tqdm import tqdm + +# These are FIPS codes for states minus alaska and Hawaii. +# The census tract data is split into zipped files per state / FIPS code. +FIPS_codes = [ + '01', + '04', + '05', + '06', + '08', + '09', + '10', + '11', + '12', + '13', + '16', + '17', + '18', + '19', + '20', + '21', + '22', + '23', + '24', + '25', + '26', + '27', + '28', + '29', + '30', + '31', + '32', + '33', + '34', + '35', + '36', + '37', + '38', + '39', + '41', + '42', + '44', + '45', + '46', + '47', + '48', + '49', + '50', + '51', + '53', + '54', + '55', + '56', +] + +for FIPS in tqdm(FIPS_codes): + # Using geopandas to unzip + download since fsspec allows reading zipped files over http! + tract_url = f'https://www2.census.gov/geo/tiger/TIGER2024/TRACT/tl_2024_{FIPS}_tract.zip' + gdf = gpd.read_file(tract_url, columns=['TRACTCE', 'GEOID', 'NAME', 'geometry']) + + gdf.to_parquet( + f's3://carbonplan-ocr/input/fire-risk/vector/aggregated_regions/tracts/FIPS/FIPS_{FIPS}.parquet', + compression='zstd', + geometry_encoding='WKB', + write_covering_bbox=True, + schema_version='1.1.0', + ) diff --git a/input-data/vector/aggregated_regions/tract/02_aggregate_tracts_to_geoparquet.py b/input-data/vector/aggregated_regions/tract/02_aggregate_tracts_to_geoparquet.py new file mode 100644 index 00000000..169c73d8 --- /dev/null +++ b/input-data/vector/aggregated_regions/tract/02_aggregate_tracts_to_geoparquet.py @@ -0,0 +1,18 @@ +# COILED n-tasks 1 +# COILED --region us-west-2 +# COILED --forward-aws-credentials +# COILED --vm-type m8g.large +# COILED --tag project=OCR + +import duckdb + +from ocr.utils import apply_s3_creds, install_load_extensions + +install_load_extensions() +apply_s3_creds() + +duckdb.query("""COPY (SELECT * FROM read_parquet('s3://carbonplan-ocr/input/fire-risk/vector/aggregated_regions/tracts/FIPS/*.parquet')) + TO 
's3://carbonplan-ocr/input/fire-risk/vector/aggregated_regions/tracts/tracts.parquet' ( + FORMAT 'parquet', + COMPRESSION 'zstd', + OVERWRITE_OR_IGNORE true)""") diff --git a/ocr/pipeline/02_county_summary_stats.py b/ocr/pipeline/02_aggregated_region_summary_stats.py similarity index 58% rename from ocr/pipeline/02_county_summary_stats.py rename to ocr/pipeline/02_aggregated_region_summary_stats.py index 4dcc6b99..952f8413 100644 --- a/ocr/pipeline/02_county_summary_stats.py +++ b/ocr/pipeline/02_aggregated_region_summary_stats.py @@ -11,6 +11,7 @@ def create_summary_stats(branch: str): hist_bins = [0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100] + # tmp table for buildings con.execute(f""" CREATE TEMP TABLE temp_buildings AS SELECT geometry, @@ -36,16 +37,27 @@ def create_summary_stats(branch: str): FROM read_parquet('s3://carbonplan-ocr/intermediate/fire-risk/vector/{branch}/consolidated_geoparquet.parquet') """) + # tmp table for geoms con.execute(""" CREATE TEMP TABLE temp_counties AS SELECT NAME, geometry FROM read_parquet('s3://carbonplan-ocr/input/fire-risk/vector/aggregated_regions/counties.parquet') """) + # tmp table for tracts + con.execute(""" + CREATE TEMP TABLE temp_tracts AS + SELECT GEOID, geometry + FROM read_parquet('s3://carbonplan-ocr/input/fire-risk/vector/aggregated_regions/tracts/tracts.parquet') + """) + + # create spatial index on geom cols con.execute('CREATE INDEX buildings_spatial_idx ON temp_buildings USING RTREE (geometry)') con.execute('CREATE INDEX counties_spatial_idx ON temp_counties USING RTREE (geometry)') + con.execute('CREATE INDEX tracts_spatial_idx ON temp_tracts USING RTREE (geometry)') - con.query(f"""COPY (SELECT b.NAME as county_name, count(b.NAME) as building_count, + # county level histograms + con.query(f"""COPY (SELECT b.NAME as county_name, count(a.risk_2011_horizon_1) as building_count, round(avg(a.risk_2011_horizon_1), 2) as avg_risk_2011_horizon_1, round(avg(a.risk_2011_horizon_15), 2) as avg_risk_2011_horizon_15, 
round(avg(a.risk_2011_horizon_30), 2) as avg_risk_2011_horizon_30, @@ -81,6 +93,43 @@ def create_summary_stats(branch: str): OVERWRITE_OR_IGNORE true); """) + # tract level histograms - we should refactor this mostly shared SQL + con.query(f"""COPY (SELECT b.GEOID as tract_geoid, count(a.risk_2011_horizon_1) as building_count, + round(avg(a.risk_2011_horizon_1), 2) as avg_risk_2011_horizon_1, + round(avg(a.risk_2011_horizon_15), 2) as avg_risk_2011_horizon_15, + round(avg(a.risk_2011_horizon_30), 2) as avg_risk_2011_horizon_30, + round(avg(a.risk_2047_horizon_1), 2) as avg_risk_2047_horizon_1, + round(avg(a.risk_2047_horizon_15), 2) as avg_risk_2047_horizon_15, + round(avg(a.risk_2047_horizon_30), 2) as avg_risk_2047_horizon_30, + round(avg(a.wind_risk_2011_horizon_1), 2) as avg_wind_risk_2011_horizon_1, + round(avg(a.wind_risk_2011_horizon_15), 2) as avg_wind_risk_2011_horizon_15, + round(avg(a.wind_risk_2011_horizon_30), 2) as avg_wind_risk_2011_horizon_30, + round(avg(a.wind_risk_2047_horizon_1), 2) as avg_wind_risk_2047_horizon_1, + round(avg(a.wind_risk_2047_horizon_15), 2) as avg_wind_risk_2047_horizon_15, + round(avg(a.wind_risk_2047_horizon_30), 2) as avg_wind_risk_2047_horizon_30, + map_values(histogram(a.risk_2011_horizon_1, {hist_bins})) as risk_2011_horizon_1, + map_values(histogram(a.risk_2011_horizon_15, {hist_bins})) as risk_2011_horizon_15, + map_values(histogram(a.risk_2011_horizon_30, {hist_bins})) as risk_2011_horizon_30, + map_values(histogram(a.risk_2047_horizon_1, {hist_bins})) as risk_2047_horizon_1, + map_values(histogram(a.risk_2047_horizon_15, {hist_bins})) as risk_2047_horizon_15, + map_values(histogram(a.risk_2047_horizon_30, {hist_bins})) as risk_2047_horizon_30, + map_values(histogram(a.wind_risk_2011_horizon_1, {hist_bins})) as wind_risk_2011_horizon_1, + map_values(histogram(a.wind_risk_2011_horizon_15, {hist_bins})) as wind_risk_2011_horizon_15, + map_values(histogram(a.wind_risk_2011_horizon_30, {hist_bins})) as 
wind_risk_2011_horizon_30, + map_values(histogram(a.wind_risk_2047_horizon_1, {hist_bins})) as wind_risk_2047_horizon_1, + map_values(histogram(a.wind_risk_2047_horizon_15, {hist_bins})) as wind_risk_2047_horizon_15, + map_values(histogram(a.wind_risk_2047_horizon_30, {hist_bins})) as wind_risk_2047_horizon_30, + b.geometry as geometry + FROM temp_buildings a + JOIN temp_tracts b ON ST_Intersects(a.geometry, b.geometry) + GROUP BY b.GEOID, b.geometry ) + TO 's3://carbonplan-ocr/intermediate/fire-risk/vector/{branch}/region_aggregation/tract/tract_summary_stats.parquet' + ( + FORMAT 'parquet', + COMPRESSION 'zstd', + OVERWRITE_OR_IGNORE true); + """) + @click.command() @click.option('-b', '--branch', help='data branch: [QA, prod]. Default QA') diff --git a/ocr/pipeline/03_aggregated_region_pmtiles.sh b/ocr/pipeline/03_aggregated_region_pmtiles.sh new file mode 100644 index 00000000..1d4a74b9 --- /dev/null +++ b/ocr/pipeline/03_aggregated_region_pmtiles.sh @@ -0,0 +1,97 @@ + +#!/bin/bash + +# COILED container quay.io/carbonplan/ocr:latest +# COILED n-tasks 1 +# COILED region us-west-2 +# COILED --forward-aws-credentials +# COILED --tag project=OCR +# COILED --vm-type c7a.xlarge + + +duckdb -c " +load spatial; + +COPY ( + SELECT + 'Feature' AS type, + json_object( + 'tract_geoid', tract_geoid, + 'building_count', building_count, + 'avg_risk_2011_horizon_1', avg_risk_2011_horizon_1, + 'avg_risk_2011_horizon_15', avg_risk_2011_horizon_15, + 'avg_risk_2011_horizon_30', avg_risk_2011_horizon_30, + 'avg_risk_2047_horizon_1', avg_risk_2047_horizon_1, + 'avg_risk_2047_horizon_15', avg_risk_2047_horizon_15, + 'avg_risk_2047_horizon_30', avg_risk_2047_horizon_30, + 'avg_wind_risk_2011_horizon_1', avg_wind_risk_2011_horizon_1, + 'avg_wind_risk_2011_horizon_15', avg_wind_risk_2011_horizon_15, + 'avg_wind_risk_2011_horizon_30', avg_wind_risk_2011_horizon_30, + 'avg_wind_risk_2047_horizon_1', avg_wind_risk_2047_horizon_1, + 'avg_wind_risk_2047_horizon_15', 
avg_wind_risk_2047_horizon_15, + 'avg_wind_risk_2047_horizon_30', avg_wind_risk_2047_horizon_30, + 'risk_2011_horizon_1', risk_2011_horizon_1, + 'risk_2011_horizon_15', risk_2011_horizon_15, + 'risk_2011_horizon_30', risk_2011_horizon_30, + 'risk_2047_horizon_1', risk_2047_horizon_1, + 'risk_2047_horizon_15', risk_2047_horizon_15, + 'risk_2047_horizon_30', risk_2047_horizon_30, + 'wind_risk_2011_horizon_1', wind_risk_2011_horizon_1, + 'wind_risk_2011_horizon_15', wind_risk_2011_horizon_15, + 'wind_risk_2011_horizon_30', wind_risk_2011_horizon_30, + 'wind_risk_2047_horizon_1', wind_risk_2047_horizon_1, + 'wind_risk_2047_horizon_15', wind_risk_2047_horizon_15, + 'wind_risk_2047_horizon_30', wind_risk_2047_horizon_30 + ) AS properties, + json(ST_AsGeoJson(geometry)) AS geometry + + FROM read_parquet('s3://carbonplan-ocr/intermediate/fire-risk/vector/$1/region_aggregation/tract/tract_summary_stats.parquet') +) TO STDOUT (FORMAT json);" | tippecanoe -o tract.pmtiles -l risk -n "tract" -f -P --drop-smallest-as-needed -q --extend-zooms-if-still-dropping -zg + +duckdb -c " +load spatial; + +COPY ( + SELECT + 'Feature' AS type, + json_object( + 'county_name', county_name, + 'building_count', building_count, + 'avg_risk_2011_horizon_1', avg_risk_2011_horizon_1, + 'avg_risk_2011_horizon_15', avg_risk_2011_horizon_15, + 'avg_risk_2011_horizon_30', avg_risk_2011_horizon_30, + 'avg_risk_2047_horizon_1', avg_risk_2047_horizon_1, + 'avg_risk_2047_horizon_15', avg_risk_2047_horizon_15, + 'avg_risk_2047_horizon_30', avg_risk_2047_horizon_30, + 'avg_wind_risk_2011_horizon_1', avg_wind_risk_2011_horizon_1, + 'avg_wind_risk_2011_horizon_15', avg_wind_risk_2011_horizon_15, + 'avg_wind_risk_2011_horizon_30', avg_wind_risk_2011_horizon_30, + 'avg_wind_risk_2047_horizon_1', avg_wind_risk_2047_horizon_1, + 'avg_wind_risk_2047_horizon_15', avg_wind_risk_2047_horizon_15, + 'avg_wind_risk_2047_horizon_30', avg_wind_risk_2047_horizon_30, + 'risk_2011_horizon_1', risk_2011_horizon_1, + 
'risk_2011_horizon_15', risk_2011_horizon_15, + 'risk_2011_horizon_30', risk_2011_horizon_30, + 'risk_2047_horizon_1', risk_2047_horizon_1, + 'risk_2047_horizon_15', risk_2047_horizon_15, + 'risk_2047_horizon_30', risk_2047_horizon_30, + 'wind_risk_2011_horizon_1', wind_risk_2011_horizon_1, + 'wind_risk_2011_horizon_15', wind_risk_2011_horizon_15, + 'wind_risk_2011_horizon_30', wind_risk_2011_horizon_30, + 'wind_risk_2047_horizon_1', wind_risk_2047_horizon_1, + 'wind_risk_2047_horizon_15', wind_risk_2047_horizon_15, + 'wind_risk_2047_horizon_30', wind_risk_2047_horizon_30 + ) AS properties, + json(ST_AsGeoJson(geometry)) AS geometry + + FROM read_parquet('s3://carbonplan-ocr/intermediate/fire-risk/vector/$1/region_aggregation/county/county_summary_stats.parquet') +) TO STDOUT (FORMAT json);" | tippecanoe -o counties.pmtiles -l risk -n "county" -f -P --drop-smallest-as-needed -q --extend-zooms-if-still-dropping -zg + + +echo tract tippecanoe tiles done + +s5cmd cp --sp "tract.pmtiles" "s3://carbonplan-ocr/intermediate/fire-risk/vector/$1/region_aggregation/tract/tract.pmtiles" + +echo county tippecanoe tiles done + +s5cmd cp --sp "counties.pmtiles" "s3://carbonplan-ocr/intermediate/fire-risk/vector/$1/region_aggregation/county/counties.pmtiles" diff --git a/ocr/pipeline/03_county_pmtiles.sh b/ocr/pipeline/03_county_pmtiles.sh deleted file mode 100644 index e324d81f..00000000 --- a/ocr/pipeline/03_county_pmtiles.sh +++ /dev/null @@ -1,55 +0,0 @@ - -#!/bin/bash - -# COILED container quay.io/carbonplan/ocr:latest -# COILED n-tasks 1 -# COILED region us-west-2 -# COILED --forward-aws-credentials -# COILED --tag project=OCR -# COILED --vm-type c7a.xlarge - - -duckdb -c " -load spatial; - -COPY ( - SELECT - 'Feature' AS type, - json_object( - 'county_name', county_name, - 'building_count', building_count, - 'avg_risk_2011_horizon_1', avg_risk_2011_horizon_1, - 'avg_risk_2011_horizon_15', avg_risk_2011_horizon_15, - 'avg_risk_2011_horizon_30', avg_risk_2011_horizon_30, - 
'avg_risk_2047_horizon_1', avg_risk_2047_horizon_1, - 'avg_risk_2047_horizon_15', avg_risk_2047_horizon_15, - 'avg_risk_2047_horizon_30', avg_risk_2047_horizon_30, - 'avg_wind_risk_2011_horizon_1', avg_wind_risk_2011_horizon_1, - 'avg_wind_risk_2011_horizon_15', avg_wind_risk_2011_horizon_15, - 'avg_wind_risk_2011_horizon_30', avg_wind_risk_2011_horizon_30, - 'avg_wind_risk_2047_horizon_1', avg_wind_risk_2047_horizon_1, - 'avg_wind_risk_2047_horizon_15', avg_wind_risk_2047_horizon_15, - 'avg_wind_risk_2047_horizon_15', avg_wind_risk_2047_horizon_15, - 'risk_2011_horizon_1', risk_2011_horizon_1, - 'risk_2011_horizon_15', risk_2011_horizon_15, - 'risk_2011_horizon_30', risk_2011_horizon_30, - 'risk_2047_horizon_1', risk_2047_horizon_1, - 'risk_2047_horizon_15', risk_2047_horizon_15, - 'risk_2047_horizon_30', risk_2047_horizon_30, - 'wind_risk_2011_horizon_1', wind_risk_2011_horizon_1, - 'wind_risk_2011_horizon_15', wind_risk_2011_horizon_15, - 'wind_risk_2011_horizon_30', wind_risk_2011_horizon_30, - 'wind_risk_2047_horizon_1', wind_risk_2047_horizon_1, - 'wind_risk_2047_horizon_15', wind_risk_2047_horizon_15, - 'wind_risk_2047_horizon_30', wind_risk_2047_horizon_30 - ) AS properties, - json(ST_AsGeoJson(geometry)) AS geometry - - FROM read_parquet('s3://carbonplan-ocr/intermediate/fire-risk/vector/$1/region_aggregation/county/county_summary_stats.parquet') -) TO STDOUT (FORMAT json);" | tippecanoe -o aggregated.pmtiles -l risk -n "county" -f -P --drop-smallest-as-needed -q --extend-zooms-if-still-dropping -zg - - - -echo tippecanoe tiles done - -s5cmd cp --sp "aggregated.pmtiles" "s3://carbonplan-ocr/intermediate/fire-risk/vector/$1/region_aggregation/county/counties.pmtiles" diff --git a/ocr/pipeline/test_deploy.py b/ocr/pipeline/test_deploy.py new file mode 100644 index 00000000..efe0d48b --- /dev/null +++ b/ocr/pipeline/test_deploy.py @@ -0,0 +1,5 @@ +import coiled + +batch_result = coiled.batch.run( + command='python test_process.py', name='batch_par_test', 
map_over_values=['y2_x1', 'y2_x2'] +) diff --git a/ocr/pipeline/test_process.py b/ocr/pipeline/test_process.py new file mode 100644 index 00000000..c925df6d --- /dev/null +++ b/ocr/pipeline/test_process.py @@ -0,0 +1,11 @@ +# COILED --region us-west-2 +# COILED --forward-aws-credentials +# COILED --vm-type m8g.medium +# COILED --scheduler-vm-type m8g.medium +# COILED --tag project=OCR + + +import os + +filename = os.environ['COILED_BATCH_TASK_INPUT'] +print(filename)