Add priority parameter to BigQueryHook #30655

Merged (14 commits) on Apr 24, 2023

ying-w
Contributor

@ying-w ying-w commented Apr 15, 2023

Currently, priority= is available in the run_query() function of the BigQueryHook class, but it can't be set when the hook is instantiated.

This causes problems when you use SQLExecuteQueryOperator: because priority is missing from the class constructor, you can't pass it in with hook_params={"priority": "BATCH"}. Working around this with hook_params={"api_resource_configs": {"query": {"priority": "BATCH"}}} raises an error in _api_resource_configs_duplication_check(), since priority defaults to 'INTERACTIVE'.

This PR adds priority to the BigQueryHook constructor and uses that value as the default for run_query().
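
The failure mode described above can be illustrated with a small stand-in. This is only a sketch, not the provider's code: `duplication_check` and the toy `run_query` are hypothetical simplifications, assuming the real check raises when the same key is supplied both as a parameter and inside `api_resource_configs` with different values.

```python
def duplication_check(key, value, config_dict):
    """Hypothetical stand-in: reject a key that is set twice with conflicting values."""
    if key in config_dict and value != config_dict[key]:
        raise ValueError(
            f"Values of {key} param are duplicated: got {value!r} as a parameter "
            f"and {config_dict[key]!r} in api_resource_configs"
        )


def run_query(sql, priority="INTERACTIVE", api_resource_configs=None):
    """Toy run_query(): priority falls back to a hard-coded INTERACTIVE default."""
    query_config = dict((api_resource_configs or {}).get("query", {}))
    duplication_check("priority", priority, query_config)
    query_config.update({"query": sql, "priority": priority})
    return {"query": query_config}


# The attempted workaround: priority arrives only through api_resource_configs,
# so it collides with the hard-coded "INTERACTIVE" default.
try:
    run_query("SELECT 1", api_resource_configs={"query": {"priority": "BATCH"}})
    workaround_error = None
except ValueError as err:
    workaround_error = str(err)
```

Passing `priority="BATCH"` directly to the toy function works, which is why exposing the parameter on the constructor avoids the conflict.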

@boring-cyborg boring-cyborg bot added the area:providers and provider:google (Google (including GCP) related issues) labels Apr 15, 2023
Member

@hussein-awala hussein-awala left a comment

This needs a unit test, especially for the missing location, which is not detected by the current tests.

if priority:
    self.priority = priority

Member

I think you should not update the class instance attribute:

# if you create a hook with default priority BATCH
hook = BigQueryHook(..., priority="BATCH")
# then you run a query with priority INTERACTIVE
hook.run_query(..., priority="INTERACTIVE")
# finally, if you run another query without the priority argument, it should use the one
# provided at instance creation (BATCH), but that is not the case
hook.run_query(...)  # self.priority is now set to INTERACTIVE
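
One way to get the behaviour described above is to resolve the priority into a local variable instead of writing it back to the instance. The sketch below uses a hypothetical `ToyHook` class, not the real BigQueryHook, and assumes that `None` means "no priority passed to the method".

```python
class ToyHook:
    """Hypothetical stand-in for a hook with a constructor-level default priority."""

    def __init__(self, priority="INTERACTIVE"):
        self.priority = priority

    def run_query(self, sql, priority=None):
        # Resolve into a local variable; never write back to self.priority.
        effective_priority = priority if priority is not None else self.priority
        return {"query": {"query": sql, "priority": effective_priority}}


hook = ToyHook(priority="BATCH")
first = hook.run_query("SELECT 1", priority="INTERACTIVE")  # per-call override
second = hook.run_query("SELECT 1")  # falls back to the constructor value
```

With this shape, the per-call override affects only that call, and the second call still sees the value given at construction time.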

Contributor Author

I'm OK working off BigQueryHook(), but I was wondering whether the preferred way to submit arbitrary SQL is to use SQLExecuteQueryOperator() under common.sql.

I'm pretty new to Airflow and wasn't sure when to start with an operator versus a hook. There didn't seem to be an obvious way of passing a modified hook into SQLExecuteQueryOperator().

If I use BigQueryInsertJobOperator() (the BigQuery-specific counterpart of SQLExecuteQueryOperator()), then I would need to create a custom operator based on BigQueryInsertJobOperator() that overrides _submit_job(), since that method calls BigQueryHook() and the operator class also doesn't have priority as a class attribute.

Contributor

Why not just add a priority parameter to BigQueryInsertJobOperator as well? That way, you can instantiate the hook with it. Please default it in a way that is backwards compatible.

Contributor Author

In BigQueryInsertJobOperator, priority is set through configuration. That field is similar to the api_resource_configs field within BigQueryHook.

@@ -2133,7 +2140,8 @@ def run_query(

query_param_list: list[tuple[Any, str, str | bool | None | dict, type | tuple[type]]] = [
(sql, "query", None, (str,)),
-(priority, "priority", "INTERACTIVE", (str,)),
+(priority, "priority", priority, (str,)),
(location, "location", self.location, (str,)),
Contributor Author

I'm going to revert the location change; in insert_job it's not used in configuration but rather in jobReference.

@ying-w
Contributor Author

ying-w commented Apr 22, 2023

@vchiapaikeo I think it's ready for another review.

I think the ideal fix is to deprecate run_query() and have the cursor call insert_job() instead. However, run_query() is still needed to parse api_resource_configs from the hook and build it into the configuration for insert_job().

@vchiapaikeo
Contributor

I think the ideal fix is to deprecate run_query() and have the cursor call insert_job() instead. However, run_query() is still needed to parse api_resource_configs from the hook and build it into the configuration for insert_job().

This makes sense to me. You are suggesting we replace this call to run_query to use insert_job instead right? Would it be possible to refactor the api_resource_configs parsing done in run_query and call that within execute?

Also, can we add a unit test, or modify an existing one, that shows the value of priority passed to insert_job within configuration when no priority is passed to the hook, when it is passed in to the hook, and when it is set on the method itself?
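
The three cases asked about here could be checked roughly like this. It is only a sketch: `ToyPriorityHook` is a hypothetical stand-in for the hook, and `insert_job` is replaced with a mock so that the configuration it receives can be inspected.

```python
from unittest import mock


class ToyPriorityHook:
    """Hypothetical stand-in; the real BigQueryHook has many more parameters."""

    def __init__(self, priority="INTERACTIVE"):
        self.priority = priority

    def insert_job(self, configuration):
        raise NotImplementedError("replaced by a mock in the helper below")

    def run_query(self, sql, priority=None):
        effective = priority if priority is not None else self.priority
        return self.insert_job(
            configuration={"query": {"query": sql, "priority": effective}}
        )


def priority_sent_to_insert_job(hook_kwargs, call_kwargs):
    """Run one query against a mocked insert_job and return the priority it received."""
    hook = ToyPriorityHook(**hook_kwargs)
    with mock.patch.object(hook, "insert_job") as mocked_insert_job:
        hook.run_query("SELECT 1", **call_kwargs)
    _, kwargs = mocked_insert_job.call_args
    return kwargs["configuration"]["query"]["priority"]


# Case 1: nothing passed anywhere. Case 2: set on the hook. Case 3: set on the method.
no_priority = priority_sent_to_insert_job({}, {})
hook_priority = priority_sent_to_insert_job({"priority": "BATCH"}, {})
method_priority = priority_sent_to_insert_job(
    {"priority": "BATCH"}, {"priority": "INTERACTIVE"}
)
```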

Nice work so far!

Comment on lines -2082 to -2084
if location:
    self.location = location

Contributor

I'm not sure if this is used in some other way to propagate location to the instance attribute. It isn't the right thing to do but at the same time, I'm worried that an operator was relying on this functionality - especially given the fact that there was a test around this. Can you check if callers of run_query were previously relying on it?

Contributor Author

@ying-w ying-w Apr 22, 2023

I was just checking what other calls go to run_query(), and in hooks/bigquery.py there's only 1 other call, from the deprecated BigQueryCursor.run_query, which passes in *args, **kwargs, so it might rely on it.

In operators/bigquery.py there are 2 calls from within the deprecated BigQueryExecuteQueryOperator class, which just pass values through for execute() but don't pass in location. Many of these operators set self.location = location and then pass that to BigQueryHook.

So getting rid of the call from cursor.execute() should remove calls to run_query() from non-deprecated functions.

(I checked all references to run_query within airflow/providers/google/cloud/.)

@ying-w
Contributor Author

ying-w commented Apr 22, 2023

This makes sense to me. You are suggesting we replace this call to run_query to use insert_job instead right?

yup

Would it be possible to refactor the api_resource_configs parsing done in run_query and call that within execute?

yeah, just wasn't sure how much to put in one PR

btw, is there a way to test changes, kind of like pip install -e .? I couldn't figure out how to 'install' the provider, so I've been modifying the code in Python's site-packages instead.

@potiuk
Member

potiuk commented Apr 22, 2023

Generally, pip install -e should also include provider packages if you use INSTALL_PROVIDERS_FROM_SOURCES (see https://github.com/potiuk/airflow/blob/main/INSTALL, for example). Any IDE will also work fine with providers automatically, as they are part of the sources.

There is an issue about problems with the latest pip versions (#30764), so it might not work in all cases, or you might need to use an older pip version.

You can also use the breeze environment if you want a containerized environment; it is the same one used in CI.

All the options of setting up the env are also well described in CONTRIBUTING docs.

@ying-w
Contributor Author

ying-w commented Apr 22, 2023

This makes sense to me. You are suggesting we replace this call to run_query to use insert_job instead right? Would it be possible to refactor the api_resource_configs parsing done in run_query and call that within execute?

On second thought, a lot of what run_query() does is parsing and validating api_resource_configs, so I think there would be a lot of duplication if I refactored it. I think what happened is that they moved from an ExecuteSQL(sql, api_resource_configs) abstraction to an InsertJob(configuration={query: sql, <other config>: api_resource_configs}) abstraction, and hook.run_query() links the two.
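
Roughly, the link between the two abstractions looks like the sketch below. `build_insert_job_configuration` is a hypothetical helper written for illustration; it leaves out the validation that run_query() performs and only shows how sql and api_resource_configs could be folded into a single configuration dict.

```python
import copy


def build_insert_job_configuration(sql, api_resource_configs=None, priority="INTERACTIVE"):
    """Hypothetical helper: fold sql and extra settings into one configuration dict."""
    # Copy so the caller's api_resource_configs is not mutated.
    configuration = copy.deepcopy(api_resource_configs) if api_resource_configs else {}
    query_section = configuration.setdefault("query", {})
    query_section["query"] = sql
    # Keep a priority already present in api_resource_configs; otherwise use the argument.
    query_section.setdefault("priority", priority)
    return configuration


configuration = build_insert_job_configuration(
    "SELECT 1",
    api_resource_configs={"query": {"useLegacySql": False}},
    priority="BATCH",
)
```

The resulting dict has the ExecuteSQL-style inputs nested under the "query" key, which is the shape an InsertJob-style call would take as its configuration.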

Also, can we add a unit test, or modify an existing one, that shows the value of priority passed to insert_job within configuration when no priority is passed to the hook, when it is passed in to the hook, and when it is set on the method itself?

Done @vchiapaikeo

@potiuk
Member

potiuk commented Apr 23, 2023

LGTM: pending tests pass.

@potiuk
Member

potiuk commented Apr 23, 2023

Tests failed though :)

@ying-w
Contributor Author

ying-w commented Apr 23, 2023

I'll make some changes later tonight; the failure has to do with location being moved out of configuration and passed in as a parameter.

@potiuk potiuk merged commit 48c9625 into apache:main Apr 24, 2023
42 checks passed
@ying-w ying-w deleted the bigquery-priority branch April 24, 2023 17:26