Skip to content

Commit

Permalink
Revert "Fix BIGQUERY_JOB_DETAILS_LINK_FMT in BigQueryConsoleLink (#…
Browse files Browse the repository at this point in the history
…31457)" (#31935)

This reverts commit c7072c0.
  • Loading branch information
potiuk committed Jun 15, 2023
1 parent 5c1e95d commit e6960f1
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 77 deletions.
38 changes: 8 additions & 30 deletions airflow/providers/google/cloud/operators/bigquery.py
Expand Up @@ -58,7 +58,7 @@
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.utils.context import Context

BIGQUERY_JOB_DETAILS_LINK_FMT = "https://console.cloud.google.com/bigquery?j={project_id}:{location}:{job_id}"
BIGQUERY_JOB_DETAILS_LINK_FMT = "https://console.cloud.google.com/bigquery?j={job_id}"


class BigQueryUIColors(enum.Enum):
Expand Down Expand Up @@ -90,17 +90,8 @@ def get_link(
*,
ti_key: TaskInstanceKey,
):
job_id_params = XCom.get_value(key="job_id_params", ti_key=ti_key)

return (
BIGQUERY_JOB_DETAILS_LINK_FMT.format(
job_id=job_id_params["job_id"],
project_id=job_id_params["project_id"],
location=job_id_params["location"],
)
if job_id_params
else ""
)
job_id = XCom.get_value(key="job_id", ti_key=ti_key)
return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id) if job_id else ""


@attr.s(auto_attribs=True)
Expand All @@ -119,16 +110,13 @@ def get_link(
*,
ti_key: TaskInstanceKey,
):
job_ids_params = XCom.get_value(key="job_id_params", ti_key=ti_key)
job_ids = job_ids_params["job_id"]
job_ids = XCom.get_value(key="job_id", ti_key=ti_key)
if not job_ids:
return None
if len(job_ids) < self.index:
return None
job_id = job_ids[self.index]
return BIGQUERY_JOB_DETAILS_LINK_FMT.format(
job_id=job_id, project_id=job_ids_params["project_id"], location=job_ids_params["location"]
)
return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id)


class _BigQueryDbHookMixin:
Expand Down Expand Up @@ -1196,13 +1184,7 @@ def execute(self, context: Context):
]
else:
raise AirflowException(f"argument 'sql' of type {type(str)} is neither a string nor an iterable")
job_id_params = {
"job_id": job_id,
"project_id": self.hook.project_id,
"location": self.location if self.location else "US",
}
context["task_instance"].xcom_push(key="job_id_params", value=job_id_params)
return job_id
context["task_instance"].xcom_push(key="job_id", value=job_id)

def on_kill(self) -> None:
super().on_kill()
Expand Down Expand Up @@ -2745,13 +2727,9 @@ def execute(self, context: Any):
persist_kwargs["dataset_id"] = table["datasetId"]
persist_kwargs["project_id"] = table["projectId"]
BigQueryTableLink.persist(**persist_kwargs)

self.job_id = job.job_id
job_id_params = {
"job_id": job_id,
"project_id": self.project_id or self.hook.project_id,
"location": self.location if self.location else "US",
}
context["ti"].xcom_push(key="job_id_params", value=job_id_params)
context["ti"].xcom_push(key="job_id", value=self.job_id)
# Wait for the job to complete
if not self.deferrable:
job.result(timeout=self.result_timeout, retry=self.result_retry)
Expand Down
69 changes: 22 additions & 47 deletions tests/providers/google/cloud/operators/test_bigquery.py
Expand Up @@ -83,10 +83,6 @@
}
TEST_TABLE = "test-table"
GCP_CONN_ID = "google_cloud_default"
TEST_JOB_ID_1 = "test-job-id"
TEST_JOB_ID_2 = "test-123"
TEST_FULL_JOB_ID = f"{TEST_GCP_PROJECT_ID}:{TEST_DATASET_LOCATION}:{TEST_JOB_ID_1}"
TEST_FULL_JOB_ID_2 = f"{TEST_GCP_PROJECT_ID}:{TEST_DATASET_LOCATION}:{TEST_JOB_ID_2}"


