Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 8 additions & 17 deletions deploy/coiled/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def wait_for_completion(self, exit_on_failure=False):
if state == 'done':
print(f'{job_id} success')
completed.add(job_id)
elif state in ('failed', 'error'):
elif state in ('failed', 'error', 'done (errors)'):
print(f'{job_id} failed')
failed.add(job_id)
if exit_on_failure:
Expand Down Expand Up @@ -139,16 +139,6 @@ def main(
# # this is a monitoring / blocking func. We should be able to block with this, then run 02, 03 etc.
batch_manager_01.wait_for_completion()

# ----------- 02 Pyramid -------------
# NOTE: We need to do more work on pyramiding - currently excluded
# This is non-blocking, since there are no post-pyramid dependent operations, so no wait_for_completion (I think)
# if pyramid:
# batch_manager_pyarmid_02 = CoiledBatchManager(debug=debug)
# batch_manager_pyarmid_02.submit_job(
# command=f'python ../../ocr/pipeline/02_Pyramid.py -b {branch}',
# name=f'create-pyramid-{branch}',
# )

# ----------- 02 Aggregate -------------
batch_manager_aggregate_02 = CoiledBatchManager(debug=debug)
batch_manager_aggregate_02.submit_job(
Expand All @@ -161,24 +151,25 @@ def main(
if summary_stats:
batch_manager_county_aggregation_01 = CoiledBatchManager(debug=debug)
batch_manager_county_aggregation_01.submit_job(
command=f'python ../../ocr/pipeline/02_county_summary_stats.py -b {branch}',
command=f'python ../../ocr/pipeline/02_aggregated_region_summary_stats.py -b {branch}',
name=f'create-county-summary-stats-{branch}',
kwargs={**shared_coiled_kwargs, 'vm_type': 'm8g.xlarge'},
kwargs={**shared_coiled_kwargs, 'vm_type': 'm8g.6xlarge'},
)
batch_manager_county_aggregation_01.wait_for_completion()
# create county summary stats PMTiles layer

# create summary stats PMTiles layer
batch_manager_county_tiles_02 = CoiledBatchManager(debug=debug)
batch_manager_county_tiles_02.submit_job(
command=f'../../ocr/pipeline/03_county_pmtiles.sh {branch}',
command=f'../../ocr/pipeline/03_aggregated_region_pmtiles.sh {branch}',
name=f'create-county-pmtiles-{branch}',
kwargs={
**shared_coiled_kwargs,
'vm_type': 'c7a.xlarge',
'vm_type': 'c7a.4xlarge',
'container': 'quay.io/carbonplan/ocr:latest',
},
)

# # ------------- 03 Tiles ---------------
# ------------- 03 Tiles ---------------
batch_manager_03 = CoiledBatchManager(debug=debug)
batch_manager_03.submit_job(
command=f'../../ocr/pipeline/03_Tiles.sh {branch}',
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
# COILED n-tasks 1
# COILED --region us-west-2
# COILED --forward-aws-credentials
# COILED --vm-type m8g.large
# COILED --tag project=OCR


import geopandas as gpd

gdf = gpd.read_file(
'https://www2.census.gov/geo/tiger/TIGER2024/COUNTY/tl_2024_us_county.zip',
columns=['NAME', 'geometry'],
)


gdf.to_parquet(
's3://carbonplan-ocr/input/fire-risk/vector/aggregated_regions/counties.parquet',
's3://carbonplan-ocr/input/fire-risk/vector/aggregated_regions/counties/counties.parquet',
compression='zstd',
geometry_encoding='WKB',
write_covering_bbox=True,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# COILED n-tasks 1
# COILED --region us-west-2
# COILED --forward-aws-credentials
# COILED --vm-type m8g.large
# COILED --tag project=OCR


import geopandas as gpd
from tqdm import tqdm

# State FIPS codes for the lower-48 states plus DC (Alaska '02' and
# Hawaii '15' are deliberately excluded). The census tract data is
# published as one zipped shapefile per state / FIPS code.
FIPS_codes = (
    '01 04 05 06 08 09 10 11 12 13 16 17 18 19 20 21 22 23 24 '
    '25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 41 42 44 45 '
    '46 47 48 49 50 51 53 54 55 56'
).split()

for fips_code in tqdm(FIPS_codes):
    # geopandas (via fsspec) reads the zipped shapefile directly over HTTP,
    # so no separate download/unzip step is needed.
    tract_url = f'https://www2.census.gov/geo/tiger/TIGER2024/TRACT/tl_2024_{fips_code}_tract.zip'
    tracts = gpd.read_file(tract_url, columns=['TRACTCE', 'GEOID', 'NAME', 'geometry'])

    # One zstd-compressed GeoParquet per state, consolidated downstream.
    tracts.to_parquet(
        f's3://carbonplan-ocr/input/fire-risk/vector/aggregated_regions/tracts/FIPS/FIPS_{fips_code}.parquet',
        compression='zstd',
        geometry_encoding='WKB',
        write_covering_bbox=True,
        schema_version='1.1.0',
    )
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# COILED n-tasks 1
# COILED --region us-west-2
# COILED --forward-aws-credentials
# COILED --vm-type m8g.large
# COILED --tag project=OCR

# Consolidate the per-state census-tract GeoParquet files (one file per
# FIPS code) into a single tracts.parquet on S3 using DuckDB's COPY.

import duckdb

from ocr.utils import apply_s3_creds, install_load_extensions

# Load the required DuckDB extensions and configure S3 credentials so
# read_parquet/COPY can access the carbonplan-ocr bucket.
# NOTE(review): exact extensions loaded (presumably httpfs/spatial) are
# defined in ocr.utils — confirm there.
install_load_extensions()
apply_s3_creds()

# Read every per-FIPS parquet and write one combined, zstd-compressed file.
# OVERWRITE_OR_IGNORE allows re-runs to replace any existing output.
duckdb.query("""COPY (SELECT * FROM read_parquet('s3://carbonplan-ocr/input/fire-risk/vector/aggregated_regions/tracts/FIPS/*.parquet'))
TO 's3://carbonplan-ocr/input/fire-risk/vector/aggregated_regions/tracts/tracts.parquet' (
FORMAT 'parquet',
COMPRESSION 'zstd',
OVERWRITE_OR_IGNORE true)""")
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ def create_summary_stats(branch: str):

hist_bins = [0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100]

# tmp table for buildings
con.execute(f"""
CREATE TEMP TABLE temp_buildings AS
SELECT geometry,
Expand All @@ -36,16 +37,27 @@ def create_summary_stats(branch: str):
FROM read_parquet('s3://carbonplan-ocr/intermediate/fire-risk/vector/{branch}/consolidated_geoparquet.parquet')
""")

# tmp table for geoms
con.execute("""
CREATE TEMP TABLE temp_counties AS
SELECT NAME, geometry
FROM read_parquet('s3://carbonplan-ocr/input/fire-risk/vector/aggregated_regions/counties.parquet')
""")

# tmp table for tracts
con.execute("""
CREATE TEMP TABLE temp_tracts AS
SELECT GEOID, geometry
FROM read_parquet('s3://carbonplan-ocr/input/fire-risk/vector/aggregated_regions/tracts/tracts.parquet')
""")

# create spatial index on geom cols
con.execute('CREATE INDEX buildings_spatial_idx ON temp_buildings USING RTREE (geometry)')
con.execute('CREATE INDEX counties_spatial_idx ON temp_counties USING RTREE (geometry)')
con.execute('CREATE INDEX tracts_spatial_idx ON temp_tracts USING RTREE (geometry)')

con.query(f"""COPY (SELECT b.NAME as county_name, count(b.NAME) as building_count,
# county level histograms
con.query(f"""COPY (SELECT b.NAME as county_name, count(a.risk_2011_horizon_1) as building_count,
round(avg(a.risk_2011_horizon_1), 2) as avg_risk_2011_horizon_1,
round(avg(a.risk_2011_horizon_15), 2) as avg_risk_2011_horizon_15,
round(avg(a.risk_2011_horizon_30), 2) as avg_risk_2011_horizon_30,
Expand Down Expand Up @@ -81,6 +93,43 @@ def create_summary_stats(branch: str):
OVERWRITE_OR_IGNORE true);
""")

# tract level histograms - we should refactor this mostly shared SQL
con.query(f"""COPY (SELECT b.GEOID as tract_geoid, count(a.risk_2011_horizon_1) as building_count,
round(avg(a.risk_2011_horizon_1), 2) as avg_risk_2011_horizon_1,
round(avg(a.risk_2011_horizon_15), 2) as avg_risk_2011_horizon_15,
round(avg(a.risk_2011_horizon_30), 2) as avg_risk_2011_horizon_30,
round(avg(a.risk_2047_horizon_1), 2) as avg_risk_2047_horizon_1,
round(avg(a.risk_2047_horizon_15), 2) as avg_risk_2047_horizon_15,
round(avg(a.risk_2047_horizon_30), 2) as avg_risk_2047_horizon_30,
round(avg(a.wind_risk_2011_horizon_1), 2) as avg_wind_risk_2011_horizon_1,
round(avg(a.wind_risk_2011_horizon_15), 2) as avg_wind_risk_2011_horizon_15,
round(avg(a.wind_risk_2011_horizon_30), 2) as avg_wind_risk_2011_horizon_30,
round(avg(a.wind_risk_2047_horizon_1), 2) as avg_wind_risk_2047_horizon_1,
round(avg(a.wind_risk_2047_horizon_15), 2) as avg_wind_risk_2047_horizon_15,
round(avg(a.wind_risk_2047_horizon_30), 2) as avg_wind_risk_2047_horizon_30,
map_values(histogram(a.risk_2011_horizon_1, {hist_bins})) as risk_2011_horizon_1,
map_values(histogram(a.risk_2011_horizon_15, {hist_bins})) as risk_2011_horizon_15,
map_values(histogram(a.risk_2011_horizon_30, {hist_bins})) as risk_2011_horizon_30,
map_values(histogram(a.risk_2047_horizon_1, {hist_bins})) as risk_2047_horizon_1,
map_values(histogram(a.risk_2047_horizon_15, {hist_bins})) as risk_2047_horizon_15,
map_values(histogram(a.risk_2047_horizon_30, {hist_bins})) as risk_2047_horizon_30,
map_values(histogram(a.wind_risk_2011_horizon_1, {hist_bins})) as wind_risk_2011_horizon_1,
map_values(histogram(a.wind_risk_2011_horizon_15, {hist_bins})) as wind_risk_2011_horizon_15,
map_values(histogram(a.wind_risk_2011_horizon_30, {hist_bins})) as wind_risk_2011_horizon_30,
map_values(histogram(a.wind_risk_2047_horizon_1, {hist_bins})) as wind_risk_2047_horizon_1,
map_values(histogram(a.wind_risk_2047_horizon_15, {hist_bins})) as wind_risk_2047_horizon_15,
map_values(histogram(a.wind_risk_2047_horizon_30, {hist_bins})) as wind_risk_2047_horizon_30,
b.geometry as geometry
FROM temp_buildings a
JOIN temp_tracts b ON ST_Intersects(a.geometry, b.geometry)
GROUP BY b.GEOID, b.geometry )
TO 's3://carbonplan-ocr/intermediate/fire-risk/vector/{branch}/region_aggregation/tract/tract_summary_stats.parquet'
(
FORMAT 'parquet',
COMPRESSION 'zstd',
OVERWRITE_OR_IGNORE true);
""")


@click.command()
@click.option('-b', '--branch', help='data branch: [QA, prod]. Default QA')
Expand Down
97 changes: 97 additions & 0 deletions ocr/pipeline/03_aggregated_region_pmtiles.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#!/bin/bash

# Build PMTiles layers for the aggregated-region (tract + county) summary
# stats. For each region type: DuckDB streams the summary-stats GeoParquet
# out as GeoJSON features, tippecanoe tiles them, and s5cmd uploads the
# resulting .pmtiles archive to S3.
#
# Usage: 03_aggregated_region_pmtiles.sh <branch>   (e.g. QA, prod)
#
# Fixes vs. previous revision:
#  - shebang moved to line 1 (a leading blank line broke interpreter selection)
#  - 'avg_wind_risk_2047_horizon_15' was emitted twice in both json_object
#    calls; the second occurrence now correctly emits
#    'avg_wind_risk_2047_horizon_30'.

# COILED container quay.io/carbonplan/ocr:latest
# COILED n-tasks 1
# COILED region us-west-2
# COILED --forward-aws-credentials
# COILED --tag project=OCR
# COILED --vm-type c7a.xlarge


# ---------- tract-level tiles ----------
duckdb -c "
load spatial;

COPY (
SELECT
'Feature' AS type,
json_object(
'tract_geoid', tract_geoid,
'building_count', building_count,
'avg_risk_2011_horizon_1', avg_risk_2011_horizon_1,
'avg_risk_2011_horizon_15', avg_risk_2011_horizon_15,
'avg_risk_2011_horizon_30', avg_risk_2011_horizon_30,
'avg_risk_2047_horizon_1', avg_risk_2047_horizon_1,
'avg_risk_2047_horizon_15', avg_risk_2047_horizon_15,
'avg_risk_2047_horizon_30', avg_risk_2047_horizon_30,
'avg_wind_risk_2011_horizon_1', avg_wind_risk_2011_horizon_1,
'avg_wind_risk_2011_horizon_15', avg_wind_risk_2011_horizon_15,
'avg_wind_risk_2011_horizon_30', avg_wind_risk_2011_horizon_30,
'avg_wind_risk_2047_horizon_1', avg_wind_risk_2047_horizon_1,
'avg_wind_risk_2047_horizon_15', avg_wind_risk_2047_horizon_15,
'avg_wind_risk_2047_horizon_30', avg_wind_risk_2047_horizon_30,
'risk_2011_horizon_1', risk_2011_horizon_1,
'risk_2011_horizon_15', risk_2011_horizon_15,
'risk_2011_horizon_30', risk_2011_horizon_30,
'risk_2047_horizon_1', risk_2047_horizon_1,
'risk_2047_horizon_15', risk_2047_horizon_15,
'risk_2047_horizon_30', risk_2047_horizon_30,
'wind_risk_2011_horizon_1', wind_risk_2011_horizon_1,
'wind_risk_2011_horizon_15', wind_risk_2011_horizon_15,
'wind_risk_2011_horizon_30', wind_risk_2011_horizon_30,
'wind_risk_2047_horizon_1', wind_risk_2047_horizon_1,
'wind_risk_2047_horizon_15', wind_risk_2047_horizon_15,
'wind_risk_2047_horizon_30', wind_risk_2047_horizon_30
) AS properties,
json(ST_AsGeoJson(geometry)) AS geometry

FROM read_parquet('s3://carbonplan-ocr/intermediate/fire-risk/vector/$1/region_aggregation/tract/tract_summary_stats.parquet')
) TO STDOUT (FORMAT json);" | tippecanoe -o tract.pmtiles -l risk -n "tract" -f -P --drop-smallest-as-needed -q --extend-zooms-if-still-dropping -zg

# ---------- county-level tiles ----------
duckdb -c "
load spatial;

COPY (
SELECT
'Feature' AS type,
json_object(
'county_name', county_name,
'building_count', building_count,
'avg_risk_2011_horizon_1', avg_risk_2011_horizon_1,
'avg_risk_2011_horizon_15', avg_risk_2011_horizon_15,
'avg_risk_2011_horizon_30', avg_risk_2011_horizon_30,
'avg_risk_2047_horizon_1', avg_risk_2047_horizon_1,
'avg_risk_2047_horizon_15', avg_risk_2047_horizon_15,
'avg_risk_2047_horizon_30', avg_risk_2047_horizon_30,
'avg_wind_risk_2011_horizon_1', avg_wind_risk_2011_horizon_1,
'avg_wind_risk_2011_horizon_15', avg_wind_risk_2011_horizon_15,
'avg_wind_risk_2011_horizon_30', avg_wind_risk_2011_horizon_30,
'avg_wind_risk_2047_horizon_1', avg_wind_risk_2047_horizon_1,
'avg_wind_risk_2047_horizon_15', avg_wind_risk_2047_horizon_15,
'avg_wind_risk_2047_horizon_30', avg_wind_risk_2047_horizon_30,
'risk_2011_horizon_1', risk_2011_horizon_1,
'risk_2011_horizon_15', risk_2011_horizon_15,
'risk_2011_horizon_30', risk_2011_horizon_30,
'risk_2047_horizon_1', risk_2047_horizon_1,
'risk_2047_horizon_15', risk_2047_horizon_15,
'risk_2047_horizon_30', risk_2047_horizon_30,
'wind_risk_2011_horizon_1', wind_risk_2011_horizon_1,
'wind_risk_2011_horizon_15', wind_risk_2011_horizon_15,
'wind_risk_2011_horizon_30', wind_risk_2011_horizon_30,
'wind_risk_2047_horizon_1', wind_risk_2047_horizon_1,
'wind_risk_2047_horizon_15', wind_risk_2047_horizon_15,
'wind_risk_2047_horizon_30', wind_risk_2047_horizon_30
) AS properties,
json(ST_AsGeoJson(geometry)) AS geometry

FROM read_parquet('s3://carbonplan-ocr/intermediate/fire-risk/vector/$1/region_aggregation/county/county_summary_stats.parquet')
) TO STDOUT (FORMAT json);" | tippecanoe -o counties.pmtiles -l risk -n "county" -f -P --drop-smallest-as-needed -q --extend-zooms-if-still-dropping -zg


echo tract tippecanoe tiles done

s5cmd cp --sp "tract.pmtiles" "s3://carbonplan-ocr/intermediate/fire-risk/vector/$1/region_aggregation/tract/tract.pmtiles"

echo county tippecanoe tiles done

s5cmd cp --sp "counties.pmtiles" "s3://carbonplan-ocr/intermediate/fire-risk/vector/$1/region_aggregation/county/counties.pmtiles"
Loading
Loading