Create guide for BigQuery operators (#8276)
alexandraabbas committed Apr 15, 2020
1 parent 54d3c9a commit b198a1f
Showing 4 changed files with 414 additions and 6 deletions.
airflow/providers/google/cloud/example_dags/example_bigquery.py (68 additions, 4 deletions)
@@ -26,10 +26,11 @@
from airflow import models
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateEmptyDatasetOperator, BigQueryCreateEmptyTableOperator, BigQueryCreateExternalTableOperator,
    BigQueryDeleteDatasetOperator, BigQueryDeleteTableOperator, BigQueryExecuteQueryOperator,
    BigQueryGetDataOperator, BigQueryGetDatasetOperator, BigQueryGetDatasetTablesOperator,
    BigQueryPatchDatasetOperator, BigQueryUpdateDatasetOperator, BigQueryUpsertTableOperator,
    BigQueryCheckOperator, BigQueryCreateEmptyDatasetOperator, BigQueryCreateEmptyTableOperator,
    BigQueryCreateExternalTableOperator, BigQueryDeleteDatasetOperator, BigQueryDeleteTableOperator,
    BigQueryExecuteQueryOperator, BigQueryGetDataOperator, BigQueryGetDatasetOperator,
    BigQueryGetDatasetTablesOperator, BigQueryIntervalCheckOperator, BigQueryPatchDatasetOperator,
    BigQueryUpdateDatasetOperator, BigQueryUpsertTableOperator, BigQueryValueCheckOperator,
)
from airflow.providers.google.cloud.operators.bigquery_to_bigquery import BigQueryToBigQueryOperator
from airflow.providers.google.cloud.operators.bigquery_to_gcs import BigQueryToGCSOperator
@@ -40,6 +41,7 @@

default_args = {"start_date": days_ago(1)}

# [START howto_operator_bigquery_query]
MOST_VALUABLE_INCOMING_TRANSACTIONS = """
SELECT
value, to_address
@@ -52,6 +54,7 @@
ORDER BY value DESC
LIMIT 1000
"""
# [END howto_operator_bigquery_query]

MOST_ACTIVE_PLAYERS = """
SELECT
@@ -91,6 +94,7 @@
    tags=['example'],
) as dag:

    # [START howto_operator_bigquery_execute_query]
    execute_query = BigQueryExecuteQueryOperator(
        task_id="execute_query",
        sql=MOST_VALUABLE_INCOMING_TRANSACTIONS,
@@ -103,7 +107,9 @@
            }
        ],
    )
    # [END howto_operator_bigquery_execute_query]

    # [START howto_operator_bigquery_execute_query_list]
    bigquery_execute_multi_query = BigQueryExecuteQueryOperator(
        task_id="execute_multi_query",
        sql=[MOST_VALUABLE_INCOMING_TRANSACTIONS, MOST_ACTIVE_PLAYERS],
@@ -116,7 +122,9 @@
            }
        ],
    )
    # [END howto_operator_bigquery_execute_query_list]

    # [START howto_operator_bigquery_execute_query_save]
    execute_query_save = BigQueryExecuteQueryOperator(
        task_id="execute_query_save",
        sql=MOST_VALUABLE_INCOMING_TRANSACTIONS,
@@ -130,19 +138,23 @@
            }
        ],
    )
    # [END howto_operator_bigquery_execute_query_save]

    # [START howto_operator_bigquery_get_data]
    get_data = BigQueryGetDataOperator(
        task_id="get_data",
        dataset_id=DATASET_NAME,
        table_id="save_query_result",
        max_results="10",
        selected_fields="value,to_address",
    )
    # [END howto_operator_bigquery_get_data]
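    # BigQueryGetDataOperator returns the selected rows as a list and pushes it to XCom;
    # the BashOperator below simply echoes the pulled value.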

    get_data_result = BashOperator(
        task_id="get_data_result", bash_command="echo \"{{ task_instance.xcom_pull('get_data') }}\""
    )

    # [START howto_operator_bigquery_create_external_table]
    create_external_table = BigQueryCreateExternalTableOperator(
        task_id="create_external_table",
        bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
@@ -151,6 +163,7 @@
        skip_leading_rows=1,
        schema_fields=[{"name": "name", "type": "STRING"}, {"name": "post_abbr", "type": "STRING"}],
    )
    # [END howto_operator_bigquery_create_external_table]
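    # The external table reads the sample file from GCS in place; skip_leading_rows=1 skips
    # the header row so the declared schema_fields match the data columns.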

    execute_query_external_table = BigQueryExecuteQueryOperator(
        task_id="execute_query_external_table",
@@ -171,14 +184,17 @@
        destination_cloud_storage_uris=["gs://{}/export-bigquery.csv".format(DATA_EXPORT_BUCKET_NAME)],
    )

    # [START howto_operator_bigquery_create_dataset]
    create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create-dataset", dataset_id=DATASET_NAME)
    # [END howto_operator_bigquery_create_dataset]

    create_dataset_with_location = BigQueryCreateEmptyDatasetOperator(
        task_id="create_dataset_with_location",
        dataset_id=LOCATION_DATASET_NAME,
        location=BQ_LOCATION
    )

    # [START howto_operator_bigquery_create_table]
    create_table = BigQueryCreateEmptyTableOperator(
        task_id="create_table",
        dataset_id=DATASET_NAME,
@@ -188,6 +204,7 @@
            {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
        ],
    )
    # [END howto_operator_bigquery_create_table]

    create_table_with_location = BigQueryCreateEmptyTableOperator(
        task_id="create_table_with_location",
@@ -199,6 +216,7 @@
        ],
    )

    # [START howto_operator_bigquery_create_view]
    create_view = BigQueryCreateEmptyTableOperator(
        task_id="create_view",
        dataset_id=LOCATION_DATASET_NAME,
@@ -208,52 +226,68 @@
            "useLegacySql": False
        }
    )
    # [END howto_operator_bigquery_create_view]

    get_empty_dataset_tables = BigQueryGetDatasetTablesOperator(
        task_id="get_empty_dataset_tables",
        dataset_id=DATASET_NAME
    )

    # [START howto_operator_bigquery_get_dataset_tables]
    get_dataset_tables = BigQueryGetDatasetTablesOperator(
        task_id="get_dataset_tables",
        dataset_id=DATASET_NAME
    )
    # [END howto_operator_bigquery_get_dataset_tables]

    # [START howto_operator_bigquery_delete_view]
    delete_view = BigQueryDeleteTableOperator(
        task_id="delete_view", deletion_dataset_table="{}.test_view".format(DATASET_NAME)
    )
    # [END howto_operator_bigquery_delete_view]

    # [START howto_operator_bigquery_delete_table]
    delete_table = BigQueryDeleteTableOperator(
        task_id="delete_table", deletion_dataset_table="{}.test_table".format(DATASET_NAME)
    )
    # [END howto_operator_bigquery_delete_table]

    # [START howto_operator_bigquery_get_dataset]
    get_dataset = BigQueryGetDatasetOperator(task_id="get-dataset", dataset_id=DATASET_NAME)
    # [END howto_operator_bigquery_get_dataset]

    get_dataset_result = BashOperator(
        task_id="get_dataset_result",
        bash_command="echo \"{{ task_instance.xcom_pull('get-dataset')['id'] }}\"",
    )

    # [START howto_operator_bigquery_patch_dataset]
    patch_dataset = BigQueryPatchDatasetOperator(
        task_id="patch_dataset",
        dataset_id=DATASET_NAME,
        dataset_resource={"friendlyName": "Patched Dataset", "description": "Patched dataset"},
    )
    # [END howto_operator_bigquery_patch_dataset]

    # [START howto_operator_bigquery_update_dataset]
    update_dataset = BigQueryUpdateDatasetOperator(
        task_id="update_dataset", dataset_id=DATASET_NAME, dataset_resource={"description": "Updated dataset"}
    )
    # [END howto_operator_bigquery_update_dataset]

    # [START howto_operator_bigquery_delete_dataset]
    delete_dataset = BigQueryDeleteDatasetOperator(
        task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True
    )
    # [END howto_operator_bigquery_delete_dataset]
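    # delete_contents=True lets the dataset be removed even if it still contains tables;
    # without it, deleting a non-empty dataset fails.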

    delete_dataset_with_location = BigQueryDeleteDatasetOperator(
        task_id="delete_dataset_with_location",
        dataset_id=LOCATION_DATASET_NAME,
        delete_contents=True
    )

    # [START howto_operator_bigquery_upsert_table]
    update_table = BigQueryUpsertTableOperator(
        task_id="update_table", dataset_id=DATASET_NAME, table_resource={
            "tableReference": {
@@ -262,6 +296,34 @@
            "expirationTime": (int(time.time()) + 300) * 1000
        }
    )
    # [END howto_operator_bigquery_upsert_table]
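    # BigQueryUpsertTableOperator creates the table if it is missing and updates it otherwise;
    # expirationTime is in milliseconds since epoch, here roughly five minutes from now.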

    # [START howto_operator_bigquery_check]
    check_count = BigQueryCheckOperator(
        task_id="check_count",
        sql="SELECT COUNT(*) FROM {}.save_query_result".format(DATASET_NAME),
        use_legacy_sql=False,
    )
    # [END howto_operator_bigquery_check]
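    # BigQueryCheckOperator runs the query and fails the task if the first result row
    # contains any falsy value (for example, a zero count).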

    # [START howto_operator_bigquery_value_check]
    check_value = BigQueryValueCheckOperator(
        task_id="check_value",
        sql="SELECT COUNT(*) FROM {}.save_query_result".format(DATASET_NAME),
        pass_value=1000,
        use_legacy_sql=False,
    )
    # [END howto_operator_bigquery_value_check]
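    # BigQueryValueCheckOperator compares the single value returned by the query against
    # pass_value, optionally within a tolerance.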

    # [START howto_operator_bigquery_interval_check]
    check_interval = BigQueryIntervalCheckOperator(
        task_id="check_interval",
        table="{}.save_query_result".format(DATASET_NAME),
        days_back=1,
        metrics_thresholds={'COUNT(*)': 1.5},
        use_legacy_sql=False,
    )
    # [END howto_operator_bigquery_interval_check]
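    # BigQueryIntervalCheckOperator compares today's metrics with those from days_back ago
    # and fails if the ratio for any metric exceeds its threshold (1.5 here).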

    create_dataset >> execute_query_save >> delete_dataset
    create_dataset >> get_empty_dataset_tables >> create_table >> get_dataset_tables >> delete_dataset
@@ -275,3 +337,5 @@
    create_table >> create_view >> delete_view >> delete_table >> delete_dataset
    create_dataset_with_location >> create_table_with_location >> delete_dataset_with_location
    create_dataset >> create_table >> update_table >> delete_table >> delete_dataset
    create_dataset >> execute_query_save >> check_count >> check_value >> \
        check_interval >> delete_dataset
