
Commit 254913c
Code transfer from apache#8457
mf2199 committed May 8, 2020
1 parent 956f2d7 commit 254913c
Showing 3 changed files with 376 additions and 70 deletions.
154 changes: 84 additions & 70 deletions sdks/python/apache_beam/io/gcp/bigtableio.py
@@ -15,30 +15,36 @@
# limitations under the License.
#

"""BigTable connector
This module implements writing to BigTable tables.
The default mode is to set row data to write to BigTable tables.
The syntax supported is described here:
https://cloud.google.com/bigtable/docs/quickstart-cbt
BigTable connector can be used as main outputs. A main output
(common case) is expected to be massive and will be split into
manageable chunks and processed in parallel. In the example below
we created a list of rows then passed to the GeneratedDirectRows
DoFn to set the Cells and then we call the BigTableWriteFn to insert
those generated rows in the table.
main_table = (p
| beam.Create(self._generate())
| WriteToBigTable(project_id,
instance_id,
table_id))
""" This module implements IO classes to read and write data on Cloud Bigtable.
Read from Bigtable
------------------
:class:`ReadFromBigtable` is a ``PTransform`` that reads from a configured
Bigtable source and returns a ``PCollection`` of Bigtable rows. To configure
Bigtable source, the project, instance, and table IDs need to be provided.
Example usage::
pipeline | ReadFromBigtable(project_id='my-project-id',
instance_id='my-instance',
table_id='my-table')
Write to Bigtable
-----------------
:class:`WriteToBigtable` is a ``PTransform`` that writes Bigtable rows to
configured sink, and the write is conducted through a series of Bigtable
mutations. If the rows already existed in the Bigtable table, it results in
an overwrite, otherwise new rows will be inserted.
Example usage::
pipeline | WriteToBigtable(project_id='my-ptoject',
instance_id='my-instance',
table_id='my-table')
There are no backward compatibility guarantees.
Everything in this module is experimental.
"""
# pytype: skip-file

from __future__ import absolute_import

import apache_beam as beam
-from apache_beam.io import iobase
from apache_beam.metrics import Metrics
from apache_beam.transforms import util
from apache_beam.transforms.display import DisplayDataItem
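
As an illustration of the read path described in the new module docstring, here is a minimal pipeline sketch. It is a hedged example, not part of the commit: it assumes the module is importable as apache_beam.io.gcp.bigtableio, and the project, instance, and table IDs are placeholders.

  import apache_beam as beam
  from apache_beam.io.gcp.bigtableio import ReadFromBigtable

  # Count all rows of an existing table; every ID below is a placeholder.
  with beam.Pipeline() as pipeline:
    _ = (
        pipeline
        | 'Read' >> ReadFromBigtable(project_id='my-project',
                                     instance_id='my-instance',
                                     table_id='my-table')
        | 'Count' >> beam.combiners.Count.Globally()
        | 'Print' >> beam.Map(print))
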
@@ -49,27 +55,27 @@
except ImportError:
pass

-__all__ = ['WriteToBigTable', 'ReadFromBigtable']
+__all__ = ['WriteToBigtable', 'ReadFromBigtable']


class _BigtableReadFn(beam.DoFn):
-  def __init__(self, project_id, instance_id, table_id, filter_=None):
-    """ A DoFn to parallelize reading from a Bigtable table
-    :type project_id: str
-    :param project_id: The ID of the project used for Bigtable access
-    :type instance_id: str
-    :param instance_id: The ID of the instance that owns the table.
-    :type table_id: str
-    :param table_id: The ID of the table.
-    :type filter_: :class:`.RowFilter`
-    :param filter_: (Optional) The filter to apply to the contents of the
-                    specified row(s). If unset, reads every column in
-                    each row.
-    """
+  """ A DoFn to parallelize reading from a Bigtable table
+  :type project_id: str
+  :param project_id: The ID of the project used for Bigtable access
+  :type instance_id: str
+  :param instance_id: The ID of the instance that owns the table.
+  :type table_id: str
+  :param table_id: The ID of the table.
+  :type filter_: :class:`.RowFilter`
+  :param filter_: (Optional) The filter to apply to the contents of the
+                  specified row(s). If unset, reads every column in
+                  each row.
+  """
+  def __init__(self, project_id, instance_id, table_id, filter_=None):
    super(self.__class__, self).__init__()
    self._initialize({'project_id': project_id,
                      'instance_id': instance_id,
@@ -99,10 +105,8 @@ def start_bundle(self):
.instance(self._options['instance_id'])\
.table(self._options['table_id'])

-  def process(self, source_bundle):
-    _start_key = source_bundle.start_position
-    _end_key = source_bundle.stop_position
-    for row in self._table.read_rows(_start_key, _end_key):
+  def process(self, key_pair):
+    for row in self._table.read_rows(key_pair[0], key_pair[1]):
self._counter.inc()
yield row

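The reworked process method now takes a [start_key, end_key) pair produced by the transform's expand and passes it straight to the client's read_rows. Below is a standalone sketch of what one bundle does, using the google-cloud-bigtable client directly; the IDs, keys, and the optional filter are illustrative assumptions, not values from the commit.

  from google.cloud.bigtable import Client
  from google.cloud.bigtable.row_filters import CellsColumnLimitFilter

  table = Client(project='my-project', admin=True) \
      .instance('my-instance') \
      .table('my-table')

  # One sampled key pair; b'' denotes the start of the table.
  start_key, end_key = b'', b'user#500'

  # Keep only the latest cell per column, mirroring the optional filter_
  # argument that _BigtableReadFn accepts in its constructor.
  for row in table.read_rows(start_key, end_key,
                             filter_=CellsColumnLimitFilter(1)):
    print(row.row_key)
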
@@ -134,6 +138,7 @@ def __init__(self, project_id, instance_id, table_id, filter_=None):
                  each row. If none is provided, all rows are read by default.
"""
super(self.__class__, self).__init__()
+    self._keys = []
self._options = {'project_id': project_id,
'instance_id': instance_id,
'table_id': table_id,
@@ -145,25 +150,28 @@ def __getstate__(self):
def __setstate__(self, options):
self._options = options

+  def _chunks(self):
+    for i in range(1, len(self._keys)):
+      start_key = self._keys[i - 1].row_key
+      end_key = self._keys[i].row_key
+      yield [start_key, end_key]

def expand(self, pbegin):
table = Client(project=self._options['project_id'], admin=True) \
.instance(instance_id=self._options['instance_id']) \
.table(table_id=self._options['table_id'])

-    keys = list(table.sample_row_keys())
+    self._keys = list(table.sample_row_keys())
+    if len(self._keys) < 1:
+      raise ValueError('The list of Table.sample_row_keys is empty. A Bigtable'
+                       ' table must have at least one valid sample row key.')

# Creating sample row key to define starting read position of '0th' chunk
SampleRowKey = namedtuple("SampleRowKey", "row_key offset_bytes")
-    keys.insert(0, SampleRowKey(b'', 0))
-
-    def chunks():
-      for i in range(1, len(keys)):
-        key_1 = keys[i - 1].row_key
-        key_2 = keys[i].row_key
-        size = keys[i].offset_bytes - keys[i - 1].offset_bytes
-        yield iobase.SourceBundle(size, None, key_1, key_2)
+    self._keys.insert(0, SampleRowKey(b'', 0))

return (pbegin
-            | 'Bundles' >> beam.Create(iter(chunks()))
+            | 'Bundles' >> beam.Create(iter(self._chunks()))
| 'Reshuffle' >> util.Reshuffle()
| 'Read' >> beam.ParDo(_BigtableReadFn(self._options['project_id'],
self._options['instance_id'],
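
The rewritten expand drops iobase.SourceBundle in favor of plain [start_key, end_key] pairs derived from Table.sample_row_keys(), prepending a zero-offset sentinel so the first chunk starts at the beginning of the table. A self-contained sketch of that pairing logic, with made-up sample keys:

  from collections import namedtuple

  SampleRowKey = namedtuple('SampleRowKey', 'row_key offset_bytes')

  # Hypothetical samples, shaped like Table.sample_row_keys() results.
  keys = [SampleRowKey(b'k40', 64 << 20), SampleRowKey(b'k80', 128 << 20)]
  keys.insert(0, SampleRowKey(b'', 0))  # sentinel for the '0th' chunk

  pairs = [[keys[i - 1].row_key, keys[i].row_key] for i in range(1, len(keys))]
  assert pairs == [[b'', b'k40'], [b'k40', b'k80']]
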
@@ -180,7 +188,6 @@ class _BigTableWriteFn(beam.DoFn):
table_id(str): GCP Table ID
"""

def __init__(self, project_id, instance_id, table_id):
""" Constructor of the Write connector of Bigtable
Args:
@@ -189,9 +196,11 @@ def __init__(self, project_id, instance_id, table_id):
table_id(str): GCP Table to write the `DirectRows`
"""
super(_BigTableWriteFn, self).__init__()
-    self.beam_options = {'project_id': project_id,
-                         'instance_id': instance_id,
-                         'table_id': table_id}
+    self.beam_options = {
+        'project_id': project_id,
+        'instance_id': instance_id,
+        'table_id': table_id
+    }
self.table = None
self.batcher = None
self.written = Metrics.counter(self.__class__, 'Written Row')
@@ -229,37 +238,42 @@ def finish_bundle(self):
self.batcher = None

def display_data(self):
-    return {'projectId': DisplayDataItem(self.beam_options['project_id'],
-                                         label='Bigtable Project Id'),
-            'instanceId': DisplayDataItem(self.beam_options['instance_id'],
-                                          label='Bigtable Instance Id'),
-            'tableId': DisplayDataItem(self.beam_options['table_id'],
-                                       label='Bigtable Table Id')
-            }
+    return {
+        'projectId': DisplayDataItem(
+            self.beam_options['project_id'], label='Bigtable Project Id'),
+        'instanceId': DisplayDataItem(
+            self.beam_options['instance_id'], label='Bigtable Instance Id'),
+        'tableId': DisplayDataItem(
+            self.beam_options['table_id'], label='Bigtable Table Id')
+    }


-class WriteToBigTable(beam.PTransform):
+class WriteToBigtable(beam.PTransform):
""" A transform to write to the Bigtable Table.
  A PTransform that writes a list of `DirectRow` into the Bigtable Table
"""
-  def __init__(self, project_id=None, instance_id=None,
-               table_id=None):
+  def __init__(self, project_id=None, instance_id=None, table_id=None):
""" The PTransform to access the Bigtable Write connector
Args:
      project_id(str): GCP Project to write the Rows
instance_id(str): GCP Instance to write the Rows
table_id(str): GCP Table to write the `DirectRows`
"""
-    super(WriteToBigTable, self).__init__()
-    self.beam_options = {'project_id': project_id,
-                         'instance_id': instance_id,
-                         'table_id': table_id}
+    super(WriteToBigtable, self).__init__()
+    self.beam_options = {
+        'project_id': project_id,
+        'instance_id': instance_id,
+        'table_id': table_id
+    }

def expand(self, pvalue):
beam_options = self.beam_options
-    return (pvalue
-            | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
-                                          beam_options['instance_id'],
-                                          beam_options['table_id'])))
+    return (
+        pvalue
+        | beam.ParDo(
+            _BigTableWriteFn(
+                beam_options['project_id'],
+                beam_options['instance_id'],
+                beam_options['table_id'])))

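To round out the write path, a hedged usage sketch for the renamed WriteToBigtable transform. It assumes the google-cloud-bigtable client, an existing table with a 'cf1' column family, and placeholder IDs throughout:

  import datetime

  import apache_beam as beam
  from google.cloud.bigtable.row import DirectRow

  from apache_beam.io.gcp.bigtableio import WriteToBigtable

  def make_row(i):
    # Each element becomes one DirectRow mutation; 'cf1' is an assumed,
    # pre-existing column family.
    row = DirectRow(row_key=b'key-%05d' % i)
    row.set_cell('cf1', b'field1', b'value-%d' % i,
                 timestamp=datetime.datetime.utcnow())
    return row

  with beam.Pipeline() as pipeline:
    _ = (
        pipeline
        | beam.Create(range(100))
        | beam.Map(make_row)
        | WriteToBigtable(project_id='my-project',
                          instance_id='my-instance',
                          table_id='my-table'))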