Skip to content

Commit

Permalink
Fix BIGQUERY_JOB_DETAILS_LINK_FMT in BigQueryConsoleLink (#31457)
Browse files Browse the repository at this point in the history
Co-authored-by: Beata Kossakowska <bkossakowska@google.com>
Co-authored-by: eladkal <45845474+eladkal@users.noreply.github.com>
  • Loading branch information
3 people committed Jun 15, 2023
1 parent 6673a40 commit c7072c0
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 30 deletions.
38 changes: 30 additions & 8 deletions airflow/providers/google/cloud/operators/bigquery.py
Expand Up @@ -58,7 +58,7 @@
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.utils.context import Context

BIGQUERY_JOB_DETAILS_LINK_FMT = "https://console.cloud.google.com/bigquery?j={job_id}"
BIGQUERY_JOB_DETAILS_LINK_FMT = "https://console.cloud.google.com/bigquery?j={project_id}:{location}:{job_id}"


class BigQueryUIColors(enum.Enum):
Expand Down Expand Up @@ -90,8 +90,17 @@ def get_link(
*,
ti_key: TaskInstanceKey,
):
job_id = XCom.get_value(key="job_id", ti_key=ti_key)
return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id) if job_id else ""
job_id_params = XCom.get_value(key="job_id_params", ti_key=ti_key)

return (
BIGQUERY_JOB_DETAILS_LINK_FMT.format(
job_id=job_id_params["job_id"],
project_id=job_id_params["project_id"],
location=job_id_params["location"],
)
if job_id_params
else ""
)


@attr.s(auto_attribs=True)
Expand All @@ -110,13 +119,16 @@ def get_link(
*,
ti_key: TaskInstanceKey,
):
job_ids = XCom.get_value(key="job_id", ti_key=ti_key)
job_ids_params = XCom.get_value(key="job_id_params", ti_key=ti_key)
job_ids = job_ids_params["job_id"]
if not job_ids:
return None
if len(job_ids) < self.index:
return None
job_id = job_ids[self.index]
return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id)
return BIGQUERY_JOB_DETAILS_LINK_FMT.format(
job_id=job_id, project_id=job_ids_params["project_id"], location=job_ids_params["location"]
)


class _BigQueryDbHookMixin:
Expand Down Expand Up @@ -1184,7 +1196,13 @@ def execute(self, context: Context):
]
else:
raise AirflowException(f"argument 'sql' of type {type(str)} is neither a string nor an iterable")
context["task_instance"].xcom_push(key="job_id", value=job_id)
job_id_params = {
"job_id": job_id,
"project_id": self.hook.project_id,
"location": self.location if self.location else "US",
}
context["task_instance"].xcom_push(key="job_id_params", value=job_id_params)
return job_id

def on_kill(self) -> None:
super().on_kill()
Expand Down Expand Up @@ -2727,9 +2745,13 @@ def execute(self, context: Any):
persist_kwargs["dataset_id"] = table["datasetId"]
persist_kwargs["project_id"] = table["projectId"]
BigQueryTableLink.persist(**persist_kwargs)

self.job_id = job.job_id
context["ti"].xcom_push(key="job_id", value=self.job_id)
job_id_params = {
"job_id": job_id,
"project_id": self.project_id or self.hook.project_id,
"location": self.location if self.location else "US",
}
context["ti"].xcom_push(key="job_id_params", value=job_id_params)
# Wait for the job to complete
if not self.deferrable:
job.result(timeout=self.result_timeout, retry=self.result_retry)
Expand Down
69 changes: 47 additions & 22 deletions tests/providers/google/cloud/operators/test_bigquery.py
Expand Up @@ -83,6 +83,10 @@
}
TEST_TABLE = "test-table"
GCP_CONN_ID = "google_cloud_default"
TEST_JOB_ID_1 = "test-job-id"
TEST_JOB_ID_2 = "test-123"
TEST_FULL_JOB_ID = f"{TEST_GCP_PROJECT_ID}:{TEST_DATASET_LOCATION}:{TEST_JOB_ID_1}"
TEST_FULL_JOB_ID_2 = f"{TEST_GCP_PROJECT_ID}:{TEST_DATASET_LOCATION}:{TEST_JOB_ID_2}"


