diff --git a/geobeam/examples/README.md b/geobeam/examples/README.md index 5101be7..56a38a3 100644 --- a/geobeam/examples/README.md +++ b/geobeam/examples/README.md @@ -173,3 +173,36 @@ python -m geobeam.examples.shapefile_nfhl \ --dataset examples \ --table FLD_HAZ_AR ``` + +## `geojson_stormwater` + +``` +bq mk --table YOUR_DATASET.stormwater geobeam/examples/stormwater_schema.json +``` + +### Run Locally + +``` +python -m geobeam.examples.geojson_stormwater \ + --runner DirectRunner \ + --project YOUR_PROJECT_ID \ + --temp_location gs://YOUR_BUCKET/tmp \ + --gcs_url gs://geobeam/examples/Stormwater_Pipes.geojson \ + --dataset examples \ + --table stormwater +``` + +### Run in Dataflow + +``` +python -m geobeam.examples.geojson_stormwater \ + --runner DataflowRunner \ + --project YOUR_PROJECT_ID \ + --temp_location gs://YOUR_BUCKET/tmp \ + --worker_harness_container_image gcr.io/dataflow-geobeam/example \ + --experiment use_runner_v2 \ + --service_account_email YOUR_SERVICE_ACCOUNT_EMAIL \ + --gcs_url gs://geobeam/examples/Stormwater_Pipes.geojson \ + --dataset examples \ + --table stormwater +``` diff --git a/geobeam/examples/geojson_stormwater.py b/geobeam/examples/geojson_stormwater.py new file mode 100644 index 0000000..848828d --- /dev/null +++ b/geobeam/examples/geojson_stormwater.py @@ -0,0 +1,59 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+"""
+Example pipeline that loads a stormwater pipe system from a GeoJSON file
+"""
+
+
+def run(pipeline_args, known_args):
+    """Load GeoJSON features into an existing BigQuery table, replacing its rows."""
+    import apache_beam as beam
+    from apache_beam.io.gcp.internal.clients import bigquery as beam_bigquery
+    from apache_beam.options.pipeline_options import PipelineOptions
+    from geobeam.io import GeoJSONSource
+    from geobeam.fn import format_record, filter_invalid
+
+    pipeline_options = PipelineOptions([
+        '--experiments', 'use_beam_bq_sink',
+    ] + pipeline_args)
+
+    with beam.Pipeline(options=pipeline_options) as p:
+        (p
+         | beam.io.Read(GeoJSONSource(known_args.gcs_url))
+         | 'FilterInvalid' >> beam.Filter(filter_invalid)
+         | 'FormatRecords' >> beam.Map(format_record)
+         | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
+             beam_bigquery.TableReference(
+                 datasetId=known_args.dataset,
+                 tableId=known_args.table),
+             method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
+             write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
+             create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER))
+
+
+if __name__ == '__main__':
+    import logging
+    import argparse
+
+    logging.getLogger().setLevel(logging.INFO)
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--gcs_url', required=True, help='GeoJSON file to load')
+    parser.add_argument('--dataset', required=True, help='BigQuery dataset ID')
+    parser.add_argument('--table', required=True, help='BigQuery table ID')
+    known_args, pipeline_args = parser.parse_known_args()
+
+    run(pipeline_args, known_args)
+
diff --git a/geobeam/examples/stormwater_schema.json b/geobeam/examples/stormwater_schema.json
new file mode 100644
index 0000000..35cf967
--- /dev/null
+++ b/geobeam/examples/stormwater_schema.json
@@ -0,0 +1,122 @@
+[
+  {
+    "mode": "NULLABLE",
+    "name": "geom",
+    "type": "GEOGRAPHY"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "UPSTREAM_STRUCTURE_1",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "PIPE_GEOMETRY",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "OWNER",
+    "type": "STRING"
+  },
+  {
+    "mode": "NULLABLE",
+    "name":
"DELETE_RECORD", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "DBASIN", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "UPSTREAM_INVERT", + "type": "FLOAT" + }, + { + "mode": "NULLABLE", + "name": "COMMENTS", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "PIPE_ID", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "CONDITION", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "PIPE_MATERIAL", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "VERTICAL_DIAMETER", + "type": "FLOAT" + }, + { + "mode": "NULLABLE", + "name": "YEAR_INSTALLED", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "DOWNSTREAM_ASSUMED_", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "MAJOR_WATERSHED", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "HORIZONTAL_DIAMETER", + "type": "FLOAT" + }, + { + "mode": "NULLABLE", + "name": "UPSTREAM_ASSUMED_", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "PIPE_SIZE", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "DOWNSTREAM_INVERT", + "type": "FLOAT" + }, + { + "mode": "NULLABLE", + "name": "DOWNSTREAM_STRUCTURE", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "OBJECTID", + "type": "INTEGER" + }, + { + "mode": "NULLABLE", + "name": "PIPE_LENGTH_FT", + "type": "FLOAT" + }, + { + "mode": "NULLABLE", + "name": "UPSTREAM_STRUCTURE", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "DOWNSTREAM_STRUCTURE_1", + "type": "STRING" + } +] diff --git a/geobeam/fn.py b/geobeam/fn.py index 5bd3322..a5d52a3 100644 --- a/geobeam/fn.py +++ b/geobeam/fn.py @@ -59,6 +59,9 @@ def filter_invalid(element): props, geom = element + if len(geom['coordinates']) == 0: + return False + if geom is None: return False diff --git a/geobeam/io.py b/geobeam/io.py index 282fc58..1815b12 100644 --- a/geobeam/io.py +++ b/geobeam/io.py @@ -369,7 +369,7 @@ def split_points_unclaimed(stop_pos): 
range_tracker.set_split_points_unclaimed_callback(split_points_unclaimed) - collection = fiona.open(self.file_name) + collection = fiona.open(file_name) is_wgs84, src_crs = _GeoSourceUtils.validate_crs(collection.crs, self.in_epsg, self.in_proj) @@ -410,6 +410,7 @@ def __init__(self, file_pattern, skip_reproject=False, super(GeoJSONSource, self).__init__(file_pattern) + class ESRIServerSource(filebasedsource.FileBasedSource): """A Beam FileBasedSource for reading layers from an ESRI ArcGIS Server. @@ -447,7 +448,7 @@ def split_points_unclaimed(stop_pos): range_tracker.set_split_points_unclaimed_callback(split_points_unclaimed) - esri_dump = EsriDumper(self.file_name) + esri_dump = EsriDumper(file_name) geojson = { "type": "FeatureCollection", @@ -492,7 +493,8 @@ def __init__(self, file_pattern, skip_reproject=False, self.in_epsg = in_epsg self.in_proj = in_proj - super(GeoJSONSource, self).__init__(file_pattern) + super(ESRIServerSource, self).__init__(file_pattern) + class _GeoSourceUtils(): """Utility methods for the FileBasedSource reader classes"""