class TestBigQueryCreateEmptyTableOperator:
Expand Down Expand Up @@ -676,15 +672,11 @@ def test_bigquery_operator_extra_serialized_field_when_single_query(

# Check DeSerialized version of operator link
assert isinstance(list(simple_task.operator_extra_links)[0], BigQueryConsoleLink)
test_job_id_params = {
"job_id": TEST_JOB_ID_1,
"project_id": TEST_GCP_PROJECT_ID,
"location": TEST_DATASET_LOCATION,
}
ti.xcom_push("job_id_params", test_job_id_params)

ti.xcom_push("job_id", 12345)

url = simple_task.get_extra_links(ti, BigQueryConsoleLink.name)
assert url == f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
assert url == "https://console.cloud.google.com/bigquery?j=12345"

@pytest.mark.need_serialized_dag
def test_bigquery_operator_extra_serialized_field_when_multiple_queries(
Expand Down Expand Up @@ -719,23 +711,17 @@ def test_bigquery_operator_extra_serialized_field_when_multiple_queries(
# Check DeSerialized version of operator link
assert isinstance(list(simple_task.operator_extra_links)[0], BigQueryConsoleIndexableLink)

test_job_id_params = {
"job_id": [TEST_JOB_ID_1, TEST_JOB_ID_2],
"project_id": TEST_GCP_PROJECT_ID,
"location": TEST_DATASET_LOCATION,
}
ti.xcom_push(key="job_id_params", value=test_job_id_params)
job_id = ["123", "45"]
ti.xcom_push(key="job_id", value=job_id)

assert {"BigQuery Console #1", "BigQuery Console #2"} == simple_task.operator_extra_link_dict.keys()

assert (
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
== simple_task.get_extra_links(ti, "BigQuery Console #1")
assert "https://console.cloud.google.com/bigquery?j=123" == simple_task.get_extra_links(
ti, "BigQuery Console #1"
)

assert (
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID_2}"
== simple_task.get_extra_links(ti, "BigQuery Console #2")
assert "https://console.cloud.google.com/bigquery?j=45" == simple_task.get_extra_links(
ti, "BigQuery Console #2"
)

@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
Expand All @@ -754,9 +740,7 @@ def test_bigquery_operator_extra_link_when_missing_job_id(

@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
def test_bigquery_operator_extra_link_when_single_query(
self,
mock_hook,
create_task_instance_of_operator,
self, mock_hook, create_task_instance_of_operator
):
ti = create_task_instance_of_operator(
BigQueryExecuteQueryOperator,
Expand All @@ -767,15 +751,11 @@ def test_bigquery_operator_extra_link_when_single_query(
)
bigquery_task = ti.task

test_job_id_params = {
"job_id": TEST_JOB_ID_1,
"project_id": TEST_GCP_PROJECT_ID,
"location": TEST_DATASET_LOCATION,
}
ti.xcom_push(key="job_id_params", value=test_job_id_params)
assert (
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
== bigquery_task.get_extra_links(ti, BigQueryConsoleLink.name)
job_id = "12345"
ti.xcom_push(key="job_id", value=job_id)

assert f"https://console.cloud.google.com/bigquery?j={job_id}" == bigquery_task.get_extra_links(
ti, BigQueryConsoleLink.name
)

@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
Expand All @@ -791,22 +771,17 @@ def test_bigquery_operator_extra_link_when_multiple_query(
)
bigquery_task = ti.task

test_job_id_params = {
"job_id": [TEST_JOB_ID_1, TEST_JOB_ID_2],
"project_id": TEST_GCP_PROJECT_ID,
"location": TEST_DATASET_LOCATION,
}
ti.xcom_push(key="job_id_params", value=test_job_id_params)
job_id = ["123", "45"]
ti.xcom_push(key="job_id", value=job_id)

assert {"BigQuery Console #1", "BigQuery Console #2"} == bigquery_task.operator_extra_link_dict.keys()

assert (
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID}"
== bigquery_task.get_extra_links(ti, "BigQuery Console #1")
assert "https://console.cloud.google.com/bigquery?j=123" == bigquery_task.get_extra_links(
ti, "BigQuery Console #1"
)

assert (
f"https://console.cloud.google.com/bigquery?j={TEST_FULL_JOB_ID_2}"
== bigquery_task.get_extra_links(ti, "BigQuery Console #2")
assert "https://console.cloud.google.com/bigquery?j=45" == bigquery_task.get_extra_links(
ti, "BigQuery Console #2"
)


Expand Down

0 comments on commit e6960f1

Please sign in to comment.