Commit 2363ee5

Closes #1481
robertwb committed Dec 2, 2016
2 parents a463f00 + 557a2f9 commit 2363ee5
Showing 2 changed files with 49 additions and 1 deletion.
41 changes: 41 additions & 0 deletions sdks/python/apache_beam/examples/snippets/snippets.py
@@ -891,6 +891,47 @@ def filter_words(x):
  p.run()


def model_datastoreio():
  """Using a Read and Write transform to read/write to Cloud Datastore.
  URL: https://cloud.google.com/dataflow/model/datastoreio
  """

  import uuid
  from google.datastore.v1 import entity_pb2
  from google.datastore.v1 import query_pb2
  import googledatastore
  import apache_beam as beam
  from apache_beam.utils.options import PipelineOptions
  from apache_beam.io.datastore.v1.datastoreio import ReadFromDatastore
  from apache_beam.io.datastore.v1.datastoreio import WriteToDatastore

  project = 'my_project'
  kind = 'my_kind'
  query = query_pb2.Query()
  query.kind.add().name = kind

  # [START model_datastoreio_read]
  p = beam.Pipeline(options=PipelineOptions())
  entities = p | 'Read From Datastore' >> ReadFromDatastore(project, query)
  # [END model_datastoreio_read]

  # [START model_datastoreio_write]
  p = beam.Pipeline(options=PipelineOptions())
  musicians = p | 'Musicians' >> beam.Create(
      ['Mozart', 'Chopin', 'Beethoven', 'Bach'])

  def to_entity(content):
    entity = entity_pb2.Entity()
    googledatastore.helper.add_key_path(entity.key, kind, str(uuid.uuid4()))
    googledatastore.helper.add_properties(entity, {'content': unicode(content)})
    return entity

  entities = musicians | 'To Entity' >> beam.Map(to_entity)
  entities | 'Write To Datastore' >> WriteToDatastore(project)
  # [END model_datastoreio_write]


def model_bigqueryio():
  """Using a Read and Write transform to read/write to BigQuery.
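The query handed to ReadFromDatastore is an ordinary google.datastore.v1 query proto, and the entities it yields are plain PCollection elements, so the snippet composes with other Beam transforms. Below is a minimal sketch (not part of this commit) of narrowing the kind-only query with a property filter and consuming the read entities; it assumes the same import paths as the snippet above plus beam.io.WriteToText, and the filter value, property name, and output path are purely illustrative.

import apache_beam as beam
from apache_beam.utils.options import PipelineOptions
from apache_beam.io.datastore.v1.datastoreio import ReadFromDatastore
from google.datastore.v1 import query_pb2

project = 'my_project'  # same placeholder project as in the snippet
kind = 'my_kind'        # same placeholder kind as in the snippet

# Restrict the kind-only query to entities whose 'content' property equals
# 'Mozart' (the property written by to_entity() in the snippet above).
query = query_pb2.Query()
query.kind.add().name = kind
query.filter.property_filter.property.name = 'content'
query.filter.property_filter.op = query_pb2.PropertyFilter.EQUAL
query.filter.property_filter.value.string_value = 'Mozart'

p = beam.Pipeline(options=PipelineOptions())
entities = p | 'Read Mozart' >> ReadFromDatastore(project, query)


def entity_to_content(entity):
  # Each element is a google.datastore.v1 Entity proto; pull out the
  # string value stored under the 'content' property.
  return entity.properties['content'].string_value

(entities
 | 'Extract Content' >> beam.Map(entity_to_content)
 | 'Write Content' >> beam.io.WriteToText('gs://my-bucket/contents'))

As in the snippet itself, nothing executes until run() is called on the pipeline; the snippet stops at pipeline construction, which is also all the accompanying unit test below checks.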
9 changes: 8 additions & 1 deletion sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -470,11 +470,18 @@ def test_model_textio(self):
        ['aa', 'bb', 'bb', 'cc', 'cc', 'cc'],
        self.get_output(result_path, suffix='.csv'))

  def test_model_datastoreio(self):
    # We cannot test datastoreio functionality in unit tests, so we limit
    # ourselves to making sure the pipeline containing Datastore read and
    # write transforms can be built.
    # TODO(vikasrk): Explore using the Datastore Emulator.
    snippets.model_datastoreio()

  def test_model_bigqueryio(self):
    # We cannot test BigQueryIO functionality in unit tests, so we limit
    # ourselves to making sure the pipeline containing BigQuery sources and
    # sinks can be built.
-    self.assertEqual(None, snippets.model_bigqueryio())
+    snippets.model_bigqueryio()

  def _run_test_pipeline_for_options(self, fn):
    temp_path = self.create_temp_file('aa\nbb\ncc')
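The TODO in test_model_datastoreio points at the Datastore Emulator as a way to get real coverage. A hypothetical sketch of such a test, not part of this commit: it assumes a locally running emulator started with gcloud beta emulators datastore start (with DATASTORE_EMULATOR_HOST exported, e.g. via env-init), and it would only exercise the I/O end to end if the snippet were extended to call run() on its pipelines. The class and method names are illustrative.

import os
import unittest

from apache_beam.examples.snippets import snippets


class DatastoreEmulatorSnippetTest(unittest.TestCase):

  # Skip unless the Datastore Emulator environment is configured; client
  # libraries honor DATASTORE_EMULATOR_HOST and route requests to the local
  # emulator instead of the live service.
  @unittest.skipUnless('DATASTORE_EMULATOR_HOST' in os.environ,
                       'Datastore Emulator is not running.')
  def test_model_datastoreio_with_emulator(self):
    # The snippet currently only builds its pipelines; if it also called
    # run() on them, this test would exercise the Datastore read and write
    # transforms end to end against the emulator.
    snippets.model_datastoreio()


if __name__ == '__main__':
  unittest.main()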
