Description
Official Helm Chart version
1.3.0 (latest released)
Apache Airflow version
2.2.1
Kubernetes Version
1.19.14-gke.1900
Helm Chart configuration
executor: "KubernetesExecutor"
postgresql:
  enabled: false
pgbouncer:
  enabled: true
flower:
  enabled: false
config:
  core:
    load_examples: 'False'
    load_default_connections: 'False'
  webserver:
    expose_config: 'False'
  logging:
    remote_logging: 'True'
    remote_log_conn_id: "gcs-conn-dev"
    remote_base_log_folder: "gs://[REDACTED]/airflow_logs"
cleanup:
  enabled: true
dags:
  gitSync:
    enabled: true
    repo: ssh://git@github.com/[REDACTED]/[REDACTED].git
    branch: airflow-dev
    rev: HEAD
    depth: 1
    subPath: "airflow/dags"
    sshKeySecret: airflow-git-ssh-secret
    knownHosts: |
      github.com ssh-rsa [REDACTED]==
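The remote_log_conn_id above points at a connection named gcs-conn-dev, a Google Cloud connection created separately in the deployment. As a quick sanity check (a sketch of my own, assuming the connection already exists), something like this confirms the connection id resolves from inside a pod:

from airflow.hooks.base import BaseHook

# Hypothetical check: confirm the connection id used for remote logging resolves.
conn = BaseHook.get_connection("gcs-conn-dev")
print(conn.conn_type)  # expected: google_cloud_platform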
Docker Image customisations
Dockerfile
FROM apache/airflow:2.2.1-python3.9
SHELL ["/bin/bash", "-o", "pipefail", "-e", "-u", "-x", "-c"]
USER root
RUN apt-get update \
    && apt-get upgrade -y \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*
USER airflow
COPY airflow/requirements.txt .
RUN pip install --upgrade --no-cache-dir -r requirements.txt && rm requirements.txt
requirements.txt
apache-airflow-providers-google==6.0
apache-airflow-providers-cncf-kubernetes==2.0
pandas==1.3
quandl==3.6.1
What happened
I have simple tasks producing simple logs, which should be sent to GCS via remote logging, but they never show up in GCS.
What you expected to happen
I expect the logs to be visible, but they are not. I cannot set up a PV (PersistentVolume) in my k8s cluster, so I chose remote logging to persist the logs in GCS instead. I have verified that the permissions are correct and even tested them out. But whenever any task runs, no logs appear in GCS. So of course, when I click "Log" on the task afterwards, I get the following, because the worker pod is already deleted and GCS never got the logs shipped to it:
*** Unable to read remote log from gs://[REDACTED]/airflow_logs/logging_test/list_gcp_bucket_objects_in_dev/2021-12-19T17:44:14.483757+00:00/1.log
*** 404 GET https://storage.googleapis.com/download/storage/v1/b/[REDACTED]/o/airflow_logs%2Flogging_test%2Flist_gcp_bucket_objects_in_dev%2F2021-12-19T17%3A44%3A14.483757%2B00%3A00%2F1.log?alt=media: No such object: [REDACTED]/airflow_logs/logging_test/list_gcp_bucket_objects_in_dev/2021-12-19T17:44:14.483757+00:00/1.log: ('Request failed with status code', 404, 'Expected one of', <HTTPStatus.OK: 200>, <HTTPStatus.PARTIAL_CONTENT: 206>)
*** Trying to get logs (last 100 lines) from worker pod loggingtestlistgcpbucketobjectsindev.6c24f1cc7ffe45a88c54afedeb ***
*** Unable to fetch logs from worker pod loggingtestlistgcpbucketobjectsindev.6c24f1cc7ffe45a88c54afedeb ***
(404)
Reason: Not Found
HTTP response headers: HTTPHeaderDict({'Audit-Id': 'de670d54-6f87-4ce5-90f8-a6c161d70fe2', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'Date': 'Sun, 19 Dec 2021 18:16:45 GMT', 'Content-Length': '294'})
HTTP response body: b'{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"pods \\"loggingtestlistgcpbucketobjectsindev.6c24f1cc7ffe45a88c54afedeb\\" not found","reason":"NotFound","details":{"name":"loggingtestlistgcpbucketobjectsindev.6c24f1cc7ffe45a88c54afedeb","kind":"pods"},"code":404}\n'
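For reference, bucket permissions can be checked independently of Airflow's log handler, using the same connection and bucket as the logging config. This is a sketch along the lines of how I verified them; the object name is illustrative, not something Airflow itself writes:

from airflow.providers.google.cloud.hooks.gcs import GCSHook

# Upload a tiny probe object with the remote-logging connection, then list it back.
hook = GCSHook(gcp_conn_id="gcs-conn-dev")
hook.upload(
    bucket_name="[REDACTED]",
    object_name="airflow_logs/_permissions_probe.txt",  # illustrative name
    data="probe",
)
print(hook.list("[REDACTED]", prefix="airflow_logs/"))

A probe like this succeeds, which is why I believe the permissions themselves are fine.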
How to reproduce
You will need a GCP account, which I obviously cannot provide.
Set up the Helm chart with the values.yaml override I provided.
Then run the DAG below; I even added a long time.sleep(30) in case the pod needs some time to ship the logs before it is killed, but that didn't help.
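As an extra check (a sketch, not part of the repro itself), a throwaway task along these lines can dump what the worker pod actually sees for the logging settings:

import logging

from airflow.configuration import conf

# Hypothetical diagnostic: print the remote-logging settings and the handlers
# attached to the task logger inside the worker pod.
def dump_logging_config():
    print("remote_logging =", conf.get("logging", "remote_logging"))
    print("remote_log_conn_id =", conf.get("logging", "remote_log_conn_id"))
    print("remote_base_log_folder =", conf.get("logging", "remote_base_log_folder"))
    print("airflow.task handlers:", logging.getLogger("airflow.task").handlers)

The DAG: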
import logging
import time

from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.utils import dates

BUCKET = "some-redacted-gcs-bucket"


def ping_gcp(gcp_conn_id, bucket):
    logging.debug("this is a test debug log")
    logging.info("starting logging task")
    logging.warning("this is a test warning log")
    hook = GCSHook(gcp_conn_id=gcp_conn_id)
    objs = hook.list(bucket)
    logging.info(f"Here are the objects in GCS bucket: {objs}")
    time.sleep(30)


default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "start_date": dates.days_ago(1),
}

dag = DAG("logging_test", default_args=default_args)

task = PythonOperator(
    dag=dag,
    task_id="list_gcp_bucket_objects_in_dev",
    python_callable=ping_gcp,
    op_kwargs=dict(gcp_conn_id="gcs-conn-dev", bucket=BUCKET),
)
Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct