Description
Apache Airflow version
2.2.3 (latest released)
What happened
Operator logging does not work. For example, for a task that logs via the standard logging module:

the log shown in the webserver is:

No log messages from the task are recorded at all.
In my view, the cause is that when a TaskInstance is loaded through the session by DagRun.run_id in the function _get_ti in airflow/cli/commands/task_command.py, the ti._log object is not properly initialized.

Then, when ti.init_run_context is called, ti._log is set to logging.getLogger(self.__class__.__module__ + '.' + self.__class__.__name__) by the log property of the LoggingMixin class in airflow/utils/log/logging_mixin.py, and the configuration of logging.getLogger('airflow.task') is never properly applied.
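To illustrate the naming behavior, here is a minimal sketch of how a LoggingMixin-style log property resolves its logger name from the class's module and name (assumption: this is a simplified stand-in for airflow/utils/log/logging_mixin.py, not the actual implementation):

```python
import logging


class LoggingMixin:
    """Simplified sketch of Airflow's LoggingMixin logger resolution."""
    _log = None

    @property
    def log(self):
        # If _log was never initialized, fall back to a logger named after
        # the class, e.g. '<module>.TaskInstance' -- not 'airflow.task'.
        if self._log is None:
            self._log = logging.getLogger(
                self.__class__.__module__ + "." + self.__class__.__name__
            )
        return self._log


class TaskInstance(LoggingMixin):
    pass


ti = TaskInstance()
print(ti.log.name)  # ends with '.TaskInstance', not 'airflow.task'
```

Because that fallback logger is not the configured 'airflow.task' logger, none of the task-log handlers are attached to it.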

Next, in the function _capture_task_logs in airflow/cli/commands/task_command.py, the root logger configuration is replaced with the airflow.task configuration, whose handlers have not been set up.

This means that none of the logging that happens after the replacement can be recorded.
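The symptom can be sketched in plain Python (assumption: this is a simplified analogy to the handler swap in _capture_task_logs, not Airflow's actual code). Once the root logger's handlers are replaced with those of a logger that was never configured, subsequent records no longer reach the original destination:

```python
import io
import logging

# Configure the root logger to write into an in-memory buffer.
buf = io.StringIO()
root = logging.getLogger()
root.handlers = [logging.StreamHandler(buf)]
root.setLevel(logging.INFO)

root.info("before replacement")  # captured in buf

# Mimic the swap: replace root's handlers with those of an unconfigured
# logger. 'airflow.task' here has no handlers attached, so the new list
# is empty.
unconfigured = logging.getLogger("airflow.task")
root.handlers = list(unconfigured.handlers)

root.error("after replacement")  # never reaches buf; only Python's
                                 # last-resort stderr handler sees it

print(buf.getvalue())
```

Only "before replacement" ends up in the buffer, which matches the empty task log seen in the webserver.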
Possible way to fix:
In the function refresh_from_task, initialize self._log as follows:

Moreover, the same initialization may also need to be added to the function refresh_from_db.
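A plausible sketch of what that initialization could look like (assumption: this is a simplified stand-in for TaskInstance in airflow/models/taskinstance.py, not the actual patch):

```python
import logging


class TaskInstance:
    """Hypothetical, stripped-down TaskInstance for illustration only."""

    def __init__(self):
        self._log = None

    def refresh_from_task(self, task):
        # ...copy attributes from the task as the real method does...
        # Proposed addition: point the instance logger at the configured
        # 'airflow.task' logger instead of leaving _log unset, so records
        # emitted after the handler swap are still captured.
        self._log = logging.getLogger("airflow.task")


ti = TaskInstance()
ti.refresh_from_task(task=None)
print(ti._log.name)  # 'airflow.task'
```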
What you expected to happen
I expect the log to look like this:

How to reproduce
My config related to logging in airflow.cfg:
[logging]
base_log_folder = /home/airflow/airflow/logs
remote_logging = False
remote_log_conn_id =
google_key_path =
remote_base_log_folder =
encrypt_s3_logs = False
logging_level = INFO
fab_logging_level = INFO
logging_config_class =
colored_console_log = True
colored_log_format = [%%(blue)s%%(asctime)s%%(reset)s] {%%(blue)s%%(filename)s:%%(reset)s%%(lineno)d} %%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s
colored_formatter_class = airflow.utils.log.colored_log.CustomTTYColoredFormatter
log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
task_log_prefix_template =
log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log
log_processor_filename_template = {{ filename }}.log
dag_processor_manager_log_location = /home/airflow/airflow/logs/dag_processor_manager/dag_processor_manager.log
task_log_reader = task
extra_logger_names =
worker_log_server_port = 8793
My DAG and task:
import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator


def test_log():
    import logging
    logging.info("airflow.task >>> INFO logger test")
    logging.warn("airflow.task >>> WARN logger test")
    logging.error("airflow.task >>> ERROR logger test")
    logging.critical("airflow.task >>> CRITICAL logger test")


with DAG(
    'test_log',
    description='test log',
    schedule_interval=None,
    start_date=datetime.datetime(2022, 1, 19),
    catchup=False,
    tags=['log'],
) as dag:
    task = PythonOperator(
        task_id='test_log_task',
        python_callable=test_log,
        dag=dag,
    )
Operating System
NAME="Ubuntu" VERSION="18.04.4 LTS (Bionic Beaver)" ID=ubuntu ID_LIKE=debian PRETTY_NAME="Ubuntu 18.04.4 LTS" VERSION_ID="18.04" HOME_URL="https://www.ubuntu.com/" SUPPORT_URL="https://help.ubuntu.com/" BUG_REPORT_URL="https://bugs.launchpad.net/ubuntu/" PRIVACY_POLICY_URL="https://www.ubuntu.com/legal/terms-and-policies/privacy-policy" VERSION_CODENAME=bionic UBUNTU_CODENAME=bionic
Versions of Apache Airflow Providers
None
Deployment
Virtualenv installation
Deployment details
No response
Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct