Implement Async BigqueryOperator #31

Closed
8 tasks
kaxil opened this issue Dec 27, 2021 · 16 comments

@kaxil
Collaborator

kaxil commented Dec 27, 2021

Build async version of https://github.com/apache/airflow/blob/main/airflow/providers/google/cloud/operators/bigquery.py

Acceptance Criteria:

@phanikumv phanikumv changed the title Async BigqueryOperator Implement Async BigqueryOperator Dec 30, 2021
@phanikumv phanikumv added the area/async Deferrable/async operators label Dec 30, 2021
@phanikumv
Collaborator

phanikumv commented Jan 13, 2022

The classes below need to be converted to their async versions (based on feasibility):

| Operator | API used | Assignee | Async implementation needed? | Comments |
| --- | --- | --- | --- | --- |
| BigQueryCheckOperator | | @phanikumv | Yes | Performs checks against BigQuery. The BigQueryCheckOperator expects a SQL query that will return a single row. |
| BigQueryValueCheckOperator | | @pankajastro | Yes | Performs a simple value check using SQL code. |
| BigQueryIntervalCheckOperator | | @sunank200 | Yes | Checks that the values of metrics given as SQL expressions are within a certain tolerance of the ones from days_back before. |
| BigQueryGetDataOperator | https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/list | @rajaths010494 | Yes | Would require async, but gcloud-aio is not directly usable. Cannot get a pid or query ID to check the status of long-running tasks. |
| BigQueryExecuteQueryOperator | | | No | NA (deprecated) |
| BigQueryCreateEmptyTableOperator | | | No | |
| BigQueryCreateExternalTableOperator | | | No | |
| BigQueryDeleteDatasetOperator | | | No | |
| BigQueryCreateEmptyDatasetOperator | | | No | |
| BigQueryGetDatasetOperator | | | No | |
| BigQueryGetDatasetTablesOperator | | | No | |
| BigQueryPatchDatasetOperator | | | No | It only replaces fields that are provided in the submitted dataset resource. |
| BigQueryUpdateTableOperator | | @sunank200 | No | Cannot get a pid or query ID to check the status of long-running tasks. Changes some fields in the table as specified. |
| BigQueryUpdateDatasetOperator | | | No | Changes some fields in the dataset as specified. |
| BigQueryDeleteTableOperator | | | No | |
| BigQueryUpsertTableOperator | https://cloud.google.com/bigquery/docs/reference/v2/tables#resource | | No | If the table already exists, update the existing table; if not, create a new one. Since BigQuery does not natively allow table upserts, this is not an atomic operation. |
| BigQueryUpdateTableSchemaOperator | https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema | | No | Update fields within a schema for a given dataset and table. |
| BigQueryInsertJobOperator | https://cloud.google.com/bigquery/docs/reference/v2/jobs | @phanikumv | Yes | Implementation completed. |

@phanikumv
Collaborator

I am getting the API response below from the gcloud-aio library when I try to read a table. The sync version of BigQueryGetDataOperator returns a list[Row] --> [Row(('100', '200'), {'col1': 0, 'col2': 1}), Row(('300', '400'), {'col1': 0, 'col2': 1})], whereas gcloud-aio returns a plain list --> [{'f': [{'v': '300'}, {'v': '400'}]}, {'f': [{'v': '100'}, {'v': '200'}]}]

Do we need to ensure that the async and sync versions of an operator return the same data types at the end of execution? Thoughts, @kaxil?

gcloud-aio response

{'kind': 'bigquery#getQueryResultsResponse', 'etag': 'Ycgra5bwYNpTA3Tgy9ICdw==', 'schema': {'fields': [{'name': 'col1', 'type': 'STRING', 'mode': 'NULLABLE'}, {'name': 'col2', 'type': 'STRING', 'mode': 'NULLABLE'}]}, 'jobReference': {'projectId': 'astronomer-airflow-providers', 'jobId': 'job_-2e9ok-6d9gWKmBnX3K92wLpF0km', 'location': 'US'}, 'totalRows': '2', 'rows': [{'f': [{'v': '300'}, {'v': '400'}]}, {'f': [{'v': '100'}, {'v': '200'}]}], 'totalBytesProcessed': '20', 'jobComplete': True, 'cacheHit': False}
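
If the two versions should return comparable data, one option is to flatten the gcloud-aio payload on the operator side. The snippet below is only a sketch under that assumption: the helper names `rows_from_query_results` and `field_index` are hypothetical, and the sample dictionary is a trimmed copy of the response above.

```python
from typing import Any, Dict, List, Tuple


def rows_from_query_results(response: Dict[str, Any]) -> List[Tuple[Any, ...]]:
    """Flatten the 'rows' entries ({'f': [{'v': ...}, ...]}) into plain tuples."""
    return [
        tuple(cell.get("v") for cell in row.get("f", []))
        for row in response.get("rows", [])
    ]


def field_index(response: Dict[str, Any]) -> Dict[str, int]:
    """Map each column name in the schema to its position, like Row's field map."""
    fields = response.get("schema", {}).get("fields", [])
    return {field["name"]: position for position, field in enumerate(fields)}


# Trimmed copy of the gcloud-aio response shown above.
response = {
    "schema": {
        "fields": [
            {"name": "col1", "type": "STRING", "mode": "NULLABLE"},
            {"name": "col2", "type": "STRING", "mode": "NULLABLE"},
        ]
    },
    "rows": [
        {"f": [{"v": "300"}, {"v": "400"}]},
        {"f": [{"v": "100"}, {"v": "200"}]},
    ],
}

print(rows_from_query_results(response))  # [('300', '400'), ('100', '200')]
print(field_index(response))  # {'col1': 0, 'col2': 1}
```

This keeps the values as the strings the API returns; casting them according to the schema's `type` would be a separate step.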

@phanikumv
Collaborator

Let's prioritize the following four operators:
BigQueryGetDataOperator
BigQueryExecuteQueryOperator
BigQueryCheckOperator
BigQueryInsertJobOperator

@phanikumv
Collaborator

phanikumv commented Jan 19, 2022

Only the BigQuery jobs API submits a job asynchronously. See https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert
It returns a job ID that we can use to track the status of the request on the Triggerer.

This might become an issue when calling other BigQuery APIs asynchronously, such as https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/list, because that API doesn't return any kind of process ID or job ID once the request is submitted. Hence, we need to find an alternate approach for implementing the asynchronous version of BigQueryGetDataOperator.
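
To make the role of the job ID concrete, here is a minimal sketch of the polling pattern it enables. It is not the provider's trigger code: `wait_for_job`, `make_fake_status`, and the status strings are hypothetical, and the fake status function stands in for a real call to the jobs API.

```python
import asyncio
from typing import Awaitable, Callable, List


async def wait_for_job(
    get_status: Callable[[str], Awaitable[str]],
    job_id: str,
    poll_interval: float = 1.0,
) -> str:
    """Poll a job by ID until it reaches a terminal state."""
    while True:
        status = await get_status(job_id)
        if status in ("success", "error"):
            return status
        # Sleeping yields control to the event loop, so other jobs can be polled.
        await asyncio.sleep(poll_interval)


def make_fake_status(states: List[str]) -> Callable[[str], Awaitable[str]]:
    """Hypothetical stand-in for a real job-status request."""
    remaining = list(states)

    async def get_status(job_id: str) -> str:
        # Walk through the scripted states, then keep returning the last one.
        return remaining.pop(0) if len(remaining) > 1 else remaining[0]

    return get_status


result = asyncio.run(
    wait_for_job(
        make_fake_status(["pending", "running", "success"]),
        job_id="job_123",
        poll_interval=0.01,
    )
)
print(result)  # success
```

Without a job ID there is nothing to pass to `get_status`, which is the limitation described above for the tabledata API.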

@phanikumv
Collaborator

phanikumv commented Jan 21, 2022

When multiple queries are passed as a list, the OSS version of BigQueryInsertJobOperator doesn't work. It does work when the queries are passed as a single string.

BigQueryInsertJobOperator uses the Google Jobs API. If the user passes the queries as a list, it won't work, because the Google Jobs API doesn't allow using an array to pass multiple queries (please refer to the screenshot below).

multiple queries through bigquery jobs api- example.png

There used to be an operator called BigQueryExecuteQueryOperator, which supported passing multiple queries as a list, but it is now deprecated. @kaxil @dstandish - please share your opinion on this observation.

This was discussed, and we agreed to add a docstring to the operator describing how to pass multiple queries: as a single string separated by semicolons rather than as a list.
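
As an illustration of that convention, the sketch below joins a list of statements into one semicolon-separated string and places it in a jobs-API style configuration. The helper name `build_query_job_config` is hypothetical; the `query` and `useLegacySql` keys follow the jobs API query configuration.

```python
from typing import Any, Dict, Iterable


def build_query_job_config(
    statements: Iterable[str], use_legacy_sql: bool = False
) -> Dict[str, Any]:
    """Join statements into one semicolon-separated string for a query job."""
    # Drop empty entries and trailing semicolons so the join does not double them.
    cleaned = [s.strip().rstrip(";").strip() for s in statements if s.strip()]
    return {
        "query": {
            "query": "; ".join(cleaned),
            "useLegacySql": use_legacy_sql,
        }
    }


config = build_query_job_config(["SELECT 1;", "  SELECT 2  "])
print(config["query"]["query"])  # SELECT 1; SELECT 2
```

The resulting dictionary has the shape passed as the operator's `configuration` argument; whether a given multi-statement string is accepted still depends on BigQuery itself.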

@phanikumv
Collaborator

BigQueryExecuteQueryOperator is deprecated. Hence no need to create an async version of it.

@phanikumv
Collaborator

phanikumv commented Feb 8, 2022

Draft #47 for the BigQueryInsertJobOperator

@phanikumv
Collaborator

phanikumv commented Feb 11, 2022

PR is ready for BigQueryInsertJobOperatorAsync

kaxil pushed a commit that referenced this issue Feb 14, 2022
Implements BigQueryInsertJobOperatorAsync, which asynchronously submits jobs, generates a job ID, and polls for job status using the job ID on the Triggerer

part of #31
@sunank200
Collaborator

sunank200 commented Feb 15, 2022

BigQueryUpdateTableOperator uses a Table object and changes the specified fields of a table. This operator doesn't update the data in the table; it changes the fields of the Table.

I have tested Method: tables.update and have attached the screenshot. In my opinion, an async implementation of BigQueryUpdateTableOperator, or of any other operator that uses Method: tables of Google Cloud, would not improve performance, as it deals only with Table fields rather than TableData.

Screenshot 2022-02-15 at 7.47.32 PM.png

Also, it doesn't return any job ID or process ID that can be tracked on the Google Cloud side. This is only available for operators that use Method: jobs.

Moreover, the gcloud-aio Table class works with the Table object rather than TableData.

All operators that use the Google Cloud jobs API would require an async implementation.

@phanikumv , @rajaths010494 , @kaxil

@rajaths010494
Contributor

In OSS, BigQueryGetDataOperator uses a Table object and the list_rows method to fetch data.
In gcloud-aio, the Table class works with the Table object rather than TableData, so the TableData list API is not implemented. Also, all table calls in gcloud-aio are aiohttp calls to the API that await until the response is returned, rather than returning a pid or job ID (Method: jobs responds with the ID).
So if a long-running get operation is made, it awaits until the API GET call finishes, rather than giving us a query ID that we could use to check whether the query has finished.

All operators that use the Google Cloud jobs API would require an async implementation.

@sunank200
Collaborator

I have updated the table above with the fields "Async Implementation Needed" and "Comments", including the necessary details. Please let me know your views.

@phanikumv
Collaborator

My thinking for BigQueryGetDataOperator is that we'll reuse BigQueryInsertJobOperatorAsync to form a `SELECT * FROM table` query at run time, get a job ID, and then poll it on the Triggerer, because only the jobs API gives us a job ID. We can adopt the same strategy for BigQueryCheckOperator, BigQueryValueCheckOperator, and BigQueryIntervalCheckOperator.
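
A rough sketch of the query-building step in that strategy is shown below. The function name `build_get_data_query` and its parameters are hypothetical, chosen for illustration, and the identifiers are interpolated without validation, so this is not safe for untrusted input.

```python
from typing import Optional, Sequence


def build_get_data_query(
    dataset_id: str,
    table_id: str,
    selected_fields: Optional[Sequence[str]] = None,
    max_results: int = 100,
) -> str:
    """Build the SELECT statement that would be submitted as a query job."""
    # Fall back to all columns when no fields are requested.
    columns = ", ".join(selected_fields) if selected_fields else "*"
    return f"SELECT {columns} FROM `{dataset_id}.{table_id}` LIMIT {max_results}"


print(build_get_data_query("my_dataset", "my_table"))
# SELECT * FROM `my_dataset.my_table` LIMIT 100
print(build_get_data_query("my_dataset", "my_table", ["col1", "col2"], max_results=2))
# SELECT col1, col2 FROM `my_dataset.my_table` LIMIT 2
```

The string would then go into a query job configuration, and the job ID returned on submission is what gets polled.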

@phanikumv
Collaborator

PR is ready for BigQueryCheckOperatorAsync

kaxil pushed a commit that referenced this issue Feb 23, 2022
Implement operator, hook, and trigger to execute BigQueryCheckOperator in asynchronous mode

- Add BigQueryCheckOperatorAsync
- Use get query results API of gcloud-aio to retrieve the results
- Poll the results using the Triggerer and send data back to the Operator execute method.

Part of #31
@sunank200
Collaborator

PR for BigQueryIntervalCheckOperatorAsync

kaxil pushed a commit that referenced this issue Feb 24, 2022
Add BigQueryGetDataOperatorAsync
Use get query results API of gcloud-aio to retrieve the results
Poll the results using the Triggerer and send data back to the Operator execute method.
Part of #31
@kaxil
Collaborator Author

kaxil commented Feb 25, 2022

Can this be closed now, or are there any other pending implementations @phanikumv ?

@phanikumv
Collaborator

Implementation of the below operators is done. Hence closing the story.

  1. BigQueryCheckOperator
  2. BigQueryValueCheckOperator
  3. BigQueryIntervalCheckOperator
  4. BigQueryGetDataOperator
  5. BigQueryInsertJobOperator

OlympuJupiter added a commit to OlympuJupiter/astronomer-providers that referenced this issue Nov 14, 2022
Implements BigQueryInsertJobOperatorAsync, which asynchronously submits jobs, generates a job ID, and polls for job status using the job ID on the Triggerer

part of astronomer/astronomer-providers#31
OlympuJupiter added a commit to OlympuJupiter/astronomer-providers that referenced this issue Nov 14, 2022
Implement operator, hook, and trigger to execute BigQueryCheckOperator in asynchronous mode

- Add BigQueryCheckOperatorAsync
- Use get query results API of gcloud-aio to retrieve the results
- Poll the results using the Triggerer and send data back to the Operator execute method.

Part of astronomer/astronomer-providers#31