Skip to content

Commit

Permalink
Fix accessing a GKE cluster through the private endpoint in `GKEStart…
Browse files Browse the repository at this point in the history
…PodOperator` (#31391)

* Fix accessing a GKE cluster through the private endpoint in `GKEStartPodOperator`

* Add a unit test for cluster info
  • Loading branch information
hussein-awala committed May 23, 2023
1 parent 45b6cfa commit c082aec
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 11 deletions.
17 changes: 6 additions & 11 deletions airflow/providers/google/cloud/operators/kubernetes_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ def __init__(
*,
location: str,
cluster_name: str,
use_internal_ip: bool | None = None,
use_internal_ip: bool = False,
project_id: str | None = None,
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
Expand All @@ -433,15 +433,6 @@ def __init__(
)
is_delete_operator_pod = False

if use_internal_ip is not None:
warnings.warn(
f"You have set parameter use_internal_ip in class {self.__class__.__name__}. "
"In current implementation of the operator the parameter is not used and will "
"be deleted in future.",
AirflowProviderDeprecationWarning,
stacklevel=2,
)

if regional is not None:
warnings.warn(
f"You have set parameter regional in class {self.__class__.__name__}. "
Expand All @@ -457,6 +448,7 @@ def __init__(
self.cluster_name = cluster_name
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
self.use_internal_ip = use_internal_ip

self.pod: V1Pod | None = None
self._ssl_ca_cert: str | None = None
Expand Down Expand Up @@ -516,7 +508,10 @@ def fetch_cluster_info(self) -> tuple[str, str | None]:
project_id=self.project_id,
)

self._cluster_url = f"https://{cluster.endpoint}"
if not self.use_internal_ip:
self._cluster_url = f"https://{cluster.endpoint}"
else:
self._cluster_url = f"https://{cluster.private_cluster_config.private_endpoint}"
self._ssl_ca_cert = cluster.master_auth.cluster_ca_certificate
return self._cluster_url, self._ssl_ca_cert

Expand Down
26 changes: 26 additions & 0 deletions tests/providers/google/cloud/operators/test_kubernetes_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
TEMP_FILE = "tempfile.NamedTemporaryFile"
GKE_OP_PATH = "airflow.providers.google.cloud.operators.kubernetes_engine.GKEStartPodOperator"
CLUSTER_URL = "https://test-host"
CLUSTER_PRIVATE_URL = "https://test-private-host"
SSL_CA_CERT = "TEST_SSL_CA_CERT_CONTENT"


Expand Down Expand Up @@ -293,6 +294,31 @@ def test_execute_with_impersonation_service_chain_one_element(

fetch_cluster_info_mock.assert_called_once()

@pytest.mark.parametrize("use_internal_ip", [True, False])
@mock.patch(f"{GKE_HOOK_PATH}.get_cluster")
def test_cluster_info(self, get_cluster_mock, use_internal_ip):
get_cluster_mock.return_value = mock.MagicMock(
**{
"endpoint": "test-host",
"private_cluster_config.private_endpoint": "test-private-host",
"master_auth.cluster_ca_certificate": SSL_CA_CERT,
}
)
gke_op = GKEStartPodOperator(
project_id=TEST_GCP_PROJECT_ID,
location=PROJECT_LOCATION,
cluster_name=CLUSTER_NAME,
task_id=PROJECT_TASK_ID,
name=TASK_NAME,
namespace=NAMESPACE,
image=IMAGE,
use_internal_ip=use_internal_ip,
)
cluster_url, ssl_ca_cert = gke_op.fetch_cluster_info()

assert cluster_url == CLUSTER_PRIVATE_URL if use_internal_ip else CLUSTER_URL
assert ssl_ca_cert == SSL_CA_CERT


class TestGKEPodOperatorAsync:
def setup_method(self):
Expand Down

0 comments on commit c082aec

Please sign in to comment.