[BEAM-3342] Create a Cloud Bigtable IO connector for Python #8457

Closed · wants to merge 45 commits

Changes from 25 commits

Commits (45)
6fa024d  Initial version of Google Cloud Bigtable IO connector (mf2199, May 1, 2019)
156358a  Changed import package name (mf2199, May 1, 2019)
90697db  Fixed paranthesis formatting for 'print' commands (Python 3.x compati… (mf2199, May 1, 2019)
ad9d3b9  changes to 'import' directives (mf2199, May 2, 2019)
8cc6485  [no comments] (mf2199, May 2, 2019)
99b0efd  experimenting with the code format - an attempt to tackle non-specifi… (mf2199, May 2, 2019)
c2270cb  Partially undoing the latest changes. (mf2199, May 2, 2019)
b8c85bb  Implemented the PTransform/DoFn scheme as a replacement to the standa… (mf2199, Jun 20, 2019)
acc5a20  Chenged default num_workers value from 300 to 10 (mf2199, Jun 25, 2019)
8819ee8  Removed private classes from the interface; disabled the experiments … (mf2199, Jun 25, 2019)
c4bd3ef  Merge branch 'master' into bigtableio (mf2199, Jul 1, 2019)
89d9424  Addressing the PR coments (mf2199, Jul 2, 2019)
c7688ef  Addressing the most recent code review comments & some additional cle… (mf2199, Aug 7, 2019)
ffce22f  Removing relative import (mf2199, Aug 15, 2019)
aa20d08  Fixing pylint errors (mf2199, Aug 15, 2019)
057a7b0  Fixing pylint errors [cont-d] (mf2199, Aug 15, 2019)
9b76efe  Fixing pylint errors [cont-d] (mf2199, Aug 15, 2019)
c3b8b47  Fixing pylint errors [cont-d] (mf2199, Aug 15, 2019)
614887e  Fixing pylint errors [cont-d] (mf2199, Aug 15, 2019)
dd6c962  Fixing pylint errors [cont-d] (mf2199, Aug 16, 2019)
3114d98  Fixing pylint and other errors [cont-d] (mf2199, Aug 16, 2019)
727d54a  Refactored pipeline options (mf2199, Sep 13, 2019)
b771645  Minor cleanup (mf2199, Sep 13, 2019)
5414e0c  Minor refactoring (mf2199, Sep 18, 2019)
b27cef5  no message (mf2199, Sep 18, 2019)
ad33d95  WIP [2020.02.10] (mf2199, Feb 10, 2020)
27f2a4a  WIP [2020.02.10] (mf2199, Feb 11, 2020)
9eb031d  [WIP] Updates as per discussion with CBT team. (mf2199, Feb 24, 2020)
cbfc64b  [WIP] (mf2199, Feb 24, 2020)
ae7d712  [WIP] (mf2199, Feb 24, 2020)
7acbacd  [WIP] (mf2199, Feb 24, 2020)
9c54caa  [WIP] (mf2199, Feb 24, 2020)
c3d4249  Code rearranged (mf2199, Apr 2, 2020)
fdc426d  Updated docs (mf2199, Apr 2, 2020)
1962866  Minor fix (mf2199, Apr 2, 2020)
096953c  Minor fix (mf2199, Apr 2, 2020)
a8878d5  Minor fix (mf2199, Apr 2, 2020)
7eca091  Minor fix (mf2199, Apr 2, 2020)
d8c0130  Formatting... (mf2199, Apr 2, 2020)
3f6f37c  Merge branch 'master' into bigtableio (mf2199, Apr 3, 2020)
e9874c4  Removed usage of `iobase.SourceBundle` (mf2199, Apr 6, 2020)
0d964ac  refactoring (mf2199, Apr 10, 2020)
bc2a8dd  Added unit tests (mf2199, Apr 10, 2020)
7bb69e1  Minor lint fix (mf2199, May 8, 2020)
2f3e37b  Merge branch 'master' of https://github.com/apache/beam into bigtableio (mf2199, Jun 6, 2020)
165 changes: 146 additions & 19 deletions sdks/python/apache_beam/io/gcp/bigtableio.py
@@ -37,16 +37,20 @@
"""
from __future__ import absolute_import

from collections import namedtuple

import apache_beam as beam
from apache_beam.io import iobase
from apache_beam.metrics import Metrics
from apache_beam.transforms import core
from apache_beam.transforms.display import DisplayDataItem

try:
  from google.cloud.bigtable import Client
except ImportError:
  pass
  Client = None

__all__ = ['WriteToBigTable']
__all__ = ['ReadFromBigTable', 'WriteToBigTable']


class _BigTableWriteFn(beam.DoFn):
@@ -67,27 +71,27 @@ def __init__(self, project_id, instance_id, table_id):
      table_id(str): GCP Table to write the `DirectRows`
    """
    super(_BigTableWriteFn, self).__init__()
    self.beam_options = {'project_id': project_id,
                         'instance_id': instance_id,
                         'table_id': table_id}
    self._beam_options = {'project_id': project_id,
                          'instance_id': instance_id,
                          'table_id': table_id}
    self.table = None
    self.batcher = None
    self.written = Metrics.counter(self.__class__, 'Written Row')

  def __getstate__(self):
    return self.beam_options
    return self._beam_options

  def __setstate__(self, options):
    self.beam_options = options
    self._beam_options = options
    self.table = None
    self.batcher = None
    self.written = Metrics.counter(self.__class__, 'Written Row')

  def start_bundle(self):
    if self.table is None:
      client = Client(project=self.beam_options['project_id'])
      instance = client.instance(self.beam_options['instance_id'])
      self.table = instance.table(self.beam_options['table_id'])
      client = Client(project=self._beam_options['project_id'])
      instance = client.instance(self._beam_options['instance_id'])
      self.table = instance.table(self._beam_options['table_id'])
    self.batcher = self.table.mutations_batcher()

  def process(self, row):
@@ -107,11 +111,11 @@ def finish_bundle(self):
      self.batcher = None

  def display_data(self):
    return {'projectId': DisplayDataItem(self.beam_options['project_id'],
    return {'projectId': DisplayDataItem(self._beam_options['project_id'],
                                         label='Bigtable Project Id'),
            'instanceId': DisplayDataItem(self.beam_options['instance_id'],
            'instanceId': DisplayDataItem(self._beam_options['instance_id'],
                                          label='Bigtable Instance Id'),
            'tableId': DisplayDataItem(self.beam_options['table_id'],
            'tableId': DisplayDataItem(self._beam_options['table_id'],
                                       label='Bigtable Table Id')
           }

@@ -122,22 +126,145 @@ class WriteToBigTable(beam.PTransform):
  A PTransform that write a list of `DirectRow` into the Bigtable Table

  """
  def __init__(self, project_id=None, instance_id=None,
               table_id=None):
  def __init__(self, project_id=None, instance_id=None, table_id=None):
    """ The PTransform to access the Bigtable Write connector
    Args:
      project_id(str): GCP Project of to write the Rows
      instance_id(str): GCP Instance to write the Rows
      table_id(str): GCP Table to write the `DirectRows`
    """
    super(WriteToBigTable, self).__init__()
    self.beam_options = {'project_id': project_id,
                         'instance_id': instance_id,
                         'table_id': table_id}
    self._beam_options = {'project_id': project_id,
                          'instance_id': instance_id,
                          'table_id': table_id}

  def expand(self, pvalue):
    beam_options = self.beam_options
    beam_options = self._beam_options
    return (pvalue
            | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
                                          beam_options['instance_id'],
                                          beam_options['table_id'])))


class _BigtableReadFn(beam.DoFn):
  """ Creates the connector that can read rows for Beam pipeline

  Args:
    project_id(str): GCP Project ID
    instance_id(str): GCP Instance ID
    table_id(str): GCP Table ID

  """

  def __init__(self, project_id, instance_id, table_id, filter_=b''):
Contributor: filter_ should probably default to None, since that is the default for read_rows. If you pass in an empty bytestring like you do here, you receive an error because it does not have a to_pb function as expected of RowFilter-type objects.

Contributor: +1
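
A minimal sketch of the change being suggested, assuming the constructor shape shown above (the body simply mirrors the existing _initialize call):

  # Sketch only: default filter_ to None so read_rows() is later called with
  # no filter instead of the empty bytestring b''.
  def __init__(self, project_id, instance_id, table_id, filter_=None):
    super(self.__class__, self).__init__()
    self._initialize({'project_id': project_id,
                      'instance_id': instance_id,
                      'table_id': table_id,
                      'filter_': filter_})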

""" Constructor of the Read connector of Bigtable
mf2199 marked this conversation as resolved.
Show resolved Hide resolved

Args:
project_id: [str] GCP Project of to write the Rows
instance_id: [str] GCP Instance to write the Rows
table_id: [str] GCP Table to write the `DirectRows`
filter_: [RowFilter] Filter to apply to columns in a row.
"""
super(self.__class__, self).__init__()
self._initialize({'project_id': project_id,
'instance_id': instance_id,
'table_id': table_id,
'filter_': filter_})

  def __getstate__(self):
    return self._beam_options

  def __setstate__(self, options):
    self._initialize(options)

  def _initialize(self, options):
    self._beam_options = options
    self.table = None
    self.sample_row_keys = None
    self.row_count = Metrics.counter(self.__class__.__name__, 'Rows read')

  def start_bundle(self):
    if self.table is None:
      self.table = Client(project=self._beam_options['project_id'])\
Contributor: nit: PEP 8 recommends using parentheses for line continuation instead of backslashes.

        .instance(self._beam_options['instance_id'])\
        .table(self._beam_options['table_id'])
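
A small sketch of the parenthesized alternative the comment above suggests; the helper name is hypothetical and it assumes the module-level Client import shown earlier:

def _make_table(options):
  # Same chained construction as start_bundle above, but using parentheses
  # for line continuation, as PEP 8 recommends, instead of backslashes.
  return (Client(project=options['project_id'])
          .instance(options['instance_id'])
          .table(options['table_id']))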

  def process(self, element, **kwargs):
    for row in self.table.read_rows(start_key=element.start_position,
                                    end_key=element.end_position,
                                    filter_=self._beam_options['filter_']):
      self.row_count.inc()
      yield row

  def display_data(self):
    return {'projectId': DisplayDataItem(self._beam_options['project_id'],
                                         label='Bigtable Project Id'),
            'instanceId': DisplayDataItem(self._beam_options['instance_id'],
                                          label='Bigtable Instance Id'),
            'tableId': DisplayDataItem(self._beam_options['table_id'],
                                       label='Bigtable Table Id'),
            'filter_': DisplayDataItem(str(self._beam_options['filter_']),
                                       label='Bigtable Filter')
           }


class ReadFromBigTable(beam.PTransform):
  def __init__(self, project_id, instance_id, table_id, filter_=b''):
    """ The PTransform to access the Bigtable Read connector

    Args:
      project_id: [str] GCP Project of to read the Rows
      instance_id): [str] GCP Instance to read the Rows
      table_id): [str] GCP Table to read the Rows
      filter_: [RowFilter] Filter to apply to columns in a row.
    """
    super(self.__class__, self).__init__()
    self._beam_options = {'project_id': project_id,
                          'instance_id': instance_id,
                          'table_id': table_id,
                          'filter_': filter_}

  def __getstate__(self):
    return self._beam_options

  def __setstate__(self, options):
    self._beam_options = options

  def expand(self, pbegin):
    from apache_beam.transforms import util

    beam_options = self._beam_options
    table = Client(project=beam_options['project_id'])\
        .instance(beam_options['instance_id'])\
        .table(beam_options['table_id'])
    sample_row_keys = list(table.sample_row_keys())

    if len(sample_row_keys) > 1 and sample_row_keys[0].row_key != b'':
      SampleRowKey = namedtuple("SampleRowKey", "row_key offset_bytes")
      first_key = SampleRowKey(b'', 0)
      sample_row_keys.insert(0, first_key)
      sample_row_keys = list(sample_row_keys)

    def split_source(unused_impulse):
Contributor: This method doesn't need to exist. I would think that you could use bundles.

Author (@mf2199, Jul 2, 2019): This is the way it's implemented in Apache Beam's iobase.Read(PTransform). The FlatMap needs a callable object to process the elements in parallel, and split_source makes up that callable. I'd also suggest we use a similar naming convention for better unification/readability. What do you think?

Contributor: Got it. Would it make sense to call this _split_source?

Author: I noticed it too. The iobase.Read version doesn't have the underscore. It's a question of whether we prefer the "proper" underscored way or the "unified" non-underscored one.

Member: I had a chat with Cham, and he rightly suggested it would be good to have unit tests for this splitting function, since it could cause data to be dropped if it has any bugs - so maybe make it static, and add tests?

Contributor: +1 on a unit test.
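
A minimal sketch of the kind of unit test being suggested, assuming the splitting logic were factored into a standalone helper; the helper name and shape below are hypothetical, not part of this PR:

from collections import namedtuple
import unittest

SampleRowKey = namedtuple('SampleRowKey', 'row_key offset_bytes')


def split_row_key_ranges(sample_row_keys):
  """Turn sampled row keys into contiguous (start_key, end_key, size) ranges."""
  keys = list(sample_row_keys)
  if not keys or keys[0].row_key != b'':
    keys.insert(0, SampleRowKey(b'', 0))
  return [(prev.row_key, cur.row_key, cur.offset_bytes - prev.offset_bytes)
          for prev, cur in zip(keys, keys[1:])]


class SplitRowKeyRangesTest(unittest.TestCase):
  def test_ranges_are_contiguous_and_start_at_table_beginning(self):
    samples = [SampleRowKey(b'key1', 805306368),
               SampleRowKey(b'key2', 1610612736)]
    ranges = split_row_key_ranges(samples)
    self.assertEqual(ranges[0][0], b'')      # covers the start of the table
    self.assertEqual(ranges[-1][1], b'key2')
    for (_, end, _), (start, _, _) in zip(ranges, ranges[1:]):
      self.assertEqual(end, start)           # no gaps between ranges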

      bundles = []
      for i in range(1, len(sample_row_keys)):
Contributor: In the case that sample_row_keys is of length 1, split_source will return an empty list and no data will be processed from the table. In addition, the documentation for table says "The table might have contents before the first row key in the list and after the last one", which I don't think is taken into consideration here: https://googleapis.dev/python/bigtable/latest/table.html

What you may want to do is only do the Reshuffle + FlatMap if sample_row_keys is > 1. That worked for me.

Contributor (@sduskis, Sep 24, 2019): ["", "") means read the entire table. "" is used as a placeholder.

Contributor: split_source must return shuffle([["", <key1>), [[<key1>, <key2>), ... [<keyN>, ""))

Contributor: If sample_row_keys() on line 240 returns a list of length one, then line 250 (in split_source) will be skipped (as it is trying to iterate over range(1, 1)) and shuffle will be called as shuffle([]) (which my Python shell returns as None).

Maybe line 242 should read as if len(sample_row_keys) >= 1 and sample_row_keys[0].row_key != b''

Contributor: Also, I don't see a place where [<keyN>, ""] is appended to sample_row_keys.
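
A hypothetical sketch of the normalization these comments are asking for: prepend b'' as the start sentinel and append b'' as the end sentinel so the generated ranges cover the whole table. The helper name and the trailing_bytes guess are assumptions, not from the PR:

from collections import namedtuple

SampleRowKey = namedtuple('SampleRowKey', 'row_key offset_bytes')


def normalize_sample_row_keys(sample_row_keys, trailing_bytes=0):
  # b'' as a start key means "from the beginning of the table";
  # b'' as an end key means "to the end of the table".
  keys = list(sample_row_keys)
  if not keys or keys[0].row_key != b'':
    keys.insert(0, SampleRowKey(b'', 0))
  if keys[-1].row_key != b'':
    keys.append(SampleRowKey(b'', keys[-1].offset_bytes + trailing_bytes))
  return keys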

Contributor: This might have been implemented incorrectly. sample_row_keys is defined in the outer scope, but split_source is used as input to a FlatMap transform (hence split_source will become a function wrapping a DoFn). Instead of using FlatMap here, implement a DoFn class and provide whatever state it needs to perform the splitting to the constructor of the DoFn. Also note that this class (with its state) has to be picklable.

One difference between the Direct and Dataflow runners is that the Dataflow runner pickles DoFn objects to send them to remote workers. That might explain why your code doesn't work on Dataflow.

Author: @chamikaramj It might, but then how could it have worked a few months back? The code below works now, with Dataflow, albeit from a standalone file, meaning that pickling is probably not an issue:

... [imports and definitions] ...

class _BigTableReadFn(beam.DoFn):
	def __init__(self, project_id, instance_id, table_id, filter_=None):
		super(self.__class__, self).__init__()
		self._options = {'project_id': project_id, 'instance_id': instance_id, 'table_id': table_id, 'filter_': filter_}
		self._initialize()

	def _initialize(self):
		from apache_beam.metrics import Metrics
		self._table = None
		self._counter = Metrics.counter(self.__class__, 'Rows Read')

	def __getstate__(self):
		return self._options

	def __setstate__(self, options):
		self._initialize()
		self._options = options

	def start_bundle(self):
		from google.cloud.bigtable import Client
		if self._table is None:
			_client = Client(self._options['project_id'])
			_instance = _client.instance(self._options['instance_id'])
			self._table = _instance.table(self._options['table_id'])

	def process(self, source_bundle):
		_start_key = source_bundle.start_position
		_end_key = source_bundle.stop_position
		for row in self._table.read_rows(_start_key, _end_key):
			self._counter.inc()
			yield row

	def finish_bundle(self):
		pass

	def display_data(self):
		return {'projectId': DisplayDataItem(self._options['project_id'], label='Bigtable Project Id'),
				'instanceId': DisplayDataItem(self._options['instance_id'], label='Bigtable Instance Id'),
				'tableId': DisplayDataItem(self._options['table_id'], label='Bigtable Table Id')}

table = Client(PROJECT_ID, admin=True).instance(INSTANCE_ID).table(TABLE_ID)
keys = table.sample_row_keys()
key_list = list(keys)

SampleRowKey = namedtuple("SampleRowKey", "row_key offset_bytes")
key_list.insert(0, SampleRowKey(b'', 0))

def bundles():
	for i in range(1, len(key_list)):
		key_1 = key_list[i - 1].row_key
		key_2 = key_list[i].row_key
		size = key_list[i].offset_bytes - key_list[i - 1].offset_bytes
		yield iobase.SourceBundle(size, None, key_1, key_2)

p = beam.Pipeline(options=p_options)
count = (p
		 | 'Bundles' >> beam.Create(bundles())
		 | 'Read' >> beam.ParDo(_BigTableReadFn(PROJECT_ID, INSTANCE_ID, TABLE_ID))
		 | 'Count' >> Count.Globally()
		 )

result = p.run()
result.wait_until_finish()

 ... [assertions] ...

When packaged, it becomes more like this:

class _BigTableReadFn(beam.DoFn):
  def __init__(self, project_id, instance_id, table_id, filter_=None):
    super(_BigTableReadFn, self).__init__()
    self._initialize({'project_id': project_id, 'instance_id': instance_id, 'table_id': table_id, 'filter_': filter_})

  def _initialize(self, options):
    self._options = options
    self._table = None
    self._counter = Metrics.counter(self.__class__, 'Rows Read')

  def __getstate__(self):
    return self._options

  def __setstate__(self, options):
    self._initialize(options)

  def start_bundle(self):
    if self._table is None:
      _client = Client(project=self._options['project_id'])
      _instance = _client.instance(self._options['instance_id'])
      # noinspection PyAttributeOutsideInit
      self._table = _instance.table(self._options['table_id'])

  def process(self, source_bundle):
    _start_key = source_bundle.start_position
    _end_key = source_bundle.stop_position
    for row in self._table.read_rows(_start_key, _end_key):
      self._counter.inc()
      yield row

  def display_data(self):
    return {'projectId': DisplayDataItem(self._options['project_id'], label='Bigtable Project Id'),
            'instanceId': DisplayDataItem(self._options['instance_id'], label='Bigtable Instance Id'),
            'tableId': DisplayDataItem(self._options['table_id'], label='Bigtable Table Id')}


class ReadFromBigTable(beam.PTransform):
  def __init__(self, project_id, instance_id, table_id, filter_=None):
    super(self.__class__, self).__init__()
    self._options = {'project_id': project_id,
                     'instance_id': instance_id,
                     'table_id': table_id,
                     'filter_': filter_}

  def expand(self, pbegin):
    table = Client(project=self._options['project_id'], admin=True) \
      .instance(instance_id=self._options['instance_id']) \
      .table(table_id=self._options['table_id'])

    keys = list(table.sample_row_keys())
    SampleRowKey = namedtuple("SampleRowKey", "row_key offset_bytes")
    keys.insert(0, SampleRowKey(b'', 0))

    def bundles():
      for i in range(1, len(keys)):
        key_1 = keys[i - 1].row_key
        key_2 = keys[i].row_key
        size = keys[i].offset_bytes - keys[i - 1].offset_bytes
        yield iobase.SourceBundle(size, None, key_1, key_2)

    return (pbegin
            | 'Bundles' >> beam.Create(iter(bundles()))
            | 'Reshuffle' >> util.Reshuffle()
            | 'Read' >> beam.ParDo(_BigtableReadFn(self._options['project_id'], self._options['instance_id'], self._options['table_id']))
            )

The latter breaks with Dataflow while still running under Direct. As you can see, the logic is nearly identical, suggesting that some magic might happen during [un]packaging.

Contributor: What do you mean by "when packaged"?

Note that if you are trying to utilize modules that are not included with the SDK, you have to build a local tarball before testing:

python setup.py sdist

When running, include the SDK tarball with your module using the --sdk_location option (since the default tarball downloaded by Dataflow will not have your module). For example, for wordcount:

python -m apache_beam.examples.wordcount --input <input file path in GCS> --output <output path in GCS> --runner DataflowRunner --temp_location <temp location in GCS> --project <project> --sdk_location <local path to beam>/beam/sdks/python/dist/apache-beam-2.17.0.dev0.tar.gz

Author: "When packaged" means packed into a local tarball, precisely via python setup.py sdist.

So far we've been using the extra_package option, and it seems to work fine for the write part; at some point it worked for the read part too. The main functional difference between the write and read tests is that for the write part the rows are generated locally [within the test script] and then sent to workers without intermediate steps, while during the read sequence we first try splitting the database based on the sample row keys within the Dataflow engine, and then read the chunks of rows, as you may see from the sample script above.

But I don't disagree, we could try specifying the --sdk_location option and see what happens.

        key_1 = sample_row_keys[i - 1].row_key
        key_2 = sample_row_keys[i].row_key
        size = sample_row_keys[i].offset_bytes \
               - sample_row_keys[i - 1].offset_bytes
Contributor: Will this work with variable key sizes?

Author: The sample_row_keys offsets are predefined according to the Bigtable internal storage format (~800 MB apart) and hence cannot be changed without changing the Bigtable core. On the other hand, the exact number may slightly differ from key pair to key pair and is almost guaranteed to be different for the last key pair, i.e. the end of the table. So, answering your question: yes, it most certainly will, and it has been shown to work through multiple tests with different table sizes by returning the proper count of the rows read.

Contributor: To @mf2199's point, sample row keys is a Cloud Bigtable core concept and is intended to be used this way. See https://github.com/googleapis/googleapis/blob/3c04bbedf1a1913507eca6dcb7ef9a1d849998f4/google/bigtable/v1/bigtable_service.proto#L47 for more detail.

        bundles.append(iobase.SourceBundle(size, None, key_1, key_2))

      from random import shuffle
      # Shuffle randomizes reading locations for better performance
      shuffle(bundles)
      return bundles

    return (pbegin
            | core.Impulse()
            | 'Split' >> core.FlatMap(split_source)
            | util.Reshuffle()
            | 'Read Bundles' >> beam.ParDo(
                _BigtableReadFn(project_id=beam_options['project_id'],
                                instance_id=beam_options['instance_id'],
                                table_id=beam_options['table_id'],
                                filter_=beam_options['filter_'])))
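
For context, a hedged usage sketch of how the connector proposed in this PR might be wired into a pipeline; the project, instance, and table IDs are placeholders, and the transform signature is the one shown in this diff:

import apache_beam as beam
from apache_beam.io.gcp.bigtableio import ReadFromBigTable
from apache_beam.transforms.combiners import Count

# Read all rows of a table and count them. PipelineOptions (runner, project,
# temp_location, ...) are omitted here and would be supplied as usual.
with beam.Pipeline() as p:
  rows = p | 'Read' >> ReadFromBigTable(project_id='my-project',
                                        instance_id='my-instance',
                                        table_id='my-table')
  _ = rows | 'Count' >> Count.Globally()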