Skip to content

Commit

Permalink
Users can add Pub/Sub subscriptions to weather-mv pipelines. (#319)
Browse files Browse the repository at this point in the history
* Added the subscription as a CLI argument. Also, it's registered with PubSub.

* Updated documentation.

* rm example code...

* Fix pipeline tests.
  • Loading branch information
alxmrs committed Apr 24, 2023
1 parent 046f675 commit 374153b
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 7 deletions.
6 changes: 4 additions & 2 deletions weather_mv/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ _Common options_

* `-i, --uris`: (required) URI glob pattern matching input weather data, e.g. 'gs://ecmwf/era5/era5-2015-*.gb'.
* `--topic`: A Pub/Sub topic for GCS OBJECT_FINALIZE events, or equivalent, of a cloud bucket. E.g.
'projects/<PROJECT_ID>/topics/<TOPIC_ID>'.
'projects/<PROJECT_ID>/topics/<TOPIC_ID>'. Cannot be used with `--subscription`.
* `--subscription`: A Pub/Sub subscription for GCS OBJECT_FINALIZE events, or equivalent, of a cloud bucket. Cannot be
used with `--topic`.
* `--window_size`: Output file's window size in minutes. Only used with the `topic` flag. Default: 1.0 minute.
* `--num_shards`: Number of shards to use when writing windowed elements to cloud storage. Only used with the `topic`
flag. Default: 5 shards.
Expand Down Expand Up @@ -468,7 +470,7 @@ For a full list of how to configure the Dataflow pipeline, please review
to [Pub/Sub events for objects added to GCS](https://cloud.google.com/storage/docs/pubsub-notifications). This can be
used to automate ingestion into BigQuery as soon as weather data is disseminated. Another common use case is to
automatically create a down-sampled version of a dataset with `regrid`. To set up the Weather Mover with streaming
ingestion, use the `--topic` flag (see "Common options" above).
ingestion, use the `--topic` or `--subscription` flag (see "Common options" above).

Objects that don't match the `--uris` glob pattern will be filtered out of ingestion. This way, a bucket can contain
multiple types of data yet only have subsets processed with `weather-mv`.
Expand Down
17 changes: 12 additions & 5 deletions weather_mv/loader_pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,13 @@ def pipeline(known_args: argparse.Namespace, pipeline_args: t.List[str]) -> None
with beam.Pipeline(argv=pipeline_args) as p:
if known_args.zarr:
paths = p
elif known_args.topic:
elif known_args.topic or known_args.subscription:

paths = (
p
# Windowing is based on this code sample:
# https://cloud.google.com/pubsub/docs/pubsub-dataflow#code_sample
| 'ReadUploadEvent' >> beam.io.ReadFromPubSub(known_args.topic)
| 'ReadUploadEvent' >> beam.io.ReadFromPubSub(known_args.topic, known_args.subscription)
| 'WindowInto' >> GroupMessagesByFixedWindows(known_args.window_size, known_args.num_shards)
| 'ParsePaths' >> beam.ParDo(ParsePaths(known_args.uris))
)
Expand Down Expand Up @@ -94,7 +95,10 @@ def run(argv: t.List[str]) -> t.Tuple[argparse.Namespace, t.List[str]]:
"a path to a Zarr.")
base.add_argument('--topic', type=str,
help="A Pub/Sub topic for GCS OBJECT_FINALIZE events, or equivalent, of a cloud bucket. "
"E.g. 'projects/<PROJECT_ID>/topics/<TOPIC_ID>'.")
"E.g. 'projects/<PROJECT_ID>/topics/<TOPIC_ID>'. Cannot be used with `--subscription`.")
base.add_argument('--subscription', type=str,
help='A Pub/Sub subscription for GCS OBJECT_FINALIZE events, or equivalent, of a cloud bucket. '
'Cannot be used with `--topic`.')
base.add_argument("--window_size", type=float, default=1.0,
help="Output file's window size in minutes. Only used with the `topic` flag. Default: 1.0 "
"minute.")
Expand Down Expand Up @@ -146,8 +150,11 @@ def run(argv: t.List[str]) -> t.Tuple[argparse.Namespace, t.List[str]]:
elif known_args.subcommand == 'earthengine' or known_args.subcommand == 'ee':
ToEarthEngine.validate_arguments(known_args, pipeline_args)

# If a topic is used, then the pipeline must be a streaming pipeline.
if known_args.topic:
# If a Pub/Sub is used, then the pipeline must be a streaming pipeline.
if known_args.topic or known_args.subscription:
if known_args.topic and known_args.subscription:
raise ValueError('only one argument can be provided at a time: `topic` or `subscription`.')

if known_args.zarr:
raise ValueError('streaming updates to a Zarr file is not (yet) supported.')

Expand Down
9 changes: 9 additions & 0 deletions weather_mv/loader_pipeline/pipeline_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def setUp(self) -> None:
'infer_schema': False,
'num_shards': 5,
'topic': None,
'subscription': None,
'variables': [],
'window_size': 1.0,
'xarray_open_dataset_kwargs': {},
Expand Down Expand Up @@ -99,6 +100,10 @@ def test_topic_creates_a_streaming_pipeline(self):
_, pipeline_args = run(self.base_cli_args + '--topic projects/myproject/topics/my-topic'.split())
self.assertEqual(pipeline_args, ['--streaming', 'true', '--save_main_session', 'true'])

def test_subscription_creates_a_streaming_pipeline(self):
    """Passing only `--subscription` must switch the pipeline into streaming mode."""
    # Use a realistic subscription resource path ('projects/<P>/subscriptions/<S>'),
    # not a topic path — the original fixture reused the topic test's path, which
    # is not a valid subscription name and makes the test misleading to readers.
    cli = '--subscription projects/myproject/subscriptions/my-subscription'
    _, pipeline_args = run(self.base_cli_args + cli.split())
    self.assertEqual(pipeline_args, ['--streaming', 'true', '--save_main_session', 'true'])

def test_accepts_json_string_for_xarray_open(self):
xarray_kwargs = dict(engine='cfgrib', backend_kwargs={'filter_by_keys': {'edition': 1}})
json_kwargs = json.dumps(xarray_kwargs)
Expand Down Expand Up @@ -126,6 +131,10 @@ def test_zarr_kwargs_must_come_with_zarr(self):
with self.assertRaisesRegex(ValueError, 'allowed with valid Zarr input URI'):
run(self.base_cli_args + ['--zarr_kwargs', json.dumps({"time": 100})])

def test_topic_and_subscription__mutually_exclusive(self):
    """Supplying both `--topic` and `--subscription` must raise a ValueError."""
    # Build the argv as an explicit list rather than splitting a string.
    cli_args = self.base_cli_args + ['--topic', 'foo', '--subscription', 'bar']
    with self.assertRaisesRegex(ValueError, '`topic` or `subscription`'):
        run(cli_args)


class IntegrationTest(CLITests):
def test_dry_runs_are_allowed(self):
Expand Down

0 comments on commit 374153b

Please sign in to comment.