Skip to content

Commit

Permalink
Merge pull request #32 from /issues/28
Browse files Browse the repository at this point in the history
fix GeoJSONSource, add example
  • Loading branch information
tjwebb committed Mar 9, 2022
2 parents f629da9 + cdae15a commit 403878b
Show file tree
Hide file tree
Showing 5 changed files with 222 additions and 3 deletions.
33 changes: 33 additions & 0 deletions geobeam/examples/README.md
Expand Up @@ -173,3 +173,36 @@ python -m geobeam.examples.shapefile_nfhl \
--dataset examples \
--table FLD_HAZ_AR
```

## `geojson_stormwater`

```
bq mk --table <dataset>.stormwater geobeam/examples/stormwater_schema.json
```

### Run Locally

```
python -m geobeam.examples.geojson_stormwater \
--runner DirectRunner \
--project <your project> \
--temp_location <your temp bucket> \
--gcs_url gs://geobeam/examples/Stormwater_Pipes.geojson \
--dataset examples \
--table stormwater
```

### Run in Dataflow

```
python -m geobeam.examples.geojson_stormwater \
--runner DataflowRunner \
--project <your project> \
--temp_location <your temp bucket> \
--worker_harness_container_image gcr.io/dataflow-geobeam/example \
--experiment use_runner_v2 \
--service_account_email <your service account> \
--gcs_url gs://geobeam/examples/Stormwater_Pipes.geojson \
--dataset examples \
--table stormwater
```
59 changes: 59 additions & 0 deletions geobeam/examples/geojson_stormwater.py
@@ -0,0 +1,59 @@
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Example pipeline that loads a stormwater pipe system from a GeoJSON file
"""


def run(pipeline_args, known_args):
    """Load features from a GeoJSON file into an existing BigQuery table.

    The pipeline reads the file at ``known_args.gcs_url`` with
    ``GeoJSONSource``, keeps only the elements accepted by ``filter_invalid``,
    converts them with ``format_record`` and writes the result to the table
    ``known_args.dataset`` / ``known_args.table``.

    Args:
      pipeline_args: list of extra command-line arguments handed to Beam's
        ``PipelineOptions`` (appended after ``--experiments use_beam_bq_sink``).
      known_args: parsed arguments exposing ``gcs_url``, ``dataset`` and
        ``table`` attributes.
    """
    # Heavy dependencies are imported lazily, inside the function.
    import apache_beam as beam
    from apache_beam.io.gcp.internal.clients import bigquery as beam_bigquery
    from apache_beam.options.pipeline_options import PipelineOptions
    from geobeam.io import GeoJSONSource
    from geobeam.fn import format_record, make_valid, filter_invalid

    beam_args = ['--experiments', 'use_beam_bq_sink'] + pipeline_args
    options = PipelineOptions(beam_args)

    with beam.Pipeline(options=options) as pipeline:
        features = pipeline | beam.io.Read(GeoJSONSource(known_args.gcs_url))

        # NOTE: the 'MakeValid' step (beam.Map(make_valid)) is currently
        # disabled; make_valid stays imported so it can be re-enabled here.
        valid_features = (
            features | 'FilterInvalid' >> beam.Filter(filter_invalid))
        records = valid_features | 'FormatRecords' >> beam.Map(format_record)

        destination = beam_bigquery.TableReference(
            datasetId=known_args.dataset,
            tableId=known_args.table)

        # The table is truncated on every run and is never created by the
        # pipeline (CREATE_NEVER).
        _ = records | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
            destination,
            method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER)


if __name__ == '__main__':
    import argparse
    import logging

    # Show INFO-level messages on the root logger while the example runs.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)

    # Flags consumed by this example; every other argument is left in
    # pipeline_args and passed through to run() unchanged.
    parser = argparse.ArgumentParser()
    for flag in ('--gcs_url', '--dataset', '--table'):
        parser.add_argument(flag)

    known_args, pipeline_args = parser.parse_known_args()
    run(pipeline_args, known_args)

122 changes: 122 additions & 0 deletions geobeam/examples/stormwater_schema.json
@@ -0,0 +1,122 @@
[
{
"mode": "NULLABLE",
"name": "geom",
"type": "GEOGRAPHY"
},
{
"mode": "NULLABLE",
"name": "UPSTREAM_STRUCTURE_1",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "PIPE_GEOMETRY",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "OWNER",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "DELETE_RECORD",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "DBASIN",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "UPSTREAM_INVERT",
"type": "FLOAT"
},
{
"mode": "NULLABLE",
"name": "COMMENTS",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "PIPE_ID",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "CONDITION",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "PIPE_MATERIAL",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "VERTICAL_DIAMETER",
"type": "FLOAT"
},
{
"mode": "NULLABLE",
"name": "YEAR_INSTALLED",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "DOWNSTREAM_ASSUMED_",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "MAJOR_WATERSHED",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "HORIZONTAL_DIAMETER",
"type": "FLOAT"
},
{
"mode": "NULLABLE",
"name": "UPSTREAM_ASSUMED_",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "PIPE_SIZE",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "DOWNSTREAM_INVERT",
"type": "FLOAT"
},
{
"mode": "NULLABLE",
"name": "DOWNSTREAM_STRUCTURE",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "OBJECTID",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "PIPE_LENGTH_FT",
"type": "FLOAT"
},
{
"mode": "NULLABLE",
"name": "UPSTREAM_STRUCTURE",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "DOWNSTREAM_STRUCTURE_1",
"type": "STRING"
}
]
3 changes: 3 additions & 0 deletions geobeam/fn.py
Expand Up @@ -59,6 +59,9 @@ def filter_invalid(element):

props, geom = element

if len(geom['coordinates']) == 0:
return False

if geom is None:
return False

Expand Down
8 changes: 5 additions & 3 deletions geobeam/io.py
Expand Up @@ -369,7 +369,7 @@ def split_points_unclaimed(stop_pos):

range_tracker.set_split_points_unclaimed_callback(split_points_unclaimed)

collection = fiona.open(self.file_name)
collection = fiona.open(file_name)

is_wgs84, src_crs = _GeoSourceUtils.validate_crs(collection.crs, self.in_epsg, self.in_proj)

Expand Down Expand Up @@ -410,6 +410,7 @@ def __init__(self, file_pattern, skip_reproject=False,

super(GeoJSONSource, self).__init__(file_pattern)


class ESRIServerSource(filebasedsource.FileBasedSource):
"""A Beam FileBasedSource for reading layers from an ESRI ArcGIS Server.
Expand Down Expand Up @@ -447,7 +448,7 @@ def split_points_unclaimed(stop_pos):

range_tracker.set_split_points_unclaimed_callback(split_points_unclaimed)

esri_dump = EsriDumper(self.file_name)
esri_dump = EsriDumper(file_name)

geojson = {
"type": "FeatureCollection",
Expand Down Expand Up @@ -492,7 +493,8 @@ def __init__(self, file_pattern, skip_reproject=False,
self.in_epsg = in_epsg
self.in_proj = in_proj

super(GeoJSONSource, self).__init__(file_pattern)
super(ESRIServerSource, self).__init__(file_pattern)


class _GeoSourceUtils():
"""Utility methods for the FileBasedSource reader classes"""
Expand Down

0 comments on commit 403878b

Please sign in to comment.