forked from azavea/raster-vision
-
Notifications
You must be signed in to change notification settings - Fork 0
/
geojson.py
392 lines (303 loc) · 12.8 KB
/
geojson.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
from typing import (TYPE_CHECKING, Callable, Dict, Iterable, Iterator, List,
Optional, Union)
from copy import deepcopy
from shapely.geometry import shape, mapping
from shapely.affinity import translate
from tqdm.auto import tqdm
import geopandas as gpd
from rastervision.core.box import Box
if TYPE_CHECKING:
from rastervision.core.data.crs_transformer import CRSTransformer
from shapely.geometry.base import BaseGeometry
MULTI_GEOM_TYPES = {'MultiPolygon', 'MultiPoint', 'MultiLineString'}
PROGRESSBAR_DELAY_SEC = 5
def geometry_to_feature(mapping: dict,
properties: Optional[dict] = None) -> dict:
"""Convert a serialized geometry to a serialized GeoJSON feature."""
already_a_feature = mapping.get('type') == 'Feature'
if already_a_feature:
return mapping
if properties is None:
properties = {}
return {'type': 'Feature', 'geometry': mapping, 'properties': properties}
def geometries_to_geojson(geometries: Iterable[dict]) -> dict:
"""Convert serialized geometries to a serialized GeoJSON FeatureCollection.
"""
features = [geometry_to_feature(g) for g in geometries]
return features_to_geojson(features)
def features_to_geojson(features: List[dict]) -> dict:
"""Convert GeoJSON-like mapping of Features to a FeatureCollection."""
return {'type': 'FeatureCollection', 'features': features}
def map_features(func: Callable,
geojson: dict,
include_geom_types: Iterable[str] = [],
progressbar_kw: Optional[dict] = None) -> dict:
"""Map GeoJSON features to new features. Returns a new GeoJSON dict."""
features_in = geojson['features']
if progressbar_kw is None:
progressbar_kw = dict(desc='Transforming features')
with tqdm(
features_in,
delay=PROGRESSBAR_DELAY_SEC,
mininterval=0.5,
**progressbar_kw) as bar:
if len(include_geom_types) > 0:
include_geom_types = set(include_geom_types)
features_out = [
func(f)
if f['geometry']['type'] in include_geom_types else deepcopy(f)
for f in bar
]
else:
features_out = [func(f) for f in bar]
return features_to_geojson(features_out)
def map_geoms(func: Callable,
geojson: dict,
include_geom_types: Iterable[str] = [],
progressbar_kw: Optional[dict] = None) -> dict:
"""Map GeoJSON features to new features by applying func to geometries.
Returns a new GeoJSON dict.
"""
def feat_func(feature_in: dict) -> dict:
# to shapely geometry
geom_in = shape(feature_in['geometry'])
# apply func
geom_out = func(geom_in, feature=feature_in)
# back to dict
geom_out = mapping(geom_out)
# new feature with updated geometry
feature_out = geometry_to_feature(geom_out,
feature_in.get('properties', {}))
return feature_out
if progressbar_kw is None:
progressbar_kw = dict(desc='Transforming geoms')
return map_features(
feat_func,
geojson,
include_geom_types=include_geom_types,
progressbar_kw=progressbar_kw)
def geojson_to_geoms(geojson: dict) -> Iterator['BaseGeometry']:
"""Return the shapely geometry for each feature in the GeoJSON."""
geoms = (shape(f['geometry']) for f in geojson['features'])
return geoms
def geoms_to_geojson(geoms: Iterable['BaseGeometry'],
properties: Optional[Iterable[dict]] = None) -> dict:
"""Serialize shapely geometries to GeoJSON."""
if properties is None:
features = [geom_to_feature(g) for g in geoms]
else:
features = [geom_to_feature(g, p) for g, p in zip(geoms, properties)]
geojson = features_to_geojson(features)
return geojson
def geom_to_feature(geom: 'BaseGeometry',
properties: Optional[dict] = None) -> dict:
"""Serialize a single shapely geomety to a GeoJSON Feature."""
geomety = mapping(geom)
feature = geometry_to_feature(geomety, properties=properties)
return feature
def filter_features(func: Callable,
geojson: dict,
progressbar_kw: Optional[dict] = None) -> dict:
"""Filter GeoJSON features. Returns a new GeoJSON dict."""
features_in = geojson['features']
if progressbar_kw is None:
progressbar_kw = dict(desc='Filtering features.')
with tqdm(
features_in,
delay=PROGRESSBAR_DELAY_SEC,
mininterval=0.5,
**progressbar_kw) as bar:
features_out = list(filter(func, bar))
return features_to_geojson(features_out)
def is_empty_feature(f: dict) -> bool:
"""Check if a GeoJSON Feature lacks geometry info.
This was added to handle empty geoms which appear when using
OSM vector tiles.
Args:
f (dict): A GeoJSON-like mapping of a Feature.
Returns:
bool: Whether the feature contains any geometry.
"""
g: Optional[dict] = f.get('geometry')
if not g:
return True
no_geometries = not g.get('geometries')
no_coordinates = not g.get('coordinates')
is_empty = no_geometries and no_coordinates
return is_empty
def remove_empty_features(geojson: dict) -> dict:
"""Remove Features from a FeatureCollection that lack geometry info.
Args:
geojson (dict): A GeoJSON-like mapping of a FeatureCollection.
Returns:
dict: Filtered FeatureCollection.
"""
return filter_features(
lambda f: not is_empty_feature(f),
geojson,
progressbar_kw=dict(desc='Removing empty features'))
def split_multi_geometries(geojson: dict) -> dict:
"""Split any Features with multi-part geometries into multiple Features.
Args:
geojson (dict): A GeoJSON-like mapping of a FeatureCollection.
Returns:
dict: FeatureCollection without multi-part geometries.
"""
def split_geom(geom: 'BaseGeometry') -> List['BaseGeometry']:
# Split GeometryCollection into list of geoms.
if geom.geom_type == 'GeometryCollection':
geoms = list(geom)
else:
geoms = [geom]
# Split any MultiX to list of X.
new_geoms = []
for g in geoms:
if g.geom_type in MULTI_GEOM_TYPES:
new_geoms.extend(list(g.geoms))
else:
new_geoms.append(g)
return new_geoms
include_geom_types = set(['GeometryCollection', *MULTI_GEOM_TYPES])
all_geom_types = set(f['geometry']['type'] for f in geojson['features'])
if len(include_geom_types.intersection(all_geom_types)) == 0:
return geojson
new_features = []
with tqdm(
geojson['features'],
desc='Splitting multi-part geoms',
delay=PROGRESSBAR_DELAY_SEC,
mininterval=0.5) as bar:
for f in bar:
geom_type = f['geometry']['type']
if geom_type not in include_geom_types:
new_features.append(deepcopy(f))
continue
geom = shape(f['geometry'])
split_geoms = split_geom(geom)
for g in split_geoms:
new_feature = geometry_to_feature(
mapping(g), f.get('properties', {}))
new_features.append(new_feature)
return features_to_geojson(new_features)
def map_to_pixel_coords(geojson: dict,
crs_transformer: 'CRSTransformer') -> dict:
"""Convert a GeoJSON dict from map to pixel coordinates."""
return map_geoms(
lambda g, **kw: crs_transformer.map_to_pixel(g),
geojson,
progressbar_kw=dict(desc='Transforming to pixel coords'))
def pixel_to_map_coords(geojson: dict,
crs_transformer: 'CRSTransformer') -> dict:
"""Convert a GeoJSON dict from pixel to map coordinates."""
return map_geoms(
lambda g, **kw: crs_transformer.pixel_to_map(g),
geojson,
progressbar_kw=dict(desc='Transforming to map coords'))
def simplify_polygons(geojson: dict) -> dict:
"""Simplify polygon geometries by applying ``.buffer(0)``.
For Polygon geomtries, ``.buffer(0)`` can do the following:
1. *Sometimes* break up a polygon with "bowties" into multiple polygons.
(See https://github.com/shapely/shapely/issues/599.)
2. *Sometimes* "simplify" polygons. See shapely documentation for
:meth:`.BaseGeometry.buffer`.
Args:
geojson (dict): A GeoJSON-like mapping of a FeatureCollection.
Returns:
dict: FeatureCollection with simplified geometries.
"""
all_geom_types = set(f['geometry']['type'] for f in geojson['features'])
if 'Polygon' not in all_geom_types:
return geojson
geojson_buffered = map_geoms(
lambda g, **kw: g.buffer(0),
geojson,
include_geom_types=['Polygon'],
progressbar_kw=dict(desc='Simplifying polygons'))
geojson_cleaned = remove_empty_features(geojson_buffered)
geojson_split = split_multi_geometries(geojson_cleaned)
return geojson_split
def buffer_geoms(geojson: dict,
geom_type: str,
class_bufs: Dict[int, Optional[float]] = {},
default_buf: Optional[float] = 1) -> dict:
"""Buffer geometries.
Geometries in features without a class_id property will be ignored.
Args:
geojson (dict): A GeoJSON-like mapping of a FeatureCollection.
geom_type (str): Shapely geometry type to apply the buffering to. Other
types of geometries will not be affected.
class_bufs (Dict[int, Optional[float]]): Optional
mapping from class ID to buffer distance (in pixel units) for
geom_type geometries.
Returns:
dict: FeatureCollection with buffered geometries.
"""
def buffer_geom(geom: 'BaseGeometry',
feature: Optional[dict] = None) -> 'BaseGeometry':
has_class_id = (('properties' in feature)
and ('class_id' in feature['properties']))
if has_class_id:
class_id = feature['properties']['class_id']
buf = class_bufs.get(class_id, default_buf)
else:
buf = default_buf
# If buf for the class_id was explicitly set as None, don't buffer.
if buf is not None:
geom = geom.buffer(buf)
return geom
all_geom_types = set(f['geometry']['type'] for f in geojson['features'])
if geom_type not in all_geom_types:
return geojson
geojson_buffered = map_geoms(
buffer_geom,
geojson,
include_geom_types=[geom_type],
progressbar_kw=dict(desc=f'Buffering {geom_type}s (if any)'))
return geojson_buffered
def all_geoms_valid(geojson: dict):
"""Check if there are any invalid geometries in the GeoJSON."""
geoms = geojson_to_geoms(geojson)
return all(g.is_valid for g in geoms)
def get_polygons_from_uris(uris: Union[str, List[str]],
crs_transformer: 'CRSTransformer',
bbox: Optional['Box'] = None,
map_coords: bool = False) -> List['BaseGeometry']:
"""Load and return polygons (in pixel coords) from one or more URIs."""
# use local imports to avoid circular import problems
from rastervision.core.data import GeoJSONVectorSource
source = GeoJSONVectorSource(
uris=uris,
ignore_crs_field=True,
crs_transformer=crs_transformer,
bbox=bbox)
polygons = source.get_geoms(to_map_coords=map_coords)
return polygons
def merge_geojsons(geojsons: Iterable[dict]) -> dict:
"""Merge features from all given GeoJSONs into one GeoJSON."""
features = sum([g.get('features', []) for g in geojsons], [])
geojson_merged = features_to_geojson(features)
return geojson_merged
def geojson_to_geodataframe(geojson: dict) -> gpd.GeoDataFrame:
df = gpd.GeoDataFrame.from_features(geojson)
if len(df) == 0 and 'geometry' not in df.columns:
df.loc[:, 'geometry'] = []
return df
def get_geodataframe_extent(gdf: gpd.GeoDataFrame) -> Box:
envelope = gdf.unary_union.envelope
extent = Box.from_shapely(envelope).to_int()
return extent
def get_geojson_extent(geojson: dict) -> Box:
gdf = geojson_to_geodataframe(geojson)
extent = get_geodataframe_extent(gdf)
return extent
def filter_geojson_to_window(geojson: dict, window: Box) -> dict:
gdf = geojson_to_geodataframe(geojson)
window_geom = window.to_shapely()
gdf: gpd.GeoDataFrame = gdf[gdf.intersects(window_geom)]
out_geojson = gdf._to_geo()
return out_geojson
def geoms_to_bbox_coords(geoms: Iterable['BaseGeometry'],
bbox: Box) -> Iterator['BaseGeometry']:
xmin, ymin = bbox.xmin, bbox.ymin
out = (translate(p, xoff=-xmin, yoff=-ymin) for p in geoms)
return out