4 changes: 3 additions & 1 deletion .gitignore
@@ -165,4 +165,6 @@ cython_debug/
Pipfile
Pipfile.lock

data/
data/
cbsurge/azure/auth.py
cbsurge/azure/token_cache.json
4 changes: 3 additions & 1 deletion cbsurge/constants.py
@@ -1,3 +1,5 @@
from osgeo import ogr
AGE_STRUCTURES_ROOT_URL = "https://hub.worldpop.org/geodata"
AZURE_BLOB_CONTAINER_NAME = "stacdata"
AZURE_FILESHARE_NAME = "cbrapida"
AZURE_FILESHARE_NAME = "cbrapida"
ARROWTYPE2OGRTYPE = {'string':ogr.OFTString, 'double':ogr.OFTReal, 'int64':ogr.OFTInteger64, 'int':ogr.OFTInteger}
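
ARROWTYPE2OGRTYPE maps Arrow type names (as produced by str(field.type)) to OGR field types. A minimal sketch of how such a mapping is typically applied when mirroring a pyarrow schema into an OGR layer; the schema and output path here are hypothetical:

import pyarrow as pa
from osgeo import ogr
from cbsurge.constants import ARROWTYPE2OGRTYPE

# Hypothetical attribute schema resembling the buildings data
schema = pa.schema({'area_in_meters': pa.float64(), 'confidence': pa.float64(), 'name': pa.string()})
ds = ogr.GetDriverByName('FlatGeobuf').CreateDataSource('/tmp/example.fgb')
lyr = ds.CreateLayer('example', geom_type=ogr.wkbPoint)
for field in schema:
    # str(field.type) yields 'double', 'string', ... which keys the mapping
    lyr.CreateField(ogr.FieldDefn(field.name, ARROWTYPE2OGRTYPE[str(field.type)]))
del ds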
3 changes: 2 additions & 1 deletion cbsurge/exposure/builtenv/buildings/fgb.py
@@ -5,6 +5,7 @@
from cbsurge.exposure.builtenv.buildings.fgbgdal import OVERPASS_API_URL, GMOSM_BUILDINGS_ROOT
from pyogrio.raw import open_arrow, write_arrow, read
from cbsurge.exposure.builtenv.buildings.pmt import WEB_MERCATOR_TMS
from cbsurge.constants import ARROWTYPE2OGRTYPE
import morecantile as m
from cbsurge import util
import logging
@@ -60,7 +61,7 @@ def render(self, task):
}
logger = logging.getLogger(__name__)

ARROWTYPE2OGRTYPE = {'string':ogr.OFTString, 'double':ogr.OFTReal, 'int64':ogr.OFTInteger64, 'int':ogr.OFTInteger}


def country_info(bbox=None, overpass_url=OVERPASS_API_URL):
"""
293 changes: 273 additions & 20 deletions cbsurge/exposure/builtenv/buildings/preprocessing.py
@@ -1,13 +1,34 @@
import logging
logger = logging.getLogger(__name__)
from osgeo import gdal, ogr
gdal.UseExceptions()
import multiprocessing
import random
import time
from multiprocessing import sharedctypes
import threading
import numpy as np
import pyarrow as pa
import pyproj
import shapely
from osgeo import gdal, ogr, osr


from cbsurge import util
from collections import deque
from rich.progress import Progress
import concurrent
from pyogrio import read_info, read_arrow, read_dataframe
from rasterio import warp
# from pyogrio.raw import open_arrow, read_arrow
import rasterio
from rasterio.windows import Window
from pyarrow import compute as pc
from cbsurge.constants import ARROWTYPE2OGRTYPE
logger = logging.getLogger(__name__)
gdal.UseExceptions()

'''
ogr2ogr -sql "SELECT ST_PointOnSurface(geometry) as geometry, ogc_fid, area_in_meters, confidence FROM buildings" -dialect sqlite /tmp/bldgs1c.fgb /tmp/bldgs1.fgb
'''
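
The ogr2ogr command in the module docstring has a Python equivalent; a hedged sketch through the GDAL bindings, reusing the paths and 'buildings' layer from the command above (create_centroid below appears to wrap gdal.VectorTranslate with similar options):

from osgeo import gdal
gdal.UseExceptions()

opts = gdal.VectorTranslateOptions(
    format='FlatGeobuf',
    SQLStatement='SELECT ST_PointOnSurface(geometry) as geometry, ogc_fid, area_in_meters, confidence FROM buildings',
    SQLDialect='sqlite',
    geometryType='POINT',
)
gdal.VectorTranslate('/tmp/bldgs1c.fgb', '/tmp/bldgs1.fgb', options=opts)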
def cb(complete, message, stop_event):
    """GDAL progress callback; returns 0 to ask GDAL to abort, 1 to continue."""
    # logger.info(f'{complete * 100:.2f}%')
    if stop_event and stop_event.is_set():
        logger.info('GDAL was signalled to stop')
        return 0
    return 1

def create_centroid(src_path=None, src_layer_name=None, dst_path=None, dst_srs=None):
"""
@@ -27,6 +48,10 @@ def create_centroid(src_path=None, src_layer_name=None, dst_path=None, dst_srs=N
geometryType='POINT'

)

if not ogr.GetDriverByName(options['format']).TestCapability(ogr.OLCFastFeatureCount):
logger.debug('No progress bar is available in create_centroid')

if dst_srs is not None:
options['dstSRS'] = dst_srs
options['reproject'] = True
@@ -39,42 +64,270 @@ def create_centroid(src_path=None, src_layer_name=None, dst_path=None, dst_srs=N
del ds


def geoarrow_schema_adapter(schema: pa.Schema) -> pa.Schema:
"""
Convert a geoarrow-compatible schema to a proper geoarrow schema

This assumes there is a single "geometry" column with WKB formatting

Parameters
----------
schema: pa.Schema

Returns
-------
pa.Schema
A copy of the input schema with the geometry field replaced with
a new one with the proper geoarrow ARROW:extension metadata

"""
geometry_field_index = schema.get_field_index("geometry")
geometry_field = schema.field(geometry_field_index)
geoarrow_geometry_field = geometry_field.with_metadata(
{b"ARROW:extension:name": b"geoarrow.wkb"}
)

geoarrow_schema = schema.set(geometry_field_index, geoarrow_geometry_field)

return geoarrow_schema
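
A quick illustration of what the adapter changes, using a hypothetical two-column schema: only the geometry field gains the ARROW:extension metadata.

import pyarrow as pa

schema = pa.schema([pa.field('geometry', pa.binary()), pa.field('area_in_meters', pa.float64())])
geo_schema = geoarrow_schema_adapter(schema)
assert geo_schema.field('geometry').metadata == {b'ARROW:extension:name': b'geoarrow.wkb'}
assert geo_schema.field('area_in_meters').metadata is None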

def buildings_in_mask_ogrio(buildings_centroid_path=None, mask_path=None, mask_pixel_value=None,
horizontal_chunks=None, vertical_chunks=None):
def proj_are_equal(src_srs: osr.SpatialReference = None, dst_srs: osr.SpatialReference = None):
    """
    Decide whether two projections are equal
    @param src_srs: the source projection
    @param dst_srs: the destination projection
    @return: bool, True if the two projections are equal, False otherwise
    """
auth_code_func_name = ".".join(
[osr.SpatialReference.GetAuthorityCode.__module__, osr.SpatialReference.GetAuthorityCode.__name__])
is_same_func_name = ".".join([osr.SpatialReference.IsSame.__module__, osr.SpatialReference.IsSame.__name__])
try:
proj_are_equal = int(src_srs.GetAuthorityCode(None)) == int(dst_srs.GetAuthorityCode(None))
except Exception as evpe:
logger.error(
f'Failed to compare src and dst projections using {auth_code_func_name}. Trying using {is_same_func_name}')
try:
proj_are_equal = bool(src_srs.IsSame(dst_srs))
except Exception as evpe1:
logger.error(
f'Failed to compare src and dst projections using {is_same_func_name}. Error is \n {evpe1}')
raise evpe1

return proj_are_equal
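
For example, two handles on the same CRS compare equal through the fast authority-code path before IsSame is ever needed:

from osgeo import osr

a = osr.SpatialReference()
a.ImportFromEPSG(4326)
b = osr.SpatialReference()
b.SetFromUserInput('EPSG:4326')
print(proj_are_equal(src_srs=a, dst_srs=b))  # True: both resolve to authority code 4326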

def filter_buildings_in_block(buildings_ds_path=None, mask_ds=None, block=None, block_id=None, band=1):
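    """Read the mask window for one block; return (block_id, arrow table of buildings whose centroid falls on masked pixels, srs)."""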

try:

window = Window(*block)
col_start, row_start, col_size, row_size = block
m = mask_ds.read(band, window=window)

bbox = rasterio.windows.bounds(window=window, transform=mask_ds.transform)

ds = read_dataframe(buildings_ds_path, bbox=bbox, read_geometry=True)

if len(ds) == 0:
return block_id, None, None
pcoords = ds.centroid.get_coordinates()
pcols, prows = ~mask_ds.transform * (pcoords.x, pcoords.y)
prows, pcols = np.floor(prows).astype('i4'), np.floor(pcols).astype('i4')
prows -= row_start
pcols -= col_start
rowmask = (prows >= 0) & (prows < row_size)
colmask = (pcols >= 0) & (pcols < col_size)
rcmask = rowmask & colmask
if rcmask[rcmask].size == 0:
return block_id, None, None
prows = prows[rcmask]
pcols = pcols[rcmask]
ds = ds[rcmask]
rm = m[prows, pcols] == True
mds = ds[rm]
ao = mds.to_arrow(index=False)
table = pa.table(ao)
# schema = geoarrow_schema_adapter(table.schema)
# table = pa.table(table, schema=schema)
table = table.rename_columns(names={'geometry':'wkb_geometry'})

if mds.size == 0:
return block_id, None, None
out_srs = osr.SpatialReference()
out_srs.SetFromUserInput(':'.join(mds.crs.to_authority()))
return block_id, table, out_srs
except Exception as e:
    logger.error(f'Failed to filter buildings in block {block_id}: {e}')
    return block_id, None, None
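
The centroid-to-pixel step above hinges on the inverse of the raster's affine transform; a self-contained sketch with a hypothetical 10 m grid shows the (col, row) ordering and flooring it relies on:

import numpy as np
from affine import Affine

transform = Affine(10.0, 0.0, 500000.0, 0.0, -10.0, 4600000.0)  # hypothetical 10 m grid
xs = np.array([500125.0, 500001.0])
ys = np.array([4599875.0, 4599999.0])
cols, rows = ~transform * (xs, ys)  # world coordinates -> fractional pixel coordinates
rows, cols = np.floor(rows).astype('i4'), np.floor(cols).astype('i4')
print(rows, cols)  # [12  0] [12  0]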



def worker(work=None, result=None, finished=None):


logger.debug(f'starting building filter thread {threading.current_thread().name}')
while True:

job = None
try:
job = work.pop()
except IndexError as ie:
pass
if job is None:
if finished.is_set():
logger.debug(f'worker is finishing in {threading.current_thread().name}')
break
continue

if finished.is_set():
break
logger.debug(f'Starting job in block {job["block_id"]}')

result.append(filter_buildings_in_block(**job))
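
worker() drains a shared deque until the finished event fires; deque.pop() is atomic, so no explicit locking is needed. A minimal self-contained sketch of the same shutdown protocol, with a toy payload and hypothetical names:

import threading
from collections import deque

work, results, finished = deque(), deque(), threading.Event()

def toy_worker():
    while True:
        try:
            results.append(work.pop() * 2)  # atomic pop, no lock required
        except IndexError:
            if finished.is_set():
                break

t = threading.Thread(target=toy_worker)
t.start()
work.extend(range(5))
finished.set()
t.join()
print(sorted(results))  # [0, 2, 4, 6, 8]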



def filter_buildings(buildings_path=None, mask_path=None, mask_pixel_value=None,
horizontal_chunks=None, vertical_chunks=None, nworkers=1,
out_path=None):
"""
Select buildings whose centroid is inside the masked pixels
:param buildings_centroid_path:
:param buildings_path:
:param mask_path:
:param mask_pixel_value:
:param horizontal_chunks:
:param vertical_chunks:
:return:
"""

with gdal.Open(mask_path, gdal.OF_READONLY) as mds:
    logger.debug(f'Opened mask {mask_path}: {mds}')
nfiltered = 0
failed = []


with ogr.GetDriverByName('FlatGeobuf').CreateDataSource(out_path) as dst_ds:
with rasterio.open(mask_path) as mask_ds:
msr = osr.SpatialReference()
msr.SetFromUserInput(str(mask_ds.crs))
# should_reproj = not proj_are_equal(src_srs=bsr, dst_srs=msr)
# assert should_reproj is False, f'{buildings_path} and {mask_path} need to be in the same projection'
width = mask_ds.width
height = mask_ds.height
assert mask_ds.count == 1, f'The mask dataset {mask_path} contains more than one band'
#mband = mds.GetRasterBand(1)
#m = mband.ReadAsArray()
#ctypes_shared_mem = sharedctypes.RawArray(np.ctypeslib.as_ctypes_type(m.dtype), m.ravel())
# width = mds.RasterXSize
# height = mds.RasterYSize
block_xsize = width // horizontal_chunks
block_ysize = height // vertical_chunks
blocks = util.gen_blocks(blockxsize=block_xsize, blockysize=block_ysize, width=width, height=height)
nblocks, blocks = util.generator_length(blocks)
stop_event = threading.Event()
jobs = deque()
results = deque()
nworkers = min(nworkers, nblocks)
logger.debug(f'Using {nworkers} worker threads for {nblocks} blocks')
with concurrent.futures.ThreadPoolExecutor(max_workers=nworkers) as executor:
[executor.submit(worker, jobs, results, stop_event) for i in range(nworkers)]
with Progress() as progress:
total_task = progress.add_task(
description=f'[red]Going to filter buildings from {nblocks} blocks/chunks',
total=nblocks)
for block_id, block in enumerate(blocks):
job = dict(
buildings_ds_path=buildings_path,
mask_ds=mask_ds,
block=block,
block_id=block_id,

)
jobs.append(job)


while True:
try:
try:
block_id, table, dst_srs = results.pop()

if table is None:
nfiltered += 1
progress.update(total_task,
description=f'[red]Filtered buildings from {nfiltered} out of {nblocks} blocks',
advance=1)
continue

if dst_ds.GetLayerCount() == 0:

dst_lyr = dst_ds.CreateLayer('buildings_filtered', geom_type=ogr.wkbPolygon, srs=dst_srs,
)
for name in table.schema.names:
if 'wkb' in name or 'geometry' in name: continue

field = table.schema.field(name)
field_type = ARROWTYPE2OGRTYPE[str(field.type)]
logger.debug(f'Creating field {name}: {field.type}: {field_type}')
dst_lyr.CreateField(ogr.FieldDefn(name, field_type))


try:

dst_lyr.WritePyArrow(table)
except Exception as e:
logger.error(
f'Failed to write {table.num_rows} features/rows in block id {block_id} because {e}. Skipping')

dst_lyr.SyncToDisk()
nfiltered += 1
progress.update(total_task,
description=f'[red]Filtered buildings from {nfiltered} out of {nblocks} blocks',
advance=1)
logger.debug(f'{block_id} was processed')
except IndexError as ie:
if not jobs and progress.finished:
stop_event.set()
break
s = random.random() # this one is necessary for ^C/KeyboardInterrupt
time.sleep(s)

continue
except Exception as e:
    failed.append(f'block_id {block_id} failed: {e.__class__.__name__}("{e}")')
    progress.update(total_task,
                    description=f'[red]Filtered buildings from {nfiltered} out of {nblocks} blocks',
                    advance=1)
    nfiltered += 1

except KeyboardInterrupt:
logger.info(f'Cancelling jobs. Please wait/allow for a graceful shutdown')
stop_event.set()
break
logger.info(f'{dst_lyr.GetFeatureCount()} features were written to {out_path}')
if failed:
for msg in failed:
logger.error(msg)

if __name__ == '__main__':
httpx_logger = logging.getLogger('httpx')
httpx_logger.setLevel(100)

logger = util.setup_logger(name='rapida', level=logging.INFO, make_root=True)

src_path = '/data/surge/buildings_eqar.fgb'
dst_path = '/data/surge/bldgs1c.fgb'
mask = '/data/surge/surge/stats/floods_mask.tif'
filtered_buildings_path = '/data/surge/bldgs1_filtered.fgb'

start = time.time()


create_centroid(src_path=src_path, src_layer_name='buildings', dst_path=dst_path, dst_srs='EPSG:3857')
#create_centroid(src_path=src_path, src_layer_name='buildings', dst_path=dst_path, dst_srs='ESRI:54034')
filter_buildings(
buildings_path=src_path,
mask_path=mask,
mask_pixel_value=1,
horizontal_chunks=10,
vertical_chunks=20,
out_path=filtered_buildings_path
)

end = time.time()
print((end-start))
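
filter_buildings leans on two small helpers from cbsurge.util that are not part of this diff. A hedged sketch of their assumed shape, inferred only from how they are called above (the real implementations live in cbsurge/util.py):

def gen_blocks(blockxsize=None, blockysize=None, width=None, height=None):
    """Yield (col_off, row_off, win_width, win_height) tiles covering a width x height raster."""
    for row_off in range(0, height, blockysize):
        for col_off in range(0, width, blockxsize):
            yield col_off, row_off, min(blockxsize, width - col_off), min(blockysize, height - row_off)

def generator_length(gen):
    """Count a generator's items, returning (length, replayable iterator)."""
    items = tuple(gen)
    return len(items), iter(items)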
1 change: 1 addition & 0 deletions cbsurge/util.py
@@ -183,6 +183,7 @@ def setup_logger(name=None, make_root=True, level=logging.INFO):

if make_root:
logger = logging.getLogger()

else:
logger = logging.getLogger(name)
formatter = logging.Formatter(