
[BEAM-8458] Add option to set temp dataset in BigQueryIO.Read #9852

Merged
merged 7 commits into apache:master from iht:query_temp_dataset on Feb 26, 2020

Conversation

@iht (Contributor) commented Oct 22, 2019

When using fromQuery, BigQueryIO creates a temporary dataset and table to store the results of the query. As a result, Beam requires permission to create datasets, a very broad permission, just to be able to run a query. With this option, BigQueryIO can write the temporary results of the query to a pre-existing dataset, so it only needs permission to run queries and to create tables inside that dataset (not in the whole project) in order to use fromQuery.

For instance, if we read directly from a table with from, as in this example:

PCollection<TableRow> rows = p.apply(BigQueryIO.readTableRows().from(tableSpec));

we only need to assign the role roles/bigquery.jobUser to the service account used by the runner (e.g. Dataflow) to be able to extract data from tableSpec.

However, if we want to read from a view and we try the following code (where query selects from that view) with only that role, it will fail, because the pipeline does not have permission to create datasets (and fromQuery creates a temporary dataset and table):

PCollection<TableRow> rows = 
    p.apply(BigQueryIO.readTableRows().fromQuery(query).usingStandardSql());

So in order to read views with BigQueryIO, we currently need to grant the role roles/bigquery.user. This role is much broader than roles/bigquery.jobUser: it allows the pipeline to create as many datasets as it likes in the project it is reading data from.

With this PR, I am adding a new option to BigQueryIO, withQueryTempDataset, that makes it possible to set an existing dataset to create the necessary temporary tables. Thus, users wanting to limit the amount of permissions granted to Apache Beam could create a dataset prior to creating the pipeline, and assign permissions in that dataset only to the Apache Beam service account.

This is a much narrower set of permissions: the pipeline's ability to write to BigQuery is confined to the specified dataset, compared to the current requirement of granting permission to create datasets anywhere in the project just to read from views.
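
For example, a pipeline reading from a view could then look like this (a sketch: the project, view, and dataset names are placeholders, and it assumes withQueryTempDataset takes the dataset name as a String):

PCollection<TableRow> rows =
    p.apply(
        BigQueryIO.readTableRows()
            .fromQuery("SELECT * FROM `my_project.my_dataset.my_view`")
            .usingStandardSql()
            .withQueryTempDataset("my_pre_created_temp_dataset"));

The pipeline then only needs roles/bigquery.jobUser to run the query, plus permission to create and delete tables inside my_pre_created_temp_dataset.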


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • [x] Choose reviewer(s) and mention them in a comment (R: @username).
  • [x] Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make the review process smoother.

Post-Commit Tests Status (on master branch)

[Post-commit build status badge matrix omitted]

Pre-Commit Tests Status (on master branch)

[Pre-commit build status badge matrix omitted]

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

@iht (Contributor, Author) commented Oct 22, 2019

R: @chamikaramj

When using fromQuery, BigQueryIO creates a temp dataset to store the results of
the query. Therefore, Beam requires permissions to create datasets just to be
able to run a query. With this option, BigQueryIO can write the temp results of
the query to a pre-existing dataset, and therefore it only needs permissions to
run queries and create tables to be able to use fromQuery.
stale bot commented Jan 8, 2020

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.

@stale stale bot added the stale label Jan 8, 2020
@iht (Contributor, Author) commented Jan 10, 2020

I will resolve the conflicts, and will add some more docs for this PR.

@stale stale bot removed the stale label Jan 10, 2020
@iht (Contributor, Author) commented Feb 2, 2020

(PR text updated to provide more details about the intent of this PR)

@chamikaramj (Contributor) left a comment:

Thanks!

static TableReference createTempTableReference(String projectId, String jobUuid) {
String queryTempDatasetId = "temp_dataset_" + jobUuid;
static TableReference createTempTableReference(
String projectId, String jobUuid, Optional<String> queryTempDatasetIdOpt) {
@chamikaramj (Contributor):

Just "tempDatasetId" should be good I think.

@iht (Contributor, Author):

Changed.
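
For context on the createTempTableReference change discussed above, here is a simplified sketch of what the updated helper could look like (not the exact Beam implementation; the temp table naming below is an assumption):

import com.google.api.services.bigquery.model.TableReference;
import java.util.Optional;

class TempTableRefSketch {
  // Simplified sketch, not the exact Beam code: fall back to the generated
  // per-job dataset name when the user did not supply a temp dataset.
  static TableReference createTempTableReference(
      String projectId, String jobUuid, Optional<String> tempDatasetIdOpt) {
    String datasetId = tempDatasetIdOpt.orElse("temp_dataset_" + jobUuid);
    // Assumption: the temp table name stays job-scoped, so collisions are unlikely.
    String tableId = "temp_table_" + jobUuid;
    return new TableReference()
        .setProjectId(projectId)
        .setDatasetId(datasetId)
        .setTableId(tableId);
  }
}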

* <p>Users can optionally specify a query priority using {@link
* TypedRead#withQueryPriority(TypedRead.QueryPriority)} and a geographic location where the query
* will be executed using {@link TypedRead#withQueryLocation(String)}. Query location must be
* specified for jobs that are not executed in US or EU, or if you are reading from an authorized
@chamikaramj (Contributor):

Did you mean to mention "withQueryTempDataset" here ?

@iht (Contributor, Author):

No, I accidentally reformatted these lines, that's why they are in the PR. Let me see if I can undo these changes.

@@ -1342,16 +1354,23 @@ void cleanup(ContextContainer c) throws Exception {
BigQueryOptions options = c.getPipelineOptions().as(BigQueryOptions.class);
String jobUuid = c.getJobId();

Optional<String> queryTempDataset = Optional.ofNullable(getQueryTempDataset());
@chamikaramj (Contributor):

If the dataset is provided by the user, we should try to validate (before pipeline submission) that it exists, unless the user specified withoutValidation().

@iht (Contributor, Author):

Got it, let me add that.

@iht (Contributor, Author):

Commit added for that. See also reply to another of your comments.

* needed. No other tables in the dataset will be modified. If your job does not have
* permissions to create a new dataset, and you want to use {@link #fromQuery(String)} (for
* instance, to read from a view), you should use this option. Remember that the dataset must
* exist and your job needs permissions to create and remove tables inside that dataset.
@chamikaramj (Contributor):

We should also make sure that any table that Beam creates or deletes dynamically does not conflict with an existing table in the dataset (at runtime).

@iht (Contributor, Author):

I have just added two commits. If the user specifies the temp dataset and the pipeline is using fromQuery, the following checks are performed (roughly as sketched below):

  • check that the specified dataset exists
  • check that the destination table does not exist, to avoid overwriting any existing table in the dataset specified by the user (unlikely, due to the random generation of UUIDs for the temp tables, but not impossible)
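
A rough sketch of these two checks, written against the standalone google-cloud-bigquery client purely for illustration (Beam performs them through its own BigQuery services layer; the project, dataset, and table names are placeholders):

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.TableId;

class TempDatasetValidationSketch {
  static void validate(String project, String tempDatasetId, String tempTableId) {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

    // 1. The user-supplied temp dataset must already exist; Beam will not create it.
    if (bigquery.getDataset(DatasetId.of(project, tempDatasetId)) == null) {
      throw new IllegalArgumentException(
          "Temp dataset " + tempDatasetId + " does not exist in project " + project);
    }

    // 2. The generated temp table must not collide with an existing table,
    //    since Beam will create it and delete it when the read finishes.
    if (bigquery.getTable(TableId.of(project, tempDatasetId, tempTableId)) != null) {
      throw new IllegalArgumentException(
          "Table " + tempTableId + " already exists in dataset " + tempDatasetId);
    }
  }
}

This mirrors the behavior described in the comments above: validation only runs when fromQuery is used and withoutValidation() was not specified.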

@@ -68,13 +78,15 @@ private BigQueryQuerySourceDef(
Boolean useLegacySql,
BigQueryIO.TypedRead.QueryPriority priority,
String location,
String queryTempDataset,
@chamikaramj (Contributor):

Just "tempDatasetId"

@iht (Contributor, Author):

Changed

@iht (Contributor, Author) commented Feb 5, 2020

I am working on addressing the suggestions from the code review, and submitting additional commits. Still WIP. I will write again once I have submitted all the changes.

Only if the user is using fromQuery and not skipping validation
When the user specifies the temp dataset, Beam does not control its creation,
and it might write over an existing table if the generated table name collides
with an existing table (unlikely, but not impossible)
@iht (Contributor, Author) commented Feb 7, 2020

I have now addressed all your comments @chamikaramj

Please have a look at the new changes.

Thanks.

@aaltay (Member) commented Feb 20, 2020

R: @chamikaramj / @pabloem -- could you please take a look?

@chamikaramj (Contributor):
LGTM. Thanks.

@chamikaramj (Contributor):
Retest this please

@chamikaramj (Contributor):
Run Java PostCommit

@chamikaramj (Contributor):
Run Dataflow ValidatesRunner

@chamikaramj (Contributor):
Run Java PreCommit

1 similar comment
@chamikaramj (Contributor):
Run Java PreCommit

@chamikaramj (Contributor):
Run JavaPortabilityApi PreCommit

@chamikaramj (Contributor):
Error for failing task seems to be unrelated.
Task 'javaPreCommitPortabilityApiJava11' not found in root project 'beam'.
org.gradle.execution.TaskSelectionException: Task 'javaPreCommitPortabilityApiJava11' not found in root project 'beam'

Merging.

@chamikaramj chamikaramj merged commit 2a4092d into apache:master Feb 26, 2020
@iht (Contributor, Author) commented Feb 26, 2020

Thank you!

@iht iht deleted the query_temp_dataset branch February 26, 2020 22:40