class TestBigQueryCreateEmptyTableOperator:
Expand Down Expand Up @@ -672,11 +676,15 @@ def test_bigquery_operator_extra_serialized_field_when_single_query(

# Check DeSerialized version of operator link
assert isinstance(list(simple_task.operator_extra_links)[0], BigQueryConsoleLink)

ti.xcom_push("job_id", 12345)
test_job_id_params = {
"job_id": TEST_JOB_ID_1,
"project_id": TEST_GCP_PROJECT_ID,
"location": TEST_DATASET_LOCATION,
}
ti.xcom_push("job_id_params", test_job_id_params)

url = simple_task.get_extra_links(ti, BigQueryConsoleLink.name)
assert url == "https://console.cloud.google.com/bigquery?j=12345"
assert url == f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"

@pytest.mark.need_serialized_dag
def test_bigquery_operator_extra_serialized_field_when_multiple_queries(
Expand Down Expand Up @@ -711,17 +719,23 @@ def test_bigquery_operator_extra_serialized_field_when_multiple_queries(
# Check DeSerialized version of operator link
assert isinstance(list(simple_task.operator_extra_links)[0], BigQueryConsoleIndexableLink)

job_id = ["123", "45"]
ti.xcom_push(key="job_id", value=job_id)
test_job_id_params = {
"job_id": [TEST_JOB_ID_1, TEST_JOB_ID_2],
"project_id": TEST_GCP_PROJECT_ID,
"location": TEST_DATASET_LOCATION,
}
ti.xcom_push(key="job_id_params", value=test_job_id_params)

assert {"BigQuery Console #1", "BigQuery Console #2"} == simple_task.operator_extra_link_dict.keys()

assert "https://console.cloud.google.com/bigquery?j=123" == simple_task.get_extra_links(
ti, "BigQuery Console #1"
assert (
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
== simple_task.get_extra_links(ti, "BigQuery Console #1")
)

assert "https://console.cloud.google.com/bigquery?j=45" == simple_task.get_extra_links(
ti, "BigQuery Console #2"
assert (
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID_2}"
== simple_task.get_extra_links(ti, "BigQuery Console #2")
)

@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
Expand All @@ -740,7 +754,9 @@ def test_bigquery_operator_extra_link_when_missing_job_id(

@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
def test_bigquery_operator_extra_link_when_single_query(
self, mock_hook, create_task_instance_of_operator
self,
mock_hook,
create_task_instance_of_operator,
):
ti = create_task_instance_of_operator(
BigQueryExecuteQueryOperator,
Expand All @@ -751,11 +767,15 @@ def test_bigquery_operator_extra_link_when_single_query(
)
bigquery_task = ti.task

job_id = "12345"
ti.xcom_push(key="job_id", value=job_id)

assert f"https://console.cloud.google.com/bigquery?j={job_id}" == bigquery_task.get_extra_links(
ti, BigQueryConsoleLink.name
test_job_id_params = {
"job_id": TEST_JOB_ID_1,
"project_id": TEST_GCP_PROJECT_ID,
"location": TEST_DATASET_LOCATION,
}
ti.xcom_push(key="job_id_params", value=test_job_id_params)
assert (
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
== bigquery_task.get_extra_links(ti, BigQueryConsoleLink.name)
)

@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
Expand All @@ -771,17 +791,22 @@ def test_bigquery_operator_extra_link_when_multiple_query(
)
bigquery_task = ti.task

job_id = ["123", "45"]
ti.xcom_push(key="job_id", value=job_id)

test_job_id_params = {
"job_id": [TEST_JOB_ID_1, TEST_JOB_ID_2],
"project_id": TEST_GCP_PROJECT_ID,
"location": TEST_DATASET_LOCATION,
}
ti.xcom_push(key="job_id_params", value=test_job_id_params)
assert {"BigQuery Console #1", "BigQuery Console #2"} == bigquery_task.operator_extra_link_dict.keys()

assert "https://console.cloud.google.com/bigquery?j=123" == bigquery_task.get_extra_links(
ti, "BigQuery Console #1"
assert (
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
== bigquery_task.get_extra_links(ti, "BigQuery Console #1")
)

assert "https://console.cloud.google.com/bigquery?j=45" == bigquery_task.get_extra_links(
ti, "BigQuery Console #2"
assert (
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID_2}"
== bigquery_task.get_extra_links(ti, "BigQuery Console #2")
)


Expand Down

0 comments on commit c7072c0

Please sign in to comment.