Skip to content

Commit

Permalink
feat(providers/google): add service_file support to GKEPodAsyncHook
Browse files Browse the repository at this point in the history
  • Loading branch information
Lee-W committed Jan 30, 2024
1 parent 8914e49 commit 3339adc
Showing 1 changed file with 43 additions and 36 deletions.
79 changes: 43 additions & 36 deletions airflow/providers/google/cloud/hooks/kubernetes_engine.py
Expand Up @@ -507,33 +507,37 @@ async def get_pod(self, name: str, namespace: str) -> V1Pod:
:param name: Name of the pod.
:param namespace: Name of the pod's namespace.
"""
async with Token(scopes=self.scopes) as token:
async with self.get_conn(token) as connection:
v1_api = async_client.CoreV1Api(connection)
pod: V1Pod = await v1_api.read_namespaced_pod(
name=name,
namespace=namespace,
)
return pod
async with self.service_file_as_context() as service_file: # type: ignore[attr-defined]
async with Token(scopes=self.scopes, service_file=service_file) as token:
async with self.get_conn(token) as connection:
v1_api = async_client.CoreV1Api(connection)
pod: V1Pod = await v1_api.read_namespaced_pod(
name=name,
namespace=namespace,
)
return pod

async def delete_pod(self, name: str, namespace: str):
"""Delete a pod.
:param name: Name of the pod.
:param namespace: Name of the pod's namespace.
"""
async with Token(scopes=self.scopes) as token, self.get_conn(token) as connection:
try:
v1_api = async_client.CoreV1Api(connection)
await v1_api.delete_namespaced_pod(
name=name,
namespace=namespace,
body=client.V1DeleteOptions(),
)
except async_client.ApiException as e:
# If the pod is already deleted
if e.status != 404:
raise
async with self.service_file_as_context() as service_file: # type: ignore[attr-defined]
async with Token(scopes=self.scopes, service_file=service_file) as token, self.get_conn(
token
) as connection:
try:
v1_api = async_client.CoreV1Api(connection)
await v1_api.delete_namespaced_pod(
name=name,
namespace=namespace,
body=client.V1DeleteOptions(),
)
except async_client.ApiException as e:
# If the pod is already deleted
if e.status != 404:
raise

async def read_logs(self, name: str, namespace: str):
"""Read logs inside the pod while starting containers inside.
Expand All @@ -546,19 +550,22 @@ async def read_logs(self, name: str, namespace: str):
:param name: Name of the pod.
:param namespace: Name of the pod's namespace.
"""
async with Token(scopes=self.scopes) as token, self.get_conn(token) as connection:
try:
v1_api = async_client.CoreV1Api(connection)
logs = await v1_api.read_namespaced_pod_log(
name=name,
namespace=namespace,
follow=False,
timestamps=True,
)
logs = logs.splitlines()
for line in logs:
self.log.info("Container logs from %s", line)
return logs
except HTTPError:
self.log.exception("There was an error reading the kubernetes API.")
raise
async with self.service_file_as_context() as service_file: # type: ignore[attr-defined]
async with Token(scopes=self.scopes, service_file=service_file) as token, self.get_conn(
token
) as connection:
try:
v1_api = async_client.CoreV1Api(connection)
logs = await v1_api.read_namespaced_pod_log(
name=name,
namespace=namespace,
follow=False,
timestamps=True,
)
logs = logs.splitlines()
for line in logs:
self.log.info("Container logs from %s", line)
return logs
except HTTPError:
self.log.exception("There was an error reading the kubernetes API.")
raise

0 comments on commit 3339adc

Please sign in to comment.