
[BEAM-6553] A Python SDK sink that supports File Loads into BQ #7655

Merged: 11 commits into apache:master on Feb 17, 2019

Conversation

@pabloem (Member) commented Jan 28, 2019

This PR implements file loads into BQ from the Python SDK for Batch pipelines.

r: @chamikaramj

Open question

  • Need to improve documentation significantly before merging.

Features being implemented

  • Use of temporary tables for atomicity in case a subset of load jobs fails
  • Support for multiple destinations on streaming - left for a later PR.
  • File Loads transform has not been wired up to WriteToBigQuery (a usage sketch of applying it directly follows this list).
  • Support for time partitioning - left for a later PR.
  • Support for multiple destinations
  • Known issue: if a worker dies and the same load job is triggered again, it will fail because it will have the same name as before.
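
Since the transform is not yet reachable through WriteToBigQuery, a minimal sketch of applying it directly; the module path matches this PR, but the table, bucket, and parameter names should be treated as illustrative:

import apache_beam as beam
from apache_beam.io.gcp import bigquery_file_loads

with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create([{'name': 'beam', 'stars': 42}])
      | bigquery_file_loads.BigQueryBatchFileLoads(
          destination='my-project:my_dataset.my_table',  # hypothetical table
          schema='name:STRING,stars:INTEGER',
          custom_gcs_temp_location='gs://my-bucket/tmp'))  # hypothetical bucket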

Test results

@pabloem (Member, Author) commented Jan 29, 2019

Run Python Dataflow ValidatesRunner

1 similar comment
@pabloem (Member, Author) commented Jan 29, 2019

Run Python Dataflow ValidatesRunner

@pabloem (Member, Author) commented Jan 30, 2019

Run Python PostCommit

@chamikaramj (Contributor) commented:

cc: @reuvenlax

@pabloem changed the title from "A Python SDK sink that supports File Loads into BQ" to "[BEAM-6553] A Python SDK sink that supports File Loads into BQ" on Jan 30, 2019
@pabloem (Member, Author) commented Jan 31, 2019

Run Python PostCommit

@pabloem (Member, Author) commented Jan 31, 2019

Run Python PreCommit

@pabloem (Member, Author) commented Jan 31, 2019

Run Python PostCommit

1 similar comment
@pabloem (Member, Author) commented Feb 2, 2019

Run Python PostCommit

@pabloem (Member, Author) commented Feb 5, 2019

Fixed lint issue. Ready for review.

@chamikaramj (Contributor) left a comment:

Thanks Pablo. This looks great.

@chamikaramj (Contributor) commented:

Sorry approved by mistake. Just wanted to send comments.

@chamikaramj (Contributor) left a comment:

(resending as a change request)

@pabloem (Member, Author) commented Feb 7, 2019

@chamikaramj I've addressed comments. LMK if that is reasonable. Overview of changes:

  • Improved documentation
  • Job names are deterministic, so a re-insertion of the same job should fail (a sketch of the naming idea follows below).
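
For illustration, a minimal sketch of the deterministic-naming idea, assuming the name is derived only from stable inputs; this is not the PR's exact _bq_uuid code:

import hashlib

def deterministic_job_name(job_name_prefix, destination):
  # destination is e.g. 'project:dataset.table' (hypothetical format); a
  # retried bundle recomputes the same name, so BigQuery rejects the
  # duplicate job instead of loading the data twice.
  seed = '%s:%s' % (job_name_prefix, destination)
  return 'beam_load_%s' % hashlib.md5(seed.encode('utf-8')).hexdigest()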

job_references = [elm[1] for elm in dest_ids_list]

while True:
  status = self._check_job_states(job_references)

equal_to([job_reference]), label='CheckJobs')


@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
Member Author:

@chamikaramj there are three integration tests added here. They test different kinds of functionality. They can be reduced to two, but I'd say two is the minimum.

Member Author:

Removed one IT. Two more are left with a number of unit tests.

@pabloem (Member, Author) commented Feb 8, 2019

Run Python PostCommit

@chamikaramj (Contributor) left a comment:

Thanks!

_MAXIMUM_LOAD_SIZE = 15 * ONE_TERABYTE

# BigQuery only supports up to 10 thousand URIs for a single load job.
_MAXIMUM_SOURCE_URIS = 10 * 1000
Contributor: Where is this used?



def _generate_file_prefix(pipeline_gcs_location):
  # If a gcs location is provided to the pipeline, then we shall use that.
Contributor: nit: GCS

Member Author: Hm? GCS is what it says : )


copy_job_name = '%s_copy_%s_to_%s' % (
    job_name_prefix,
    _bq_uuid('%s:%s.%s' % (copy_from_reference.projectId,
Contributor:

Will we end up creating the same UUID for the same destination table ? Will this result in jobs being rejected when we try to execute multiple jobs against the same destination table ? It'll be interesting to see what Java SDK currently does in this case.

Member Author:

There is now an index for jobs targeting the same table, and the ID is not regenerated (a sketch of the idea follows below).
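
A minimal sketch of that per-destination index, with hypothetical names; the PR itself hashes the table references via _bq_uuid, so this illustrates only the counter:

import collections

class CopyJobNamer(object):
  """Hypothetical helper showing one job-name index per destination table."""

  def __init__(self, job_name_prefix):
    self._prefix = job_name_prefix
    # Two copy jobs into the same table get distinct but still
    # deterministic names; each index is computed once and kept.
    self._indices = collections.defaultdict(int)

  def next_name(self, source_id, destination_id):
    index = self._indices[destination_id]
    self._indices[destination_id] += 1
    return '%s_copy_%s_to_%s_%d' % (
        self._prefix, source_id, destination_id, index)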

    _bq_uuid('%s:%s.%s' % (copy_from_reference.projectId,
                           copy_from_reference.datasetId,
                           copy_from_reference.tableId)),
    _bq_uuid('%s:%s.%s' % (copy_to_reference.projectId,
Contributor:

We should check if the job already exists in BQ and use that if the job exists (it's important to make sure that the BQ ID is unique for this Beam/Dataflow job instance)

Member Author:

The id is generated once, and not recomputed, so it's always the same for a single pipeline execution. We've resolved this in person.

table_reference.tableId = job_name
yield pvalue.TaggedOutput(TriggerLoadJobs.TEMP_TABLES, table_reference)

job_reference = self.bq_wrapper.perform_load_job(
Contributor:

We should also check if the job exists before creating it, to be resilient to workitem/bundle failures (a sketch of this get-or-create pattern follows below).
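
A sketch of that get-or-create pattern, written against the standalone google-cloud-bigquery client rather than Beam's internal BigQueryWrapper; the function and argument names here are illustrative:

from google.api_core.exceptions import NotFound
from google.cloud import bigquery

def get_or_create_load_job(client, job_id, uris, destination, job_config):
  """Reuses a load job that a previous (failed) bundle already started."""
  try:
    return client.get_job(job_id)
  except NotFound:
    return client.load_table_from_uri(
        uris, destination, job_id=job_id, job_config=job_config)

# client = bigquery.Client(project='my-project')  # hypothetical project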


@retry.with_exponential_backoff(
    num_retries=MAX_RETRIES,
    retry_filter=retry.retry_on_server_errors_and_timeout_filter)
Contributor:

We should be careful about timeout filters. Timeout does not give any guarantees regarding whether the job was received by the BQ service or not.
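
For illustration, the same decorator using the filter that excludes timeouts (apache_beam.utils.retry also provides retry_on_server_errors_filter); combined with deterministic job names, an ambiguous duplicate insert then fails loudly instead of loading twice. The function body below is a placeholder:

from apache_beam.utils import retry

MAX_RETRIES = 4  # placeholder; the real constant lives in the module above

@retry.with_exponential_backoff(
    num_retries=MAX_RETRIES,
    retry_filter=retry.retry_on_server_errors_filter)
def trigger_load_job(load_job_request):
  # Non-idempotent call: retried only on definite server-side errors,
  # never on a timeout where the job may in fact have been accepted.
  pass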

@pabloem (Member, Author) commented Feb 14, 2019

Run Python PostCommit

@chamikaramj (Contributor) commented:

Please resolve the conflicts.

@pabloem (Member, Author) commented Feb 16, 2019

Run Python PostCommit

@chamikaramj (Contributor) left a comment:

Thanks Pablo. This looks great. LGTM.

Please squash commits and self merge.

@pabloem merged commit cdea885 into apache:master on Feb 17, 2019
@pabloem deleted the bqloads branch on February 17, 2019 02:59
pabloem added a commit to pabloem/beam that referenced this pull request Feb 19, 2019
pabloem added a commit to pabloem/beam that referenced this pull request Feb 19, 2019
pabloem added a commit that referenced this pull request Feb 22, 2019
[BEAM-6711] [BEAM-6553] A Python SDK sink that supports File Loads into BQ (#7655)
Juta pushed a commit to Juta/beam that referenced this pull request Feb 25, 2019
Juta pushed a commit to Juta/beam that referenced this pull request Feb 25, 2019