
[BEAM-2405] Write to BQ using the streaming API #3288

Closed
sb2nov wants to merge 2 commits into apache:master from sb2nov:BEAM-BQ-SINK

Conversation

@sb2nov (Contributor) commented Jun 2, 2017

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

  • Make sure the PR title is formatted like:
    [BEAM-<Jira issue #>] Description of pull request
  • Make sure tests pass via mvn clean verify.
  • Replace <Jira issue #> in the title with the actual Jira issue
    number, if there is one.
  • If this contribution is large, please file an Apache
    Individual Contributor License Agreement.

This should be ready for review so that we can test it with streaming pipelines.

There are a few follow-up items after this PR:

  • Integration with the Dataflow runner to work in both Batch and Streaming
  • Migrate the tornadoes example to use this and deprecate the sink interface

@coveralls

Coverage Status

Coverage decreased (-0.01%) to 70.626% when pulling 4792096 on sb2nov:BEAM-BQ-SINK into 9cdae6c on apache:master.

@sb2nov (Contributor, Author) commented Jun 3, 2017

R: @chamikaramj PTAL

@coveralls

Coverage Status

Coverage remained the same at 70.64% when pulling 4533a80 on sb2nov:BEAM-BQ-SINK into 9cdae6c on apache:master.

@chamikaramj (Contributor) left a comment

Thanks.

request = bigquery.BigqueryTablesInsertRequest(
    projectId=project_id, datasetId=dataset_id, table=table)
response = self.client.tables.Insert(request)
logging.info("Created the table with id %s", table_id)
Contributor

logging.debug ?

"Created a table"

Contributor Author

Done


def __init__(self, table_id, dataset_id, project_id, batch_size, schema,
             create_disposition, write_disposition, client):
  self.table_id = table_id
Contributor

Please add a doc comment.

Contributor Author

Done

self._rows_buffer = []
# Transform the table schema into a bigquery.TableSchema instance.
if isinstance(self.schema, basestring):
  # TODO(silviuc): Should add a regex-based validation of the format.
Contributor

Are we still hoping to do this (TODO) ?

Contributor Author

Removed

if isinstance(self.schema, basestring):
  # TODO(silviuc): Should add a regex-based validation of the format.
  table_schema = bigquery.TableSchema()
  schema_list = [s.strip(' ') for s in self.schema.split(',')]
Contributor

s.strip()

Contributor Author

Done
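For context on the `s.strip()` suggestion above: `strip(' ')` removes only literal space characters, while a bare `strip()` removes any leading and trailing whitespace. A generic illustration (not the PR's actual code):

```python
s = '  value:INTEGER\t'
# strip(' ') removes only spaces, so the trailing tab survives:
assert s.strip(' ') == 'value:INTEGER\t'
# strip() with no argument removes all whitespace, as the reviewer suggests:
assert s.strip() == 'value:INTEGER'
```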

field_schema = bigquery.TableFieldSchema()
field_schema.name = field_name
field_schema.type = field_type
field_schema.mode = 'NULLABLE'
Contributor

Do we support other modes ?

Contributor Author

not in the string schema input

    create_disposition=self.create_disposition,
    write_disposition=self.write_disposition,
    client=self.test_client)
return pcoll | 'Write to BQ' >> ParDo(bigquery_write_fn)
Contributor

BigQuery instead of BQ here ?

Contributor Author

Done

client.tables.Get.return_value = bigquery.Table(
    tableReference=bigquery.TableReference(
        projectId='project_id', datasetId='dataset_id', tableId='table_id'))
client.tabledata.InsertAll.return_value = \
Contributor

I believe we usually use () instead of \ for line breaks.

Contributor Author

Done
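An illustration of the two line-continuation styles under discussion, with generic values rather than the PR's actual code; implicit continuation inside parentheses is the form the reviewer prefers:

```python
# Backslash continuation (discouraged):
result = \
    {'kind': 'bigquery#tableDataInsertAllResponse'}

# Implicit continuation inside parentheses (preferred):
result = (
    {'kind': 'bigquery#tableDataInsertAllResponse'})

assert result == {'kind': 'bigquery#tableDataInsertAllResponse'}
```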

self.assertTrue(client.tables.Get.called)
self.assertTrue(client.tables.Insert.called)

def test_dofn_client_process_flush_not_called(self):
Contributor

A better name might be "test_dofn_client_process_performs_batching".

Contributor Author

Done

fn.finish_bundle()
# InsertRows called in finish bundle
self.assertTrue(client.tabledata.InsertAll.called)

Contributor

Also, add a test that writes zero records.

Contributor Author

Done
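The buffer-and-flush behavior these tests exercise can be sketched in plain Python. `BufferedWriteFn`, its attribute names, and the list standing in for `tabledata.InsertAll` calls are all hypothetical simplifications of the PR's DoFn:

```python
class BufferedWriteFn:
  """Hypothetical, simplified stand-in for the PR's write DoFn: buffers
  rows and flushes a batch once the buffer reaches _max_batch_size, with
  a final flush in finish_bundle()."""

  def __init__(self, batch_size=None):
    self._max_batch_size = batch_size or 500  # 500 mirrors the PR's default
    self._rows_buffer = []
    self.flushed_batches = []  # stands in for tabledata.InsertAll calls

  def process(self, row):
    self._rows_buffer.append(row)
    if len(self._rows_buffer) >= self._max_batch_size:
      self._flush_batch()

  def finish_bundle(self):
    # Flush whatever is left; with zero buffered rows, no call is made.
    if self._rows_buffer:
      self._flush_batch()

  def _flush_batch(self):
    self.flushed_batches.append(list(self._rows_buffer))
    self._rows_buffer = []

fn = BufferedWriteFn(batch_size=2)
for i in range(3):
  fn.process({'a': i})
fn.finish_bundle()
# One full batch flushed during process(), the remainder in finish_bundle().
assert [len(b) for b in fn.flushed_batches] == [2, 1]
```

The zero-record case the reviewer asks for falls out of the `if self._rows_buffer` guard: calling `finish_bundle()` with nothing buffered performs no insert.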

schema_list = [s.strip(' ') for s in self.schema.split(',')]
for field_and_type in schema_list:
  field_name, field_type = field_and_type.split(':')
  field_schema = bigquery.TableFieldSchema()
Contributor

Please add tests for schema handling logic here.

Contributor Author

Done
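The schema-handling logic being tested can be sketched as a standalone helper. `parse_string_schema` is a hypothetical name, and plain tuples stand in for the `bigquery.TableFieldSchema` objects the real code builds:

```python
def parse_string_schema(schema):
  """Turns a 'name:TYPE,name:TYPE' string into (name, type, mode) tuples,
  mirroring the parsing loop in the snippet above."""
  fields = []
  for field_and_type in schema.split(','):
    field_name, field_type = field_and_type.strip().split(':')
    # String schemas only support the NULLABLE mode.
    fields.append((field_name, field_type, 'NULLABLE'))
  return fields

assert parse_string_schema('name:STRING, value:INTEGER') == [
    ('name', 'STRING', 'NULLABLE'), ('value', 'INTEGER', 'NULLABLE')]
```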

@sb2nov (Contributor, Author) left a comment

Thanks for the review.

request = bigquery.BigqueryTablesInsertRequest(
    projectId=project_id, datasetId=dataset_id, table=table)
response = self.client.tables.Insert(request)
logging.info("Created the table with id %s", table_id)
Contributor Author

Done

self.create_disposition = create_disposition
self.write_disposition = write_disposition
self._rows_buffer = []
self._max_batch_size = batch_size or 500
Contributor Author

This is based on what the Java SDK had.

self._rows_buffer = []
# Transform the table schema into a bigquery.TableSchema instance.
if isinstance(self.schema, basestring):
  # TODO(silviuc): Should add a regex-based validation of the format.
Contributor Author

Removed

if isinstance(self.schema, basestring):
  # TODO(silviuc): Should add a regex-based validation of the format.
  table_schema = bigquery.TableSchema()
  schema_list = [s.strip(' ') for s in self.schema.split(',')]
Contributor Author

I would have liked to deprecate this string-schema format and ask everyone to create a table schema object, but that is a larger change.

field_schema = bigquery.TableFieldSchema()
field_schema.name = field_name
field_schema.type = field_type
field_schema.mode = 'NULLABLE'
Contributor Author

not in the string schema input

fn.finish_bundle()
# InsertRows called in finish bundle
self.assertTrue(client.tabledata.InsertAll.called)

Contributor Author

Done


def __init__(self, table_id, dataset_id, project_id, batch_size, schema,
             create_disposition, write_disposition, client):
  self.table_id = table_id
Contributor Author

Done

if isinstance(self.schema, basestring):
  # TODO(silviuc): Should add a regex-based validation of the format.
  table_schema = bigquery.TableSchema()
  schema_list = [s.strip(' ') for s in self.schema.split(',')]
Contributor Author

Done

schema_list = [s.strip(' ') for s in self.schema.split(',')]
for field_and_type in schema_list:
  field_name, field_type = field_and_type.split(':')
  field_schema = bigquery.TableFieldSchema()
Contributor Author

Done

class WriteToBigQuery(PTransform):

  def __init__(self, table, dataset=None, project=None, schema=None,
               create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
Contributor Author

Done

@coveralls

Coverage Status

Coverage decreased (-0.004%) to 70.63% when pulling 85ffcb2 on sb2nov:BEAM-BQ-SINK into a054550 on apache:master.

@chamikaramj (Contributor)

LGTM. Thanks.

@asfgit asfgit closed this in fdfd775 Jun 6, 2017