From 9d82c1eacc594a13cf45357ed077c2ece725a593 Mon Sep 17 00:00:00 2001
From: Vikas Kedigehalli
Date: Thu, 1 Dec 2016 10:27:05 -0800
Subject: [PATCH 1/2] snippet for datastoreio

---
 .../apache_beam/examples/snippets/snippets.py | 41 +++++++++++++++++++
 .../examples/snippets/snippets_test.py        |  7 ++++
 2 files changed, 48 insertions(+)

diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 891f464147107..d34a4e8bd4eab 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -891,6 +891,47 @@ def filter_words(x):
   p.run()
 
 
+def model_datastoreio():
+  """Using a Read and Write transform to read/write to Cloud Datastore.
+
+  URL: https://cloud.google.com/dataflow/model/datastoreio
+  """
+
+  import uuid
+  from google.datastore.v1 import entity_pb2
+  from google.datastore.v1 import query_pb2
+  import googledatastore
+  import apache_beam as beam
+  from apache_beam.utils.options import PipelineOptions
+  from apache_beam.io.datastore.v1.datastoreio import ReadFromDatastore
+  from apache_beam.io.datastore.v1.datastoreio import WriteToDatastore
+
+  project = 'my_project'
+  kind = 'my_kind'
+  query = query_pb2.Query()
+  query.kind.add().name = kind
+
+  # [START model_datastoreio_read]
+  p = beam.Pipeline(options=PipelineOptions())
+  entities = p | 'Read From Datastore' >> ReadFromDatastore(project, query)
+  # [END model_datastoreio_read]
+
+  # [START model_datastoreio_write]
+  p = beam.Pipeline(options=PipelineOptions())
+  musicians = p | 'Musicians' >> beam.Create(
+      ['Mozart', 'Chopin', 'Beethoven', 'Bach'])
+
+  def to_entity(content):
+    entity = entity_pb2.Entity()
+    googledatastore.helper.add_key_path(entity.key, kind, str(uuid.uuid4()))
+    googledatastore.helper.add_properties(entity, {'content': unicode(content)})
+    return entity
+
+  entities = musicians | "To Entity" >> beam.Map(to_entity)
+  entities | 'Write To Datastore' >> WriteToDatastore(project)
+  # [END model_datastoreio_write]
+
+
 def model_bigqueryio():
   """Using a Read and Write transform to read/write to BigQuery.
 
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 72fccb242858f..4ab104bedec6d 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -470,6 +470,13 @@ def test_model_textio(self):
         ['aa', 'bb', 'bb', 'cc', 'cc', 'cc'],
         self.get_output(result_path, suffix='.csv'))
 
+  def test_model_datastoreio(self):
+    # We cannot test datastoreio functionality in unit tests, therefore we
+    # limit ourselves to making sure the pipeline containing Datastore read
+    # and write transforms can be built.
+    # TODO(vikasrk): Explore using Datastore Emulator.
+    self.assertEqual(None, snippets.model_datastoreio())
+
   def test_model_bigqueryio(self):
     # We cannot test BigQueryIO functionality in unit tests therefore we limit
     # ourselves to making sure the pipeline containing BigQuery sources and

From eff3d441106ffeb6858f98dbddca563878dc02a0 Mon Sep 17 00:00:00 2001
From: Vikas Kedigehalli
Date: Thu, 1 Dec 2016 22:12:39 -0800
Subject: [PATCH 2/2] Address comments

---
 sdks/python/apache_beam/examples/snippets/snippets.py      | 2 +-
 sdks/python/apache_beam/examples/snippets/snippets_test.py | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index d34a4e8bd4eab..a5d7438a2e5dd 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -927,7 +927,7 @@ def to_entity(content):
     googledatastore.helper.add_properties(entity, {'content': unicode(content)})
     return entity
 
-  entities = musicians | "To Entity" >> beam.Map(to_entity)
+  entities = musicians | 'To Entity' >> beam.Map(to_entity)
   entities | 'Write To Datastore' >> WriteToDatastore(project)
   # [END model_datastoreio_write]
 
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 4ab104bedec6d..09b4ba4caa1ac 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -475,13 +475,13 @@ def test_model_datastoreio(self):
     # limit ourselves to making sure the pipeline containing Datastore read
     # and write transforms can be built.
     # TODO(vikasrk): Explore using Datastore Emulator.
-    self.assertEqual(None, snippets.model_datastoreio())
+    snippets.model_datastoreio()
 
   def test_model_bigqueryio(self):
     # We cannot test BigQueryIO functionality in unit tests therefore we limit
     # ourselves to making sure the pipeline containing BigQuery sources and
     # sinks can be built.
-    self.assertEqual(None, snippets.model_bigqueryio())
+    snippets.model_bigqueryio()
 
   def _run_test_pipeline_for_options(self, fn):
     temp_path = self.create_temp_file('aa\nbb\ncc')
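For reviewers who want to try the snippet against a real project, below is a minimal, untested sketch (not part of the patches above) that chains the read and write snippets into a single pipeline: it reads entities of 'my_kind' and re-writes their 'content' property under a made-up 'my_kind_copy' kind. It uses only the imports and calls that already appear in the patch; the copy_entity helper and the kind names are illustrative placeholders, and it assumes the entities carry the 'content' string property written by the write snippet.

import uuid

import apache_beam as beam
from apache_beam.utils.options import PipelineOptions
from apache_beam.io.datastore.v1.datastoreio import ReadFromDatastore
from apache_beam.io.datastore.v1.datastoreio import WriteToDatastore
from google.datastore.v1 import entity_pb2
from google.datastore.v1 import query_pb2
import googledatastore

project = 'my_project'
kind = 'my_kind'

# Query all entities of 'my_kind', exactly as in the read snippet above.
query = query_pb2.Query()
query.kind.add().name = kind


def copy_entity(entity):
  # Build a fresh entity keyed under the hypothetical 'my_kind_copy' kind,
  # carrying over the 'content' string property from the source entity.
  new_entity = entity_pb2.Entity()
  googledatastore.helper.add_key_path(
      new_entity.key, kind + '_copy', str(uuid.uuid4()))
  googledatastore.helper.add_properties(
      new_entity, {'content': entity.properties['content'].string_value})
  return new_entity


p = beam.Pipeline(options=PipelineOptions())
(p
 | 'Read From Datastore' >> ReadFromDatastore(project, query)
 | 'Copy Entity' >> beam.Map(copy_entity)
 | 'Write To Datastore' >> WriteToDatastore(project))
p.run()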