
Can't read S3 remote logs when using gevent/eventlet webserver workers. #8212

Closed
webster-chainalysis opened this issue Apr 8, 2020 · 66 comments · Fixed by #28283
Labels
kind:bug - This is a clearly a bug
provider:amazon-aws - AWS/Amazon - related issues

Comments

@webster-chainalysis

Hey everyone. I’ve upgraded to 1.10.9. It appears that the logging changes broke the functionality for reading S3 remote logs in the Web UI (writing is OK). The changelog mentions that Airflow's logging mechanism has been refactored to use Python’s built-in logging module:

[AIRFLOW-1611] Customize logging

I followed the directions in the changelog and created the following log config:

import os
import six

from airflow import AirflowException
from airflow.configuration import conf
from airflow.utils.file import mkdirs
from typing import Dict, Any

LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()

FAB_LOG_LEVEL = conf.get('core', 'FAB_LOGGING_LEVEL').upper()

LOG_FORMAT = conf.get('core', 'LOG_FORMAT')

COLORED_LOG_FORMAT = conf.get('core', 'COLORED_LOG_FORMAT')

COLORED_LOG = conf.getboolean('core', 'COLORED_CONSOLE_LOG')

COLORED_FORMATTER_CLASS = conf.get('core', 'COLORED_FORMATTER_CLASS')

BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')

PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'CHILD_PROCESS_LOG_DIRECTORY')

DAG_PROCESSOR_MANAGER_LOG_LOCATION = \
    conf.get('core', 'DAG_PROCESSOR_MANAGER_LOG_LOCATION')

FILENAME_TEMPLATE = conf.get('core', 'LOG_FILENAME_TEMPLATE')

PROCESSOR_FILENAME_TEMPLATE = conf.get('core', 'LOG_PROCESSOR_FILENAME_TEMPLATE')

FORMATTER_CLASS_KEY = '()' if six.PY2 else 'class'


#
# Getting this from environment because the changelog for 1.10.9 says to set
# the path of `REMOTE_BASE_LOG_FOLDER` explicitly in the config. The
# `REMOTE_BASE_LOG_FOLDER` key is not used anymore.
#
REMOTE_BASE_LOG_FOLDER = os.environ.get('AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER')

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'airflow.task': {
            'format': LOG_FORMAT,
        },
        'airflow.processor': {
            'format': LOG_FORMAT,
        },
        'airflow_coloured': {
            'format': COLORED_LOG_FORMAT if COLORED_LOG else LOG_FORMAT,
            FORMATTER_CLASS_KEY: COLORED_FORMATTER_CLASS if COLORED_LOG else 'logging.Formatter'
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'airflow.task',
            'stream': 'ext://sys.stdout'
        },
        'file.task': {
            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'filename_template': FILENAME_TEMPLATE,
        },
        'file.processor': {
            'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
            'formatter': 'airflow.processor',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        },
        's3.task': {
            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
            'formatter': 'airflow.task',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            's3_log_folder': REMOTE_BASE_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
    },
    'loggers': {
        '': {
            'handlers': ['console'],
            'level': LOG_LEVEL
        },
        'airflow': {
            'handlers': ['console'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.processor': {
            'handlers': ['file.processor'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
        'airflow.task': {
            'handlers': ['s3.task'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.task_runner': {
            'handlers': ['s3.task'],
            'level': LOG_LEVEL,
            'propagate': True,
        },
    }
}  # type: Dict[str, Any]

However, the task log reader always defaults to the FileTaskHandler. This should not occur, because I have the following settings in airflow.cfg:

remote_logging = True
remote_base_log_folder = s3://my-bucket-name
remote_log_conn_id = aws_default
task_log_reader = s3.task

The s3.task handler passed to the task_log_reader setting should create an instance of the S3TaskHandler class to read the task logs from S3. This happens when rendering the get_logs_with_metadata view in www/views.py.


Apache Airflow version: 1.10.9
Kubernetes version: 1.15

Environment:

  • Cloud provider or hardware configuration: AWS
  • OS (e.g. from /etc/os-release): python:3.7.6-buster docker image

What happened: Logs did not appear in the Airflow Web UI. The FileTaskHandler tries to fetch the file locally or from the worker on port 8793. However, the logs do not exist in either location, since we are using the Kubernetes Executor. This produces the following error messages:

*** Log file does not exist: /usr/local/airflow/logs/MY_DAG_NAME/MY_TASK_NAME/2020-04-07T20:59:19.312402+00:00/6.log
*** Fetching from: http://MY_DAG_NAME-0dde5ff5a786437cb14234:8793/log/MY_DAG_NAME/MY_TASK_NAME/2020-04-07T20:59:19.312402+00:00/6.log
*** Failed to fetch log file from worker. HTTPConnectionPool(host='MY_DAG_NAME-0dde5ff5a786437cb14234', port=8793): Max retries exceeded with url: /log/MY_DAG_NAME/MY_TASK_NAME/2020-04-07T20:59:19.312402+00:00/6.log (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f708332fc90>: Failed to establish a new connection: [Errno -2] Name or service not known'))

What you expected to happen:

The logs should be rendered in the Web UI using the S3TaskHandler class.

@webster-chainalysis webster-chainalysis added the kind:bug This is a clearly a bug label Apr 8, 2020

@dimon222
Contributor

dimon222 commented May 6, 2020

@webster-chainalysis
I noticed the same issue on 1.10.10. Do you also see this behaviour on that version?

@kaxil kaxil added this to the Airflow 1.10.11 milestone May 6, 2020
@webster-chainalysis
Author

@dimon222 - Yes.

@dimon222
Contributor

dimon222 commented May 8, 2020

@webster-chainalysis
Switching the worker class in airflow.cfg from gevent/eventlet to "sync" seems to resolve the issue for me.

@webster-chainalysis
Author

@dimon222 - That makes sense. I'm currently using gevent behind an Amazon elastic load balancer. When using the sync worker class with the ELB I had unacceptable performance.

@mik-laj mik-laj added the provider:amazon-aws AWS/Amazon - related issues label May 12, 2020
@TRReeve

TRReeve commented May 12, 2020

I have had this issue as well on Airflow 1.10.10 and have been tearing my hair out over it for a couple of days now. I'm running with the KubernetesOperator, and the logging uploads to S3 fine; it is only the reading of the logs back from the web UI that is always in local mode.

@TRReeve

TRReeve commented May 16, 2020

Having gotten remote logging working on 1.10.10, I've noticed there seems to be a difference in how the workers handle the log upload versus the webserver component. The connection you define needs to point to the same folder as remote_base_log_folder. E.g. if your remote base log folder is data/airflow/logs, then the connection used for remote logging also needs to point to it, e.g. s3://access_key:secret@data/airflow/logs. You cannot just have a generic "AWS connection" and have it figure out which folder to point to on its own using the file path from the worker.

@marclamberti

@TRReeve I didn’t need to do that. Actually, if you want remote logging working with the Kubernetes executor, you have to define additional Kubernetes environment variables so that your pods are in sync with the scheduler/webserver. E.g.:
AIRFLOW__CORE__REMOTE_LOGGING=True
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://my-bucket/my-key
AIRFLOW__CORE__REMOTE_LOG_CONN_ID=myawsconn
AIRFLOW__CORE__FERNET_KEY=myfernetkey

AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__REMOTE_LOGGING=True
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://my-bucket/my-key
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__REMOTE_LOG_CONN_ID=myawsconn
AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__AIRFLOW__CORE__FERNET_KEY=myfernetkey

The connection myawsconn has type S3 and the extra fields:

  • aws_access_key_id
  • aws_secret_access_key
  • region_name

Also, I didn’t need to change the task handler, as it is automatically changed when remote logging is set to True, and I didn’t have to define a custom logging class.

Hope it helps :)

@TRReeve

TRReeve commented May 17, 2020

@marclamberti Your answer is how I understood it would work as well, and it half worked for me: I would get the logs uploading fine into the S3 bucket, but when I went to "view logs" in the UI it would give the "logs not found" error, with no output in the logs to indicate it was using the S3 connection or the read_key function to retrieve anything.

It would be really nice if I could just define AIRFLOW_CONN_S3_URI=s3://user:pass@S3, then have REMOTE_BASE_LOG_FOLDER=s3://airflow-stuff/logs, and the UI would build the path, but I could only get logs uploading. My working Helm template for Airflow on k8s builds the connection s3://access_key:secret_key@{{ mys3path }}, and then the remote log path is s3://{{ mys3path }}. Aside from that, it's exactly the same as you defined above, with the same variables defined under AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES.

And yes I can confirm for anyone reading there was no need for any custom logging classes or task handlers.

@marclamberti

marclamberti commented May 18, 2020

Well, in my case it works for both reading and writing logs from S3 with Airflow 1.10.10 and the Kubernetes executor.
I agree it could be simpler :)

@ashb
Member

ashb commented Jun 3, 2020

@webster-chainalysis I've just opened #9118 to improve the debugging of the S3 log handler.

Could you also try the debug steps I put in that description - let's rule out that sort of problem.

@prithvisathiya

prithvisathiya commented Jul 5, 2020

I'm also experiencing the exact same issue after upgrading to 1.10.9, but I'm still using LocalExecutor. I can clearly see from the S3 console that the logs are getting uploaded, but the Airflow UI is unable to read them back. It attempts to read the local folder and then just gives up with the following error:

*** Log file does not exist: /app/logs/dag_name/task_name/2020-07-05T19:21:09.715128+00:00/1.log
*** Fetching from: http://airflow-test-scheduler-5f8ccc76df-tc8j8:8793/log/dag_name/task_name/2020-07-05T19:21:09.715128+00:00/1.log
*** Failed to fetch log file from worker. HTTPConnectionPool(host='airflow-test-scheduler-5f8ccc76df-tc8j8', port=8793): Max retries exceeded with url: /log/dag_name/task_name/2020-07-05T19:21:09.715128+00:00/1.log 

My configurations below:

AIRFLOW__CORE__EXECUTOR=LocalExecutor
AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://{bucket}/logs/
AIRFLOW__CORE__REMOTE_LOGGING=True
AIRFLOW__CORE__REMOTE_LOG_CONN_ID=aws_default
AIRFLOW_CONN_AWS_DEFAULT=aws://?region_name=us-west-2

Update: also, when I switched from the gevent to the sync worker class, it seemed to read fine again.

@alanbrent

alanbrent commented Jul 8, 2020

Can confirm that using a threaded gunicorn worker (gevent in our case) breaks the web component's ability to show task logs that are in S3. Moving back to sync works for us, and since we're running on Kubernetes a few more web pods to absorb the ALB/ELB health checks isn't (so far) a performance concern.

Would love to see this fixed, however.

Using Airflow's default aws_default for the S3 connection. Relevant configuration shared among web, scheduler, and worker pods below the fold:
  - AIRFLOW__CORE__REMOTE_LOGGING=True
  - AIRFLOW__CORE__REMOTE_LOG_CONN_ID=aws_default
  - AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://${OUR-BUCKET}/logs
  - AIRFLOW__WEBSERVER__WORKER_CLASS=sync
  - AIRFLOW__WEBSERVER__WORKER_REFRESH_INTERVAL=3600 # the default 30s worker lifetime is *way* too short
  - AWS_DEFAULT_REGION=us-east-1
  - AWS_ROLE_ARN=arn:aws:iam::${OUR_AWS_ACCOUNT_ID}:role/${OUR_AIRFLOW_APP_ROLE}
  - AWS_WEB_IDENTITY_TOKEN_FILE=/var/run/secrets/eks.amazonaws.com/serviceaccount/token # This is how EKS does IAM<>Pod stuff

@cmlad

cmlad commented Jul 9, 2020

The issue seems to be infinite recursion due to an interaction between gevent's monkey patching and the botocore library used by S3TaskHandler:

Traceback (most recent call last):
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/airflow/utils/log/s3_task_handler.py", line 131, in s3_log_exists
    return self.hook.get_key(remote_log_location) is not None
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/airflow/hooks/S3_hook.py", line 224, in get_key
    obj = self.get_resource_type('s3').Object(bucket_name, key)
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/airflow/contrib/hooks/aws_hook.py", line 186, in get_resource_type
    config=config, verify=self.verify)
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/boto3/session.py", line 389, in resource
    aws_session_token=aws_session_token, config=config)
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/boto3/session.py", line 263, in client
    aws_session_token=aws_session_token, config=config)
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/session.py", line 823, in create_client
    credentials = self.get_credentials()
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/session.py", line 428, in get_credentials
    'credential_provider').load_credentials()
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/session.py", line 919, in get_component
    self._components[name] = factory()
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/session.py", line 149, in _create_credential_resolver
    self, region_name=self._last_client_region_used
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/credentials.py", line 70, in create_credential_resolver
    container_provider = ContainerProvider()
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/credentials.py", line 1803, in __init__
    fetcher = ContainerMetadataFetcher()
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/utils.py", line 1578, in __init__
    timeout=self.TIMEOUT_SECONDS
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/httpsession.py", line 180, in __init__
    self._manager = PoolManager(**self._get_pool_manager_kwargs())
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/httpsession.py", line 188, in _get_pool_manager_kwargs
    'ssl_context': self._get_ssl_context(),
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/httpsession.py", line 197, in _get_ssl_context
    return create_urllib3_context()
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/site-packages/botocore/httpsession.py", line 72, in create_urllib3_context
    context.options |= options
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/ssl.py", line 507, in options
    super(SSLContext, SSLContext).options.__set__(self, value)
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/ssl.py", line 507, in options
    super(SSLContext, SSLContext).options.__set__(self, value)
  File "/usr/local/Caskroom/miniconda/base/envs/airflow/lib/python3.7/ssl.py", line 507, in options
    super(SSLContext, SSLContext).options.__set__(self, value)
  [Previous line repeated 409 more times]
RecursionError: maximum recursion depth exceeded

I still need to dive deeper into this

@dimon222
Contributor

dimon222 commented Jul 9, 2020

@cmlad I will make a very bad suggestion, but would it work with a regular HTTP endpoint instead (without SSL)? Just something to try out.

@dimon222
Contributor

dimon222 commented Jul 14, 2020

@cmlad yes, it's the reason for sure. And HTTP vs HTTPS makes no difference.

I randomly found a potential fix in one of the previously mentioned issues:
#8164 (comment)
This works: prepending the monkey patching in dagbag.py. What is invoking dagbag.py so early? Not sure.

@Siddharthk

How can I read logs from worker pods? This is important, since we need to see logs in real time to see what's happening. S3 logs are available only when the task completes. I am getting the below error currently:

Log file does not exist: /opt/airflow/logs/mydag/mytask/2020-07-21T11:58:55.019748+00:00/2.log
*** Fetching from: http://taskpod-49ccd964791a4740b199:8793/log/mydag/mytask/2020-07-21T11:58:55.019748+00:00/2.log
*** Failed to fetch log file from worker. HTTPConnectionPool(host='taskpod-49ccd964791a4740b199', port=8793): Max retries exceeded with url: /log/mydag/mytask/2020-07-21T11:58:55.019748+00:00/2.log (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f486c9b8be0>: Failed to establish a new connection: [Errno -3] Temporary failure in name resolution',))

@TRReeve

TRReeve commented Jul 29, 2020

@Siddharthk If you're using any kind of distributed execution (including Kubernetes), you won't be able to read logs from the Airflow UI while the task is running using Airflow "out of the box". Airflow pushes the log to remote storage when the task is completed, and then the UI reads it from there.

I would guess that if you do need real-time logs from batch tasks while they are running, you could set up a worker image with fluentd (or something similar) installed to scrape stdout and push it somewhere else while Airflow is running. Either way, there will have to be some sort of external integration.

@dimon222
Contributor

Solution mentioned above by me

@cmlad yes, it's the reason for sure. And HTTP vs HTTPS makes no difference.

I randomly found a potential fix in one of the previously mentioned issues:
#8164 (comment)
This works: prepending the monkey patching in dagbag.py. What is invoking dagbag.py so early? Not sure.

This no longer works in 1.10.12. Perhaps something gets loaded earlier again and messes up urllib3.
The issue is still present.

@dimon222
Contributor

Update - the new solution is adding gevent monkey patching to the top of config_templates/airflow_local_settings.py. It works.
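A minimal sketch of what that workaround might look like (hypothetical; the try/except guard and the GEVENT_PATCHED flag are illustrative additions, not part of any Airflow API). The essential point is that patch_all() must run before ssl/botocore are imported anywhere else:

```python
# Workaround sketch: place at the very top of
# config_templates/airflow_local_settings.py, before any other import.
# gevent must patch ssl before botocore creates an SSLContext;
# otherwise reading S3 logs hits the RecursionError seen in this thread.
try:
    from gevent import monkey
    monkey.patch_all()
    GEVENT_PATCHED = True
except ImportError:
    # gevent not installed (e.g. sync workers): nothing to patch
    GEVENT_PATCHED = False
```

The ordering matters more than the location: any module that imports ssl (directly or via botocore/urllib3) before this runs defeats the patch.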

@peycho

peycho commented Sep 2, 2020

We're facing the same recursion error when using gevent workers.

RecursionError: maximum recursion depth exceeded while calling a Python object

Airflow version 1.10.9

@toderesa97

We are experiencing the same issue here. Airflow does successfully write logs to S3, but we are getting:

Log file does not exist: /opt/airflow/logs/mydag/mytask/2020-07-21T11:58:55.019748+00:00/2.log
*** Fetching from: http://taskpod-49ccd964791a4740b199:8793/log/mydag/mytask/2020-07-21T11:58:55.019748+00:00/2.log
*** Failed to fetch log file from worker. HTTPConnectionPool(host='taskpod-49ccd964791a4740b199', port=8793): Max retries exceeded with url: /log/mydag/mytask/2020-07-21T11:58:55.019748+00:00/2.log (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f486c9b8be0>: Failed to establish a new connection: [Errno -3] Connection refused',))

I have configured my airflow.cfg as follows:

remote_logging = True
remote_log_conn_id = S3Conn
remote_base_log_folder = s3://dlakeufd-dev-audit/logs/airflow
encrypt_s3_logs = False
logging_config_class = airflow.utils.log.s3_task_handler.S3TaskHandler
task_log_reader = s3.task

But we are getting this exception when the Docker image is being provisioned into an ECS container (i.e., nothing to do with ECS):

...
File "/usr/local/lib/python3.7/site-packages/airflow/logging_config.py", line 53, in configure_logging
2020-09-03 21:16:15.format(logging_class_path, err)
2020-09-03 21:16:15ImportError: Unable to load custom logging from airflow.utils.log.s3_task_handler.S3TaskHandler due to
...

(There is no further explanation after the "due to".)

I don't understand why, after extracting the attribute of the class, it is expected to be a dict type, which clearly fails. See this class, where the exception is triggered at line 43.

@potiuk
Member

potiuk commented Nov 7, 2022

I only handled gevent, as this was the only one I knew about. But once you confirm it works for you for gevent, I see that there is a similar issue #15116, and it seems we can use a similar technique by adding an eventlet configuration.

    import eventlet
    eventlet.monkey_patch()

But this should be a separate PR once we get confirmation that the approach works for you. Since you have S3 logging configured and are very interested, I will hold on until I get confirmation.

@potiuk
Member

potiuk commented Nov 7, 2022

Did you try gevent with my fixes?

potiuk added a commit to potiuk/airflow that referenced this issue Nov 7, 2022
This change makes sure that gevent performs monkey_patching before
everything else. Gevent should make sure to patch all the classes
that are non-cooperative via gevent into cooperative ones and it
should be done as the first thing in the forked gunicorn process.

Usually GeventWorker does it automatically, however this happens
after the configuration of gunicorn gets imported. In our case, it
means that it happens after airflow settings are loaded - and, for
example, it means that if S3 remote logging is configured, then boto
is initialized before patch_all() and it breaks the ssl that is
patched by boto itself. Reversing the sequence and making gevent
patch the ssl connection first fixes the problem.

We could convert airflow settings to local imports, but this
does not guarantee success, because some of the initialization
methods might be executed before GeventWorker starts, and it is
also prone to mistakes (adding a top-level import to settings
broke it at some point in time and went unnoticed).

This change does it slightly differently - in case of gevent
worker, we use a different configuration for gunicorn and make
sure that patch_all() is always executed first before any other
import and initialization. This should fix the problem and be
future-proof.

Fixes: apache#8212
@potiuk
Member

potiuk commented Nov 8, 2022

The PR is green @aa3pankaj @dimon222 -> would love some confirmation that it solves the problem - if we do it quickly, it will get a chance to make it into 2.4.3 (together with the corresponding eventlet change) and will solve this long-standing problem.

@aa3pankaj
Contributor

@potiuk will test this out

@potiuk
Member

potiuk commented Nov 8, 2022

Did you also add the --preload flag? The solution will only work if --preload is enabled, either via env variable as described above or by also applying #27297 from @ephraimbuddy.

@aa3pankaj
Contributor

let me try with that also

@aa3pankaj
Contributor

aa3pankaj commented Nov 8, 2022

export GUNICORN_CMD_ARGS='--preload'

/opt/airflow_venv/venv/lib/python3.7/site-packages/airflow/www/gunicorn_config_gevent.py:22 MonkeyPatchWarning: Monkey-patching ssl after ssl has already been imported may lead to errors, including RecursionError on Python 3.6. It may also silently lead to incorrect behaviour on Python 3.7. Please monkey-patch earlier. See gevent/gevent#1016. Modules that had direct imports (NOT patched): ['botocore.httpsession (/opt/airflow_venv/venv/lib/python3.7/site-packages/botocore/httpsession.py)', 'urllib3.contrib.pyopenssl (/opt/airflow_venv/venv/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py)'].

@potiuk I'm getting this warning now on webserver bootup.
And the same RecursionError while fetching Airflow logs from S3.

@potiuk
Member

potiuk commented Nov 13, 2022

Too bad. I will try other things soon.

@EinavDanielDX

Any news on this?
We're facing the same problem right now

@potiuk
Member

potiuk commented Nov 22, 2022

For now, monkey patching is the only solution that works (and it does seem to work), so I suggest you do not wait for any other solution and apply this. We might or might not find a better one. But there is a workaround that works, so use it.

@potiuk
Member

potiuk commented Nov 27, 2022

Does monkeypatching work for you @EinavDanielDX ?

@dimon222
Contributor

dimon222 commented Dec 1, 2022

Only got to test the patches now. I'm on 2.4.3.

  1. Out of the box it still doesn't work.
  2. --preload doesn't help.
  3. Monkey patching of the default settings.py still works, with one new minor adjustment - the new annotations import has to come before the monkey patching, otherwise the webserver will fail to start.

I didn't test the new draft PR from @potiuk

@potiuk
Member

potiuk commented Dec 10, 2022

Hey @dimon222 @EinavDanielDX @aa3pankaj - can you please try my (new attempt at a) fix from #28283? It's generally a "brute-force" kind of fix. It requires you to set the _AIRFLOW_PATCH_GEVENT environment variable on your webserver, independently from configuring gevent in the configuration (precisely because reading the configuration might trigger third-party libraries to be imported and run their monkey patching first).

Not the nicest solution, but if it works, you will have a stable way of getting gevent patching without doing any weird and unpredictable monkey patching yourself.
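In deployment terms, enabling that fix would look roughly like this (a hypothetical snippet; the variable name is taken from the #28283 description above, and how you inject it depends on your deployment):

```shell
# Enable Airflow's built-in gevent monkey patching before any Airflow
# code (or settings) is imported, then start the webserver as usual.
export _AIRFLOW_PATCH_GEVENT=1
# airflow webserver
```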

@potiuk
Member

potiuk commented Dec 14, 2022

Anyone managed to test my fix yet :) ?

potiuk added a commit to potiuk/airflow that referenced this issue Dec 19, 2022
Gevent needs to monkeypatch a number of system libraries as soon
as possible when the Python interpreter starts, in order to avoid
other libraries monkey-patching them first. We should do it before
any other initialization, and it needs to be run only on the webserver.

So far this was done by local_settings monkeypatching, but that has
been rather brittle, and some changes in Airflow caused previous
attempts to stop working, because the "other" packages could be loaded
by Airflow first - depending on installed providers and configuration
(for example, with AWS configured as the logger, boto could have been
loaded first and monkey-patched networking before gevent had a chance
to do so).

This change introduces a different mechanism for triggering the
patching - it is triggered by setting an environment variable.
This has the benefit that we do not need to initialize anything
(including reading settings or setting up logging) before we determine
whether gevent patching should be performed.

It also has the drawback that the user will have to set the environment
variable in their deployment manually. However, this is a small price
to pay for stable and future-proof gevent monkeypatching
built into Airflow.

Fixes: apache#8212
@PApostol
Contributor

PApostol commented Dec 21, 2022

Hi @potiuk, we've also had the same issue with S3 remote logs, and I've tried out your fix. While the fix succeeds in fetching the logs from S3 in the UI, it seems to break the scheduler upon restart. The error is reproducible every time I try to start the scheduler after the fix is applied (note that the webserver works fine). Removing the fix from airflow/__init__.py resolves the error. I'm attaching the traceback below. Using Airflow 2.3.4, Python 3.7.3, LocalExecutor.

Traceback (most recent call last):
  File "/home/ec2-user/venv/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/ec2-user/venv/lib/python3.7/site-packages/airflow/__main__.py", line 38, in main
    args.func(args)
  File "/home/ec2-user/venv/lib/python3.7/site-packages/airflow/cli/cli_parser.py", line 51, in command
    return func(*args, **kwargs)
  File "/home/ec2-user/venv/lib/python3.7/site-packages/airflow/utils/cli.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/home/ec2-user/venv/lib/python3.7/site-packages/airflow/cli/commands/scheduler_command.py", line 76, in scheduler
    _run_scheduler_job(args=args)
  File "/home/ec2-user/venv/lib/python3.7/site-packages/airflow/cli/commands/scheduler_command.py", line 46, in _run_scheduler_job
    job.run()
  File "/home/ec2-user/venv/lib/python3.7/site-packages/airflow/jobs/base_job.py", line 244, in run
    self._execute()
  File "/home/ec2-user/venv/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 743, in _execute
    self.executor.start()
  File "/home/ec2-user/venv/lib/python3.7/site-packages/airflow/executors/local_executor.py", line 348, in start
    self.result_queue = self.manager.Queue()
  File "/usr/lib64/python3.7/multiprocessing/managers.py", line 724, in temp
    token, exp = self._create(typeid, *args, **kwds)
  File "/usr/lib64/python3.7/multiprocessing/managers.py", line 607, in _create
    conn = self._Client(self._address, authkey=self._authkey)
  File "/usr/lib64/python3.7/multiprocessing/connection.py", line 498, in Client
    answer_challenge(c, authkey)
  File "/usr/lib64/python3.7/multiprocessing/connection.py", line 742, in answer_challenge
    message = connection.recv_bytes(256)         # reject large message
  File "/usr/lib64/python3.7/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/usr/lib64/python3.7/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/usr/lib64/python3.7/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
BlockingIOError: [Errno 11] Resource temporarily unavailable

UPDATE:
The above has been resolved by using patch_all(thread=False, socket=False) instead of patch_all(), as explained here.
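A sketch of that narrower patch (hypothetical placement; the try/except guard and the PATCHED flag are illustrative additions): ssl still gets patched for boto's S3 reads, while thread and socket are left alone so the LocalExecutor's multiprocessing machinery keeps working.

```python
# Narrower workaround sketch: patch everything except thread and
# socket. This keeps gevent-compatible SSL for reading S3 logs, but
# avoids breaking multiprocessing's connection handshake in the
# scheduler (the BlockingIOError in the traceback above).
try:
    from gevent import monkey
    monkey.patch_all(thread=False, socket=False)
    PATCHED = True
except ImportError:
    # gevent not installed: nothing to patch
    PATCHED = False
```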

potiuk added a commit that referenced this issue Dec 21, 2022
Gevent needs to monkeypatch a number of system libraries as soon
as possible when Python interpreter starts, in order to avoid
other libraries monkey-patching them before. We should do it before
any other initialization and it needs to be only run on webserver.

So far it was done by local_settings monkeypatching but that has
been rather brittle and some changes in Airflow made previous attempts
to stop working because the "other" packages could be loaded by
Airflow before - depending on installed providers and configuration
(for example when you had AWS configured as logger, boto could have
been loaded before and it could have monkey patch networking before
gevent had a chance to do so.

This change introduces a different mechanism for triggering the
patching - it can be triggered by setting an environment variable.
This has the benefit that we do not need to initialize anything
(including reading settings or setting up logging) before we determine
whether gevent patching should be performed.

It also has the drawback that the user will have to set the environment
variable in their deployment manually. However, this is a small price to
pay for stable and future-proof gevent monkeypatching
built into Airflow.

Fixes: #8212
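The mechanism described in the commit message above can be sketched roughly as follows. `_AIRFLOW_PATCH_GEVENT` is the variable name used by the fix, but the surrounding code is a simplified reconstruction, not the actual Airflow source:

```python
import os


def maybe_patch_gevent() -> bool:
    """Monkey-patch with gevent only if explicitly requested via env var.

    Checking an environment variable requires no settings or logging
    initialization, so it can run first thing on interpreter startup,
    before boto or any other networking library gets imported.
    """
    if os.environ.get("_AIRFLOW_PATCH_GEVENT"):
        # Imported lazily: gevent is only needed when patching is requested.
        from gevent import monkey

        monkey.patch_all()
        return True
    return False
```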
ephraimbuddy pushed a commit that referenced this issue Jan 11, 2023
Gevent needs to monkeypatch a number of system libraries as soon
as possible when the Python interpreter starts, in order to avoid
other libraries monkey-patching them first. We should do it before
any other initialization, and it only needs to run on the webserver.

So far it was done by local_settings monkeypatching, but that has
been rather brittle, and some changes in Airflow caused previous attempts
to stop working because the "other" packages could be loaded by
Airflow first - depending on installed providers and configuration
(for example, when you had AWS configured as the logger, boto could have
been loaded earlier and could have monkey-patched networking before
gevent had a chance to do so).

This change introduces a different mechanism for triggering the
patching - it can be triggered by setting an environment variable.
This has the benefit that we do not need to initialize anything
(including reading settings or setting up logging) before we determine
whether gevent patching should be performed.

It also has the drawback that the user will have to set the environment
variable in their deployment manually. However, this is a small price to
pay for stable and future-proof gevent monkeypatching
built into Airflow.

Fixes: #8212
(cherry picked from commit 2429d07)
@dimon222
Copy link
Contributor

dimon222 commented Jan 27, 2023

Just wanted to post my final note that 945fc072fa903c8b96c70d34749bf98dabf8a654 did indeed solve it, and functions properly in 2.5.1 without any extra voodoo.

@potiuk
Copy link
Member

potiuk commented Jan 27, 2023

Just wanted to post my final note that 945fc07 did indeed solve it, and functions properly in 2.5.1 without any extra voodoo.

Cool thanks for that confirmation!

na-wu pushed a commit to Affirm/airflow that referenced this issue Jan 30, 2023
* Handle transient state errors in `RedshiftResumeClusterOperator` and `RedshiftPauseClusterOperator` (#27276)

* Modify RedshiftPauseClusterOperator and RedshiftResumeClusterOperator to attempt to pause and resume multiple times to avoid edge cases of state changes

* Improve "sensor timeout" messaging (#27733)

It's been around a long, long time, but that doesn't make it any less confusing.  I think it's time to do away with the "SNAP. Time is OUT" message, and replace it with something less cute / more clear / direct.

* Completed D400 for airflow/lineage/* directory (#27732)

* Completed D400 for airflow/listener/* directory (#27731)

* Completed D400 for airflow/api_connexion/* directory (#27718)

* Improve task deadlock messaging (#27734)

When one encounters "deadlock" in the logs, it's confusing because most people will immediately think database deadlock.

But it's really about task dependencies.  I thought about changing it to just "Dag has no runnable tasks", but the deadlock naming kind of has a meaning in the codebase, so I tried to bridge the gap by clarifying that it's specifically a _task_ deadlock.

* Allow datasets to be used in taskflow (#27540)

* Allow datasets to be used in taskflow

Datasets could not be passed as parameters to taskflow
functions as they could not be serialized. This commit:

1) changes the xcom serializer so that it now can serialize
objects that have attr, dataclass or a custom serializer
2) removes the need for a custom serializer in lineage
3) adds a version check to the serializer/deserializer
4) registers any datasets as either inlets or outlets in the task
5) inlets or outlets that cannot be serialized now raise an error

* Update docs/apache-airflow/concepts/taskflow.rst

Co-authored-by: Bas Harenslak <BasPH@users.noreply.github.com>

* Apply suggestions from code review

Co-authored-by: Bas Harenslak <BasPH@users.noreply.github.com>
Co-authored-by: Ash Berlin-Taylor <ash_github@firemirror.com>

* scheduler_job, add metric for scheduler loop timer (#27605)

* Use Boto waiters instead of custom _await_status method for RDS Operators (#27410)

* Fix formatting leftovers (#27750)

PR 27540 left some formatting issues which weren't caught

* System Test for EMR (AIP-47) (#27286)

* Make custom env vars optional for job templates (#27148)

* Make custom env vars optional for job templates

This addresses issue #26045, where an additional field is added to the
job to determine whether or not any extra env fields are added to the
job specification.

This helps remove the issue currently with using CI/CD platforms like
Argo that attempt to apply the env overrides to the job templates post
their initial deploy which are immutable.

Apply feedback from failing test

* Replace helm chart icon (#27704)

* Allow SSL mode in MySQL provider (#27717)

* completed D400 for airflow/callbacks/* airflow/cli/* (#27721)

* reset commits, clean submodules (#27560)

* Expand tasks in mapped group at run time (#27491)

* Align TaskGroup semantics to AbstractOperator (#27723)

* Limit the full tests to representative items in Python/Backend matrix (#27754)

This change limits the matrix of tests run on CI when full tests are run
for all our tests - in a way that ensures each of the items is
present but not repeated when unnecessary.

* [FEATURE] add postgres 15 support (#27444)

* Use context managers to simplify log serve management (#27756)

* simplify notes accordion test (#27757)

* Fix typing problem revealed after recent Neo4J release (#27759)

* Typescript files are also now triggering UI/WWW tests (#27760)

* Add: #23880 :: Audit log for AirflowModelViews(Variables/Connection) (#24079)

* add audit log for AirflowModelViews
this includes connections, variables, pools, SLA, and XCom

* Replace `unittests` in microsoft providers tests by pure `pytest` [Wave-4] (#27735)

* Fix backfill queued task getting reset to scheduled state. (#23720)

* optimise task instances filtering (#27102)

* Add critical section query duration metric (#27700)

* Completed D400 for multiple folders (#27748)

* Sync context.py with context.pyi (#27770)

* Fix a deprecation warning related to airflow.utils.helpers.chain (#27769)

* Unify "full tests needed" and "run everything" in CI (#27761)

After recent changes in https://github.com/apache/airflow/pull/27754
where representative tests now are run even in "full tests needed",
run_everything became practically equivalent to full tests needed.

This change removes "run_everything" mode and replaces it with
"full tests needed" - either automatically detected by set of
changes or type of PR or manually set by PR label.

* Fixups of small mistakes in release providers documentation (#27773)

* Document dag_file_processor_timeouts metric as deprecated (#27067)

This metric was deprecated in 2.0, removed from the documentation,
users were notified in changelog, but the metric itself was still in the
code base. It cannot be removed just yet, so mark it as deprecated for
now.

fixes #15834

* Allow hyphens in pod id used by k8s executor (#27737)

Makes things more readable.  E.g. my-dag-my-task instead of mydagmytask.

* Fix doc strings for bigquery tests (#27794)

PR https://github.com/apache/airflow/pull/26433 removed the dedicated classes but changes to doc strings were not made

* Fix menu border color (#27789)

* Clear TaskGroup (#26658)

* Clear TaskGroup WIP

* Make get_task_dict iterative and update type check

* Add UI part

* Add test for get_task_dict

* Raise exception for unhandled DAGNode instance.

* Update task_id for test_get_task_dict

* Convert get_task_dict to iter_tasks

* Lock dag runs to prevent dead locks

* Add warning to user

* Update following code review

* Installation instruction to use Colima instead of Docker Desktop (#27804)

* Add logic for XComArg to pull specific map indexes (#27771)

* Optimize TI.xcom_pull() with explicit task_ids and map_indexes (#27699)

* Add a missing space in a print of initialize_virtualenv.py (#27793)

* Make views requiring session, keyword only args (#27790)

* Don't explicitly set include_examples to False on task run command (#27813)

The speed optimization in #26750 where I set include_examples to False
is making it impossible to run the example dag's tasks on the command line.

Removing this to now depend on load_examples config setting.

* Instruct users to upgrade docker-desktop instead of work around (#27809)

* Improve TriggerRuleDep typing and readability (#27810)

* Metric for raw task return codes (#27155)

Co-authored-by: Igor Kholopov <kholopovus@gmail.com>

* Switch (back) to late imports (#27730)

* Add CI step to run prepare provider documentation. (#27832)

Adds a CI step that was likely lost during refactoring: we now
run the prepare-provider-documentation step to test that it is
still working.

We run pretty much all our `breeze` commands in CI, even those
that we use for manual release management, just to make sure
they are still working and that none of the refactorings
break them. This is very reassuring: whenever you attempt to do
a release, the tools that we use for it are most likely still
working.

* Add better diagnostics in case of outdated images for pre-commits (#27835)

When you have an outdated Python 3.7 image, some of the pre-commits
might fail when using it and produce strange stack traces whose
root cause is difficult to understand.

We do not want to check if the image is fresh in those cases - that
would often cost unnecessary time rebuilding the image just
to get the pre-commit runs; instead we explicitly suggest to the user
what can be done in case they are lost.

* Add DagRun state change to the Listener plugin system (#27113)

This PR expands listeners API to be notified about DagRun state changes.

PR #20443 introduced Listener API to Airflow, enabling users and developers
to write Pluggy plugins. Here the same mechanism is used to provide
notifications about DagRun states - whether DagRun started, succeeded or
failed.

Additionally, this PR adds lifecycle methods for plugins - when an Airflow job
(SchedulerJob, BackfillJob, or LocalTaskJob) runs, it notifies plugins when it
starts or finishes.

Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>

* Correct job name matching in SagemakerProcessingOperator (#27634)

* Correct job name matching in SagemakerProcessingOperator

SagemakerProcessingOperator "increment if job name exists" was matching any job name that contains the proposed name when really it should have been matching names which fit the specific pattern ^{proposed_name}{suffix_template}$

* Make DagRun state updates for paused DAGs faster (#27725)

* Make DagRun state updates for paused DAGs faster

If you have a large number of paused DAGs with running DagRuns in your instance,
the SchedulerJob function that tries to update the state for those DagRuns
can be really slow (slow enough to cause the scheduler to be killed by
k8s livenessprobes, for example).

This PR does a few things:

- Only check paused DAGs that have running DagRuns
- Only check DagRuns that have TI's updated since the last
  DagRun.last_scheduling_decision
- Only get the serialized DAG if we are ready to try updating the DagRun
  state

This prevents Airflow from trying to update every "paused" DagRun over
and over again, and doesn't do expensive work until we know it makes
sense to try updating the state.

* Restore removed (but used) methods in common.sql (#27843)

* Restore removed (but used) methods in common.sql

Unfortunately two of the methods that have been removed in #26761
have been used in Google Provider 8.4.0

Those methods should be restored and 1.3.0 version of the
common.sql has to be yanked to get rid of the problem when
Bigquery is broken by common.sql

Fixes: #27838

* Use virtual env python in the script shebang of retag_docker_images.py (#27857)

* UI: Update offset height if data changes (#27865)

* Fix old-style typing in Base Sensor (#27871)

The new autoflake removes unused Iterable from sensor - because
it is used via old-style typing. Subsequently MyPy complains that
Iterable is not imported.

This PR converts typing to new-style to fix it.

* Fix new warning appearing after new snowflake-connector-python (#27870)

The release started to generate a new warning.

* Fix templating fields and do_xcom_push in DatabricksSQLOperator (#27868)

When SQLExecuteQueryOperator has been introduced in #25717, it
introduced some errors in the Databricks SQL operator:

* The templated "schema" field has not been set as field in the
  operator.
* The do_xcom_push parameter was ignored

This PR fixes it by:

* storing schema as field and using it via self reference
* do_xcom_push is removed (and BaseOperator's one is used).

* Simplify FAB table resetting (#27869)

We can simplify how we reset FAB tables now that everything is an
"Airflow" table.

* Refresh next run datasets info in dags view (#27839)

* Notes stored in separate table (#27849)

* wip

* try revert rename

* simplify

* working, minimally

* more reverting of notes -> note rename

* more reverting of notes -> note rename

* more reverting of notes -> note rename

* remove scratch code

* remove test speedup

* restore admin view

* add migration

* add migration

* tod

* fix migration

* Add DagRunNote

* add migration file

* disamble notes in search

* fix dagrun tests

* fix some tests and tighten up relationships, i think

* remove notes from create_dagrun method

* more cleanup

* fix collation

* fix db cleanup test

* more test fixup

* more test fixup

* rename to tinote

* rename fixup

* Don't import FAB user models just to define FK rel

We don't (currently) define any relationships it's just for making the
FK match the migration, so for now we can have the FK col defined as a
string.

When we eventually add a relationship to the get the creator of the
note, we should move the FAB User model into airflow.models and change
Security manager code to import from there instead.

* Avoid touching test file unnecessarily

* fix import

* Apply suggestions from code review

* Test that a user_id is set when creating note via api

* Fix static checks

Co-authored-by: Ash Berlin-Taylor <ash_github@firemirror.com>
Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>
Co-authored-by: Jed Cunningham <jedcunningham@apache.org>

* Fix suffix for pre-release cross-dependent packages (#27727)

We used .* as suffix for dependent pre-release packages but it
turned out to be misunderstanding of the dependencies and PEP440.

According to PEP440 the dev/a/b/c(rc) versions are strictly ordered
and ">=X.Y.Z.dev0" is equivalent of "depends on any pre-release
package of X.Y.Z and all packages that follow".

Result of discusion in https://github.com/python-poetry/poetry/issues/7047

* Make MappedTaskGroup depend on its expand inputs (#27876)

* Allow depth-first execution (#27827)

* Fix errors in Databricks SQL operator introduced when refactoring (#27854)

When SQLExecuteQueryOperator has been introduced in #25717, it
introduced some errors in the Databricks SQL operator:

* The schema (description) parameter has been passed as _process_output
  parameter from Hook's output
* The run() method of DatabricksHook was not conforming to other
  run methods of the Hook - it was returning Tuple of the
  result/description
* The _process_output type was not specified - if scalar was used
  it returned different output than without it and it was not
  specified in the DBApiHook.

This PR fixes it by:

* the Databricks Hook is now conformant to the other DBAPIHooks in
  terms of value returned by Hook (backwards incompatible so we
  need to bump major version of the provider)
* the DBApiHook now has "last_description" field which on one hand
  makes it stateless, on the other, the state reflects the
  description of the last run method and is not a problem to keep.
  This implies 1.4 version of common-sql provider as this is a new
  feature for the provider
* the DBApiHook now has "scalar_return_last" field that indicates
  if scalar output was specified.
* Python dbapi's "description" is properly named now - previously it was
  "schema" which clashed with the "schema" name passed to hook
  initialisation - the actual database schema

* Redirect to home view when there are no valid tags in the URL (#25715)

* Redirect to home view when there are no valid tags in the URL

* Add tags to test dags

* Add tag tests

* Fix home page query count

* Bump common.sql provider to 1.3.1 (#27888)

The common.sql provider should be bumped to 1.3.1 in order to handle
some of the problems found in 1.3.0 - mainly about consistency of
common SQLExecuteQueryOperator that is used in multiple providers.

We are going to yank 1.3.0 - this PR also bumps min dependency
for all providers that use common.sql to make sure we can release
them in sync with the common.sql provider.

* Use "note" instead of "notes" in API re dag run / task instance notes (#27867)

* rename notes to note in endpoint code

* rename notes to note in model code

* update test code

* update react code

* fixups

* fixup! fixups

* fix rebase mistake

Co-authored-by: Ephraim Anierobi <splendidzigy24@gmail.com>

* Explicitly name the primary keys in ORM for task & dagrun notes table (#27886)

This was omitted but it's important for future migrations

* task runner: notify of component start and finish (#27855)

Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>

Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>

* Update default branches for 2-5

* tests: always cleanup registered test listeners (#27896)

Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>

Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>
(cherry picked from commit eba04d7c400c0d89492d75a7c81d21073933cd0c)

* Remove is_mapped attribute (#27881)

(cherry picked from commit 3e288abd0bc3e5788dcd7f6d9f6bef26ec4c7281)

* Soft delete datasets that are no longer referenced in DAG schedules or task outlets (#27828)

* Soft delete datasets that are no longer referenced anywhere

* Move the orphaning logic into the scheduler and adjust config option name accordingly

* Rename config option scheduler.cleanup_interval -> scheduler.parsing_cleanup_interval

* Include default column value in migration

* deprecate old interval config; move code to scheduler_job

* First pass at a test

* Fix migration

* Apply suggestions from code review

Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>

* Don't batch migrations if we don't need to

* Revert "Don't batch migrations if we don't need to" - gotta batch migrations for SQLite

This reverts commit 652f7452d3b418c991d409a2b0fc041443048545.

* Tweak migrations

* Use sqlalchemy.sql.True_() to support all DB backends

* Various cleanups

* Add test for un-orphaning datasets once they are referenced again

* Use sqlalchemy.sql.expression.true()

Co-authored-by: Ephraim Anierobi <splendidzigy24@gmail.com>

* Fix orphaning datasets on MSSQL

* Comment the un-orphan process and use sqla.sql.expression.false()

* Add newsfragment about renamed config option

* add mssql_drop_default flag

* Use server_default in the ORM as well

* Defensively clear datasets before and after DAG tests

* Reconcile migration with ORM model

* Remove now erroneous comment

* Change to use server_default='0'

* Update airflow/configuration.py

Co-authored-by: Jed Cunningham <jedcunningham@apache.org>
Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>
Co-authored-by: Ephraim Anierobi <splendidzigy24@gmail.com>
(cherry picked from commit 3fef6a47834b89b99523db6d97d6aa530657a008)

* Mask audit log extra field (#27923)

(cherry picked from commit 1e73b1cea2d507d6d09f5eac6a16b649f8b52522)

* Add allow list for imports during deserialization (#27887)

During deserialization Airflow can instantiate arbitrary
objects for which it imports modules. This can be dangerous
as it could lead to unwanted effects. With this change
administrators can now limit what objects can be deserialized.
It defaults to Airflow's own only.

(cherry picked from commit 542cfdc270aeb9b05ad150df28bc48b0a84c0f38)

* Update version to 2.5.0

* Add release notes

* Include correct meta tag to get dataset events (#27927)

Without this the section on "Dataset updates caused by this task
instance" would show no events!

(cherry picked from commit 8588d134746045cc1d2c3621fb34e18c883cbf67)

* Fix deserializing Params where the default is an array (#27944)

In a previous change we deserialized Param values inside a list, but the
tests didn't previously cover an array of plain values (`[True]` for
instance)

This caused the webserver to 500 (bad, but only affected a single DAG)
but it _also_ caused the scheduler to crash when it tried to process
this DAG (bad-bordering on terrible! Nothing should ever bring down the
whole scheduler)

(cherry picked from commit a5d5bd0232b98c6b39e587dd144086f4b7d8664d)

* Add release notes

* Pass in session appropriately to _clear_dag_tis (#28003)

This makes the session argument required instead, and pass it from the
dagrun_clear view correctly.

Some type annotations are added also to the function for future
maintainability.

(cherry picked from commit f43f50e3f11fa02a2025b4b68b8770d6456ba95d)

* Fix failing test case for db clean in newer sqlalchemy (#28004)

The fix for the ambiguity of treating "*" in SQLAlchemy 1.4.42 made our
tests fail, because they were trying to execute the query
from an aliased table as a standalone query for verification,
whereas in "reality" the same query would be executed as part of a "CREATE
AS" statement.

The tests started to fail with the new SQLAlchemy, and the fix was
to change our tests to also run the "CREATE AS" statement and count
the number of rows in the created temporary table.

(cherry picked from commit 122d60b5b4547f7380d58eea148552607264122e)

* Apply more masking on audit logs (#27994)

This fixes variables val and connection extra field masking in the audit log table

(cherry picked from commit 924725855134288bae52f6568d2b8c4fad393c3d)

* allow scroll in triggered dag runs modal (#27965)

Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>
(cherry picked from commit 5e4f4a3556db5111c2ae36af1716719a8494efc7)

* Gracefully handle whole config sections being renamed (#28008)

* wip: support for old section/key gets

* Gracefully handle whole config sections being renamed

We renamed the whole kubernetes section to kubernetes_executor, but if
you tried to get the _old_ value it would cause an exception.

This has been a problem for basically ever, but we've never noticed
before because we upgrade the access in core at the same time, but in
the case of kubernetes_executor/namespace it is "possible" (and maybe
even reasonable) to access that config setting in a DAG.

Without this change we will break some DAGs on upgrade, which we should
avoid.

* fixup! Gracefully handle whole config sections being renamed

* Update airflow/configuration.py

Co-authored-by: blag <blag@users.noreply.github.com>

* fixup! Gracefully handle whole config sections being renamed

* fixup! fixup! Gracefully handle whole config sections being renamed

* fixup! Gracefully handle whole config sections being renamed

* Remove test side effects

* autouse restore_env fixture

Co-authored-by: Jed Cunningham <jedcunningham@apache.org>
Co-authored-by: blag <blag@users.noreply.github.com>
(cherry picked from commit 3df03cc9331cb8984f39c5dbf0c9775ac362421e)

* Add release notes

* Update 2.5.0 release date (#28081)

(cherry picked from commit 24745c71f246b48b7981d7f4af93f097080c8afd)

* Allow generation of connection URI to work when no conn type (#26765)

Previously if get_uri was called it would fail with `NoneType not iterable`, because of the check `if '-' in conn_type`.

(cherry picked from commit b124d6cac1180e21d21408f74729e77f75a46cb9)

* fix: continue checking sla (#26968)

Co-authored-by: doiken <doiken@users.noreply.github.com>
(cherry picked from commit e350b6d44d6d34a1759b9c2d925b7d27532436ab)

* Additional info about Segmentation Fault in LocalTaskJob (#27381)

(cherry picked from commit 76f81cd4a7433b7eeddb863b2ae6ee59176cf816)

* Handle DAG disappearing mid-flight when dag verification happens (#27720)

When the scheduler schedules a DAG and it disappears mid-flight, deleted by
the DagFileProcessor, it might lead to the scheduler crashing in the
verify_integrity method.

This PR simply skips scheduling the DAG in such a case rather than
attempting to schedule it.

Fixes: #27622
(cherry picked from commit 15e842da56d9b3a1c2f47f9dec7682a4230dbc41)

* Note which versions of Python are supported (#27798)

Copied from Prerequisites page which versions of Python that Airflow is tested with. Noted explicitly that 3.11 is not yet supported.

As suggested in #27676.

(cherry picked from commit 017ed9ac662d50b6e2767f297f36cb01bf79d825)

* dagrun, next_dagruns_to_examine, add MySQL index hint (#27821)

* dagrun, next_dagruns_to_examine, add MySQL index hint

* resolve static check errors

(cherry picked from commit 5e45cb019995e8b80104b33da1c93eefae12d161)

* Make sure we can get out of a faulty scheduler state (#27834)

* Make sure we can get out of a faulty scheduler state

This PR fixed the case where we have a faulty state in the database.
The state that is fixed is that both the unmapped task instance and mapped task instances exist at the same time.

So we have instances with map_index [-1, 0, 1].
The -1 task instances should be removed in this case.

(cherry picked from commit 73d9352225bcc1f086b63f1c767d25b2d7c4c221)

* Don't log CLI actions if db not initialized (#27851)

(cherry picked from commit d97a249404e2fa6854e182965274df83aa35bdb9)

* fix: current_state method on TaskInstance doesn't filter by map_index (#27898)

(cherry picked from commit 51c70a5d6990a6af1188aab080ae2cbe7b935eb2)

* Order TIs by map_index (#27904)

(cherry picked from commit 820c5bbad9e3fb2c6eb19a51eafc800267746eae)

* Fix deadlock when chaining multiple empty mapped tasks (#27964)

The fix here was to set changed_tis to True if there was an expansion.

(cherry picked from commit f89ca94c3e60bfae888dfac60c7472d207f60f22)

* Documentation for the LocalTaskJob return code counter (#27972)

Co-authored-by: Igor Kholopov <kholopovus@gmail.com>
(cherry picked from commit 4a391150aae346d011f5016e2bcea0ed2f44d23b)

* Ignore Blackification commit from Git Blame (#27981)

(cherry picked from commit 405087df3db8ca7c7333def64f8e96209117066a)

* Simplify dataset subgraph logic (#27987)

* fix merging connected dataset graphs

* refactor graph calculation

(cherry picked from commit f1c4c27e4aed79eef01f2873fab3a66af2aa3fa0)

* Prevent double loading of providers from local paths (#27988)

I noticed a case where the same local providers were loaded more than
once, and it turned out to be caused by having `.` in the python search
path.

The fix for this is to canonicalize the path before looking for
providers in it, and not searching in a path more than once.

(cherry picked from commit 1a02ad9e1d73e4c33d48b25ec9781c54af91f748)
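The dedup approach described in that commit can be illustrated with a small stdlib-only sketch (the function name is illustrative, not the actual provider-discovery code):

```python
import os


def unique_search_paths(paths):
    """Canonicalize search paths and drop duplicates.

    With '.' on the Python search path, the same directory can appear both
    as '.' and as an absolute path; resolving via realpath() makes the
    duplicate visible so each directory is scanned for providers at most once.
    """
    seen = set()
    result = []
    for path in paths:
        canonical = os.path.realpath(path)
        if canonical not in seen:
            seen.add(canonical)
            result.append(canonical)
    return result
```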

* Add documentation for [core] mp_start_method config (#27993)

(cherry picked from commit 56b5f3f4eed6a48180e9d15ba9bb9664656077b1)

* docs: copyedit DAG (#27995)

(cherry picked from commit 893253a4c36634c17810dd6fd0a44fb1fc174939)

* Handle bad zip files nicely when parsing DAGs. (#28011)

(cherry picked from commit 8924cf1751e5190a1a7b4e33bb40de604b8b76b2)

* Use asserts instead of exceptions for executor not started (#28019)

(cherry picked from commit abe3b318b525cca703cd6c0cda25af87cdf19b1b)

* Add Andrey as committer (#28050)

(cherry picked from commit ada91b686508218752fee176d29d63334364a7f2)

* Return list of tasks that will be changed (#28066)

Ensure that when a user clicks on 'Queue up new tasks' a list of tasks that will be run is returned.

(cherry picked from commit af29ff0a8aa133f0476bf6662e6c06c67de21dd5)

* Resolve false warning about calling conf.get on moved item (#28075)

Calling `items` on config has the effect of calling `get` on each item. If we call `get` on a moved item, we will falsely get a warning letting us know to update our code.  So, we suppress such warnings when iterating the config.

(cherry picked from commit 2de613121b0d882bcf26ea944c91f3e915e3ae3f)

* Bump decode-uri-component from 0.2.0 to 0.2.2 in /airflow/www (#28080)

Bumps [decode-uri-component](https://github.com/SamVerschueren/decode-uri-component) from 0.2.0 to 0.2.2.
- [Release notes](https://github.com/SamVerschueren/decode-uri-component/releases)
- [Commits](https://github.com/SamVerschueren/decode-uri-component/compare/v0.2.0...v0.2.2)

---
updated-dependencies:
- dependency-name: decode-uri-component
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
(cherry picked from commit c4cb5b3d35f1982df2144d645ef1d191b4b1df85)

* Add airflow.api.auth.backend.session to backend sessions in compose (#28094)

The default backend seting in Docker compose needs to be updated
to get rid of the warning introduced in #21640

Fixes: #28091
(cherry picked from commit 9d73830209aa1de03f2de6e6461b8416011c6ba6)

* Improve handling of warnings in CI (#28096)

Warnings printed in CI have been making it difficult to see what
is going on (they were taking far too much space after the test
results, and the GitHub CI UI rendered those multi-line warnings slowly).

Also we did not have the right tools to capture the number and list
of warnings that we should deal with.

We are using the pytest-capture-warnings plugin now, which improves
the situation twofold:

* warning summary printed by the plugin in the output is
  shorter - each warning is at most one line
* the warning text files are uploaded as artifacts which make them
  usable in any kind of approach where we want to attempt to
  start an effort to remove all warnings

(cherry picked from commit 16fddbae83d03c9b3e2d249cc8852fb006c65c3b)

* Don't emit FutureWarning when code not calling old key (#28109)

Tried fixing this before using simplefilter, but that doesn't work when the application is threaded.
See here https://docs.python.org/3/library/warnings.html#temporarily-suppressing-warnings.

It was tricky to solve.  When "actually" reading the values we call super().get. You'd think this would not invoke airflow config parser `get` right? But because of config parser interpolation, ultimately, it invokes `get` again on airflow config parser.

So, we can manipulate an attr to signal when we are _accessing_ a deprecated key but only because we're retrieving the backcompat val (not because the user requested it).

Additionally we have to handle the case where `items` is called (e.g. from within as_dict) which calls `get` for every option you have in your config.

(cherry picked from commit 27a84637b3ab9b5f7d0e93252ef93656bc6907ea)
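The flag-based approach described in that commit can be sketched with a toy config reader; the class and attribute names here are hypothetical, not Airflow's actual ones:

```python
import warnings


class ToyConfig:
    """Warn on reads of deprecated keys, except during internal iteration."""

    def __init__(self, data):
        self._data = data
        self._suppress_future_warnings = False

    def get(self, key):
        # Only warn when the *user* asked for the deprecated key, not when
        # we are reading it internally on their behalf.
        if key.startswith("deprecated_") and not self._suppress_future_warnings:
            warnings.warn(f"{key} is deprecated", FutureWarning)
        return self._data[key]

    def items(self):
        # Iterating reads every key via get(); flag those reads as internal
        # so users are not warned about keys they never requested.
        self._suppress_future_warnings = True
        try:
            return [(k, self.get(k)) for k in self._data]
        finally:
            self._suppress_future_warnings = False
```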

* Make BaseJob.most_recent_job favor "running" jobs (#28119)

Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
(cherry picked from commit 56c0871dce2fb2b7ed2252e4b2d1d8d5d0c07c58)

* Improve run/task grid view actions (#28130)

* improve run/task changes from grid

* fix confirm return type

(cherry picked from commit a0c85022b45c355f50ee2f820b7a32fd97c275e7)

* Ensure the dagbag_size metric decreases when files are deleted (#28135)

(cherry picked from commit 2c7bd921635fc2e6fdc5315d90769e355a67d7e6)

* Do not warn when airflow dags tests command is used (#28138)

When you run airflow dags test command on a fresh, never run dag,
the command tries to emit true scheduling delay but it fails as
there are no task instances created in the DB.

This change skips generation of the metrics if task instance cannot
be found.

Fixes: #28106
(cherry picked from commit b3d7e17e72c05fd149a5514e3796d46a241ac4f7)

* Ensure that warnings from `@dag` decorator are reported in dag file (#28153)

It's no use to a user if the warnings appear from
`airflow/models/dag.py`!

This hacky magic has been in place for the apply_defaults metaclass for
a couple of releases, and for all its evil hackiness, it works; I've
just extracted it out to be reusable.

Also I wish Python had something like this built in, as it's _really_
hard to get this right otherwise

(cherry picked from commit 5fec7873a6b4d86fb2a1e66fa32f12cbd42431e0)

* fix next run datasets error (#28165)

(cherry picked from commit 196bab483655a24f0ef12a1049d21b8a9dead691)

* Make invalid characters exception more readable (#28181)

The InvalidStatsNameException message when a stats name contains invalid
characters currently includes the set of allowed characters, which is
quite long and of nondeterministic order (being formatted from a set),
making the message nearly impossible for a human to decipher.

This commit changes the message to describe what characters are allowed
in a human-readable fashion instead.

(cherry picked from commit e3ae69e9a779f14ee70c5646d6e072bb2365e54f)

* Fix typo in Best Practice documentation (#28190)

(cherry picked from commit fd4ab1dc35fc0b4bb53b63080e58fe7829274293)

* Add custom pickling hooks to LazyXComAccess (#28191)

fixes https://github.com/apache/airflow/issues/28146

(cherry picked from commit e981dfab4e0f4faf1fb932ac6993c3ecbd5318b2)
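
The kind of fix referenced here - custom `__getstate__`/`__setstate__` pickling hooks - can be sketched as follows (the class and attributes are illustrative, not `LazyXComAccess` itself):

```python
import pickle


class LazyAccessor:
    """Drop an unpicklable attribute (here a stand-in for a live DB
    session) before pickling, and restore a placeholder on unpickle."""

    def __init__(self, data):
        self._session = object()  # unpicklable live resource in real life
        self.data = list(data)

    def __getstate__(self):
        state = self.__dict__.copy()
        state.pop("_session", None)  # never pickle the live resource
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        self._session = None  # re-created lazily on first use


roundtripped = pickle.loads(pickle.dumps(LazyAccessor([1, 2, 3])))
```

Pickle calls `__getstate__` when serializing and `__setstate__` when deserializing, so the object survives the roundtrip without its live resource.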

* Make live logs reading work for "other" k8s executors (#28213)

(cherry picked from commit cbfbf8b843f178de1e1aa1066e5ea3377a8de774)

* Make arguments 'offset' and 'length' not required (#28234)

* Make arguments 'offset' and 'length' not required

* Disable implicit optional for azure-storage (mypy)

(cherry picked from commit 7e776db254953076e932ed2183e1ed49a5ca20a6)

* Fix db clean warnings (#28243)

RTIF is already cleaned by TI's FK, and doesn't have a column to use as
a recency column on its own. db clean was added for 2.3, but cleaning
RTIF was broken before 2.3 was released, meaning this never actually
worked anyway. In addition, if users explicitly try to clear this
table with `--tables`, it gets ignored, making this a safe change to
make.

Dataset event's created_at column was renamed to timestamp before 2.4
was released, so this one also never worked.

Closes #26581

(cherry picked from commit 171ca66142887f59b1808fcdd6b19e7141a08d17)

* Convert test_task_command to Pytest and unquarantine tests in it (#28247)

(cherry picked from commit 2f5c77b0baa0ab26d2c51fa010850653ded80a46)

* Trigger gevent monkeypatching via environment variable (#28283)

Gevent needs to monkeypatch a number of system libraries as soon
as possible when Python interpreter starts, in order to avoid
other libraries monkey-patching them before. We should do it before
any other initialization and it needs to be only run on webserver.

So far it was done by local_settings monkeypatching, but that has
been rather brittle, and some changes in Airflow caused previous attempts
to stop working because the "other" packages could be loaded by
Airflow first, depending on installed providers and configuration
(for example, with AWS configured as the logger, boto could have
been loaded earlier and could have monkey-patched networking before
gevent had a chance to do so).

This change introduces a different mechanism for triggering the
patching - it can be triggered by setting an environment variable.
This has the benefit that we do not need to initialize anything
(including reading settings or setting up logging) before we determine
if gevent patching should be performed.

It also has the drawback that the user will have to set the environment
variable in their deployment manually. However, this is a small price to
pay for stable and future-proof gevent monkeypatching
built into Airflow.

Fixes: #8212
(cherry picked from commit 2429d077d8c59299487562c8867cfc63cd969b9d)
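
The mechanism can be sketched roughly like this (a sketch, not Airflow's exact implementation; the environment variable name here is an assumption):

```python
import os


def maybe_patch_gevent() -> bool:
    """Run gevent monkeypatching as early as possible, but only when
    requested via an environment variable (name assumed here).

    Checking a plain env var means no settings or logging have to be
    initialized first, so no library can import `socket` etc. before
    gevent patches them.
    """
    if os.environ.get("_AIRFLOW_PATCH_GEVENT") != "1":
        return False
    # Imported lazily so plain (non-gevent) deployments never import gevent.
    from gevent import monkey

    monkey.patch_all()
    return True
```

The call would have to sit at the very top of the entrypoint module, before any other imports run.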

* Add SIGUSR2 handler for LocalTaskJob and workers to aid debugging (#28309)

There have been multiple reports of people with tasks stuck in the
running state, and no obvious activity from the running task, but the
supervisor is still actively heart beating.

In order to make it easier/possible to tell _where_ the process is stuck,
we add a SIGUSR2 handler to the Task supervisor (purposefully
inherited by the actual task process itself) that prints the current
stack trace on receiving USR2 - the same signal we use for
causing a debug dump in the Scheduler.

(cherry picked from commit f9e8969ff77d9a6498a779a912f78ca309c95aaa)

* Separate ER diagram dependencies to doc_gen extra (#28346)

The eralchemy dependency requires pygraphviz, which does not play
well with macOS/M1. For those who want to easily install the `doc`
extra, it is a blocker.

Now the extra is separated.

(cherry picked from commit b6bc318f6fb55fbaf61928ba67343d33e6fed73b)

* Remove docs package from Publish_docs.py and switch to python in shebang (#28347)

(cherry picked from commit 81271424e364822ff094ff9a75d213b223ccd0be)

* Correctly select a mapped task's "previous" task (#28379)

(cherry picked from commit 8aac56656d29009dbca24a5948c2a2097043f4f3)

* Maintain manual scroll position in task logs (#28386)

(cherry picked from commit 5c80d985a3102a46f198aec1c57a255e00784c51)

* Dont show task/run durations when there is no start_date (#28395)

* Dont show broken task/run duration

* add test

(cherry picked from commit 11f30a887c77f9636e88e31dffd969056132ae8c)

* A manual run can't look like a scheduled one (#28397)

Fix https://github.com/apache/airflow/issues/27818

(cherry picked from commit 7ccbe4e7eaa529641052779a89e34d54c5a20f72)

* separate callModal from dag.js (#28410)

(cherry picked from commit 2f0f02536f7773dd782bd980ae932091b7badc61)

* Fix ti._try_number for deferred and up_for_reschedule tasks (#26993)

* have a consistent try_count util

* Update airflow/www/utils.py

Co-authored-by: pierrejeambrun <pierrejbrun@gmail.com>

* use _try_number, default to 1

* properly handle up_for_reschedule task try number

* fix tests

* add State.waiting for deferred and up for reschedule tasks

* add State.pending var

* remove confusing state aliases

Co-authored-by: pierrejeambrun <pierrejbrun@gmail.com>
(cherry picked from commit f110cb11bf6fdf6ca9d0deecef9bd51fe370660a)

* Add setters to MappedOperator on_*_callbacks (#28313)

(cherry picked from commit 105dbd2ed140380b9af9e0c2d3086c3f052b9951)

* Re-enable Plyvel on ARM as it now builds cleanly (#28443)

Previously, Plyvel had to be disabled in order to get
ARM compatibility (it did not have
binary wheels released and it failed to compile cleanly on Debian).
But the latter problem is fixed now, so we can re-enable it for ARM.

(cherry picked from commit bea91b90180f075c974d58be438b80e3da8607ca)

* Add `ensure_ascii=False` in trigger dag run API (#28451)

* Add ensure_ascii=False in trigger dag run api

* Fix static checks

(cherry picked from commit c3eee4372556f9b09d3395a3f251c9ee21278846)
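
The effect of `ensure_ascii=False` with the standard `json` module (the payload here is made up):

```python
import json

payload = {"conf": {"note": "日本語のメモ"}}

escaped = json.dumps(payload)                       # non-ASCII escaped to \uXXXX
readable = json.dumps(payload, ensure_ascii=False)  # characters kept as-is
```

Both forms decode to the same object; `ensure_ascii=False` only changes how the response reads for humans.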

* Fix example import sentence in dates.py (#28453)

The example import statement on line 51 is incorrect.

(cherry picked from commit 0d12062198b2d4b0642b51da3c8f71e6f934d20d)

* Fix bad pods pickled in executor_config (#28454)

We used to pickle raw pods objects but found that when unpickling across k8s lib versions we would get missing attr errors.

Now, we serialize to json.

But we still get reports of issues when people upgrade because it only solves the issue on a go-forward basis.

But we can fix these old bad executor configs that keep popping up by roundtripping the pod to json in a more tolerant fashion than is done by the openapi-generated code, i.e. by populating missing attrs with None.

(cherry picked from commit 27f07b0bf5ed088c4186296668a36dc89da25617)

* Ensure correct log dir in file task handler (#28477)

Path.mkdir combines the requested mode with the process's umask to
determine the file mode and access flags, so the newly created folder
isn't 0o777.

(cherry picked from commit bda39188bd127d0dd933cdff6c7e8d11ec6bf41b)
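
The umask interaction is easy to demonstrate; one fix is to chmod after mkdir, since chmod (unlike mkdir's mode argument) is not filtered by the umask. This is a sketch, not the file task handler's actual code:

```python
import os
import stat
import tempfile
from pathlib import Path

base = Path(tempfile.mkdtemp())

masked = base / "masked"
masked.mkdir(mode=0o777)   # actual mode becomes 0o777 & ~umask (often 0o755)

exact = base / "exact"
exact.mkdir(mode=0o777)
os.chmod(exact, 0o777)     # chmod ignores the umask, giving exactly 0o777

exact_mode = stat.S_IMODE(exact.stat().st_mode)
```
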

* Add retry to purge_inactive_dag_warnings (#28481)

Co-authored-by: Ephraim Anierobi <splendidzigy24@gmail.com>
(cherry picked from commit 5289938ec1d9011a9ff8625705cffd1708f9274d)

* Move MyPY plugins of ours to dev folder (#28498)

The plugins are only used in the static check phase. The problem with
having them in the "airflow" package is that mypy imports "airflow" while
loading the plugins, which means mypy has to have a fully working
Airflow configuration - otherwise the import fails while
reading the configuration values.

Moving the whole mypy plugins to dev solves the problem entirely.

(cherry picked from commit 1f75e9ffcf0e61115ea141bc1c5de5002ef8f2c0)

* Remove outdated Optional Provider Feature outdated documentation (#28506)

After bumping min_airflow_version to 2.3 the section about optional
provider feature and the way to add it for pre-2.3 compatible providers
is outdated and should be removed.

(cherry picked from commit 9ac76ec52604486d41d0c70984fea51ab2764525)

* Add AIRFLOW_PROJ_DIR to docker-compose example (#28517)

Add an environment variable called AIRFLOW_PROJ_DIR that allows
controlling the base directory for volumes.
This allows custom folder structure when working with the example
docker-compose.

(cherry picked from commit 3f88148080157d1d8f9e0495e7c79ad81d475fca)

* Add MSSQL support for ARM Docker images (#28533)

#28530 re-enabled support for pymssql installation on ARM. Even
though we have no binary wheels, pymssql seems to build cleanly.

This is a follow-up to enable mssql client installation on ARM
for our images. It also adds documentation in the Docker image docs
to explain it.

(cherry picked from commit 9c3734bb127ff0d71a0321d0578e556552cfc934)

* Consider previous run in CronTriggerTimetable (#28532)

(cherry picked from commit 6dc28fb0278c1bdb096b75b6e19acbcb1019db02)

* Emit warnings for `conf.get*` from the right source location (#28543)

`getboolean` and other typed get functions were issuing warnings from
"inside" themselves.

Before:

```
$ python ./airflow/airflow/kubernetes/kube_client.py
/home/ash/code/airflow/airflow/airflow/configuration.py:722 DeprecationWarning: The in_cluster option in [kubernetes] has been moved to the in_cluster option in [kubernetes_executor] - the old setting has been used, but please update your config.
```

After:

```
$ python ./airflow/airflow/kubernetes/kube_client.py
/home/ash/code/airflow/airflow/airflow/kubernetes/kube_client.py:89 DeprecationWarning: The in_cluster option in [kubernetes] has been moved to the in_cluster option in [kubernetes_executor] - the old setting has been used, but please update your config.
```

(cherry picked from commit f0ae250527c0494d32227ad8433c15e691f004d1)
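
The before/after difference above comes down to the `stacklevel` argument of `warnings.warn`. A sketch with a hypothetical typed getter:

```python
import warnings


def getboolean(section, key):
    """Hypothetical typed getter: stacklevel=2 attributes the warning
    to the *caller's* file and line instead of this helper's own body."""
    warnings.warn(
        f"The {key} option in [{section}] has been moved",
        DeprecationWarning,
        stacklevel=2,
    )
    return False
```

With `stacklevel=1` (the default) the warning would point at the `warnings.warn(...)` line inside the helper; each extra level walks one frame up the call stack.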

* navbar, cap dropdown size, and add scroll bar (#28561)

* navbar, cap dropdown size, and add scroll bar

* Formatting css

* main.css, remove extra newline

(cherry picked from commit 2aa52f4ce78e1be7f34b0995d40be996b4826f26)

* Move TI setNote endpoints under TaskInstance in OpenAPI (#28566)

These endpoints were accidentally under DAG instead of TaskInstance
where they belong.

(cherry picked from commit b3e26560c7fd835570a0b3a9d65670c87c8cfe0a)

* Update baseoperator.py (#28574)

Fix baseoperator chain docstring

(cherry picked from commit 9fefbad54fe50b4ddb9d4eaba46ed051ef7960b3)

* Guard not-yet-expanded ti in trigger rule dep (#28592)

Previously, if a mapped task is not yet expanded when the trigger rule
dep is evaluated, it would raise an exception and fail the scheduler.
This adds an additional try-except to guard against this.

The problematic scenario is when a mapped task depends on another mapped
task, and its trigger rule is evaluated before that other mapped task is
expanded (e.g. the other task also has a task-mapping dependency that is
not yet finished). Since we can be certain the upstream task has not yet
satisfied the expansion dep, we can simply declare the task we're checking
as unsatisfied.

(cherry picked from commit d4dbb0077aec33e5b3b4793bf9e2902e6cbdaa7f)

* Fix ``Connection.get_extra`` type (#28594)

(cherry picked from commit 5dcbbd6f17c6b287a512b461d16a8a473b194ed9)

* Use docker compose v2 command (#28605)

(cherry picked from commit cb1d798fb80f1e7e38e4300eb7fe9b1e1a5bcee8)

* Fix Incorrect Example (#28609)

Following the existing example to create a decorator results in this error:
AttributeError: 'list' object has no attribute 'rsplit'

Changing it to a list fixes this

(cherry picked from commit 761aa59f9256fb2901039ac8ef7fd6e69af8528f)

* Minor update to Scheduler documentation (#28620)

(cherry picked from commit 48a051acc40469ce43e28353fffd265e5326926a)

* Fix code docstrings (#28622)

Fix docstrings grammar

Co-authored-by: kazanau <stanislau.kazanau@rtl-extern.de>
(cherry picked from commit 76186bb58854d851e3599fac3ff9f20feff43bfd)

* Fix typo (#28623)

(cherry picked from commit 4f7ac623c881ae0d5304d6a9d57e3e0f2aa65865)

* Fix UI caret direction (#28624)

* Fix UI caret direction

* Rename ids and fix tests

(cherry picked from commit 0ab881a4ab78ca7d30712c893a6f01b83eb60e9e)

* Add doc-strings and small improvement to email util (#28634)

(cherry picked from commit 906264dd904a8f44a1533ffb70ec0cd6b9f92a4b)

* Nest header blocks in divs to fix dagid copy nit on dag.html (#28643)

(cherry picked from commit 9aea857343c231319df4c5f47e8b4d9c8c3975e6)

* Remove outdated comments from base hook (#28649)

(cherry picked from commit 29a74699eb740f2df3bc6160fe3d747b175c76cd)

* Fix `DetachedInstanceError` when finding zombies in Dag Parsing process (#28198)

```
[2022-12-06T14:20:21.622+0000] {base_job.py:229} DEBUG - [heartbeat]
[2022-12-06T14:20:21.623+0000] {scheduler_job.py:1495} DEBUG - Finding 'running' jobs without a recent heartbeat
[2022-12-06T14:20:21.637+0000] {scheduler_job.py:1515} WARNING - Failing (2) jobs without heartbeat after 2022-12-06 14:15:21.623199+00:00
[2022-12-06T14:20:21.641+0000] {scheduler_job.py:1526} ERROR - Detected zombie job: {'full_filepath': '/opt/airflow/dags/xxx_dag.py', 'processor_subdir': '/opt/airflow/dags', 'msg': "{'DAG Id': 'xxx', 'Task Id': 'xxx', 'Run Id': 'scheduled__2022-12-05T00:15:00+00:00', 'Hostname': 'airflow-worker-0.airflow-worker.airflow2.svc.cluster.local', 'External Executor Id': '9520cb9f-3245-497a-8e17-e9dec29d4549'}", 'simple_task_instance': <airflow.models.taskinstance.SimpleTaskInstance object at 0x7f1cd4de4130>, 'is_failure_callback': True}
[2022-12-06T14:20:21.645+0000] {scheduler_job.py:763} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 746, in _execute
    self._run_scheduler_loop()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 878, in _run_scheduler_loop
    next_event = timers.run(blocking=False)
  File "/usr/local/lib/python3.10/sched.py", line 151, in run
    action(*argument, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/event_scheduler.py", line 37, in repeat
    action(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 75, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 1522, in _find_zombies
    processor_subdir=ti.dag_model.processor_subdir,
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 481, in __get__
    return self.impl.get(state, dict_)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 926, in get
    value = self._fire_loader_callables(state, key, passive)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 962, in _fire_loader_callables
    return self.callable_(state, passive)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/strategies.py", line 861, in _load_for_state
    raise orm_exc.DetachedInstanceError(
sqlalchemy.orm.exc.DetachedInstanceError: Parent instance <TaskInstance at 0x7f1ccc3e8520> is not bound to a Session; lazy load operation of attribute 'dag_model' cannot proceed (Background on this error at: https://sqlalche.me/e/14/bhk3)
[2022-12-06T14:20:21.647+0000] {celery_executor.py:443} DEBUG - Inquiring about 5 celery task(s)
[2022-12-06T14:20:21.669+0000] {celery_executor.py:602} DEBUG - Fetched 5 state(s) for 5 task(s)
[2022-12-06T14:20:21.669+0000] {celery_executor.py:446} DEBUG - Inquiries completed.
[2022-12-06T14:20:21.669+0000] {scheduler_job.py:775} INFO - Exited execute loop
[2022-12-06T14:20:21.674+0000] {cli_action_loggers.py:83} DEBUG - Calling callbacks: []
Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/__main__.py", line 39, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/cli_parser.py", line 52, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/cli.py", line 103, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py", line 85, in scheduler
    _run_scheduler_job(args=args)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py", line 50, in _run_scheduler_job
    job.run()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/base_job.py", line 247, in run
    self._execute()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 746, in _execute
    self._run_scheduler_loop()
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 878, in _run_scheduler_loop
    next_event = timers.run(blocking=False)
  File "/usr/local/lib/python3.10/sched.py", line 151, in run
    action(*argument, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/event_scheduler.py", line 37, in repeat
    action(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 75, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 1522, in _find_zombies
    processor_subdir=ti.dag_model.processor_subdir,
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 481, in __get__
    return self.impl.get(state, dict_)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 926, in get
    value = self._fire_loader_callables(state, key, passive)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/attributes.py", line 962, in _fire_loader_callables
    return self.callable_(state, passive)
  File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/strategies.py", line 861, in _load_for_state
    raise orm_exc.DetachedInstanceError(
sqlalchemy.orm.exc.DetachedInstanceError: Parent instance <TaskInstance at 0x7f1ccc3e8520> is not bound to a Session; lazy load operation of attribute 'dag_model' cannot proceed (Background on this error at: https://sqlalche.me/e/14/bhk3)
```

When in standalone dag processor mode, `DatabaseCallbackSink` is used.

The `_find_zombies` func calls `self.executor.send_callback(request)`,
but does not propagate the ORM `session`; `provide_session` then creates
a new session inside the `send` func.

```
class DatabaseCallbackSink(BaseCallbackSink):
    """Sends callbacks to database."""

    @provide_session
    def send(self, callback: CallbackRequest, session: Session = NEW_SESSION) -> None:
        """Sends callback for execution."""
        db_callback = DbCallbackRequest(callback=callback, priority_weight=10)
        session.add(db_callback)
```

Signed-off-by: BobDu <i@bobdu.cc>
(cherry picked from commit 4b340b7561e9db0055bf69ad0fc8b3a508ea7667)
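
The session-propagation pitfall can be reduced to this toy version of the `provide_session` pattern (stand-in objects, not Airflow's real decorator): when the caller forgets to pass its session down, the callee silently runs in a fresh one, and ORM objects loaded in the outer session can later fail lazy loads exactly as in the traceback above.

```python
import functools


def provide_session(func):
    """Inject a brand-new session unless the caller supplies one."""

    @functools.wraps(func)
    def wrapper(*args, session=None, **kwargs):
        if session is None:
            session = object()  # stand-in for a new sqlalchemy Session
        return func(*args, session=session, **kwargs)

    return wrapper


@provide_session
def send(callback, session=None):
    return session  # which session did we actually run in?


outer_session = object()
propagated = send("cb", session=outer_session) is outer_session  # True
fresh = send("cb") is outer_session                              # False
```
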

* Adding an example dag for dynamic task mapping (#28325)

(cherry picked from commit b263dbcb0f84fd9029591d1447a7c843cb970f15)

* After running the DAG the employees table is empty. (#28353)

The reason is the sql syntax error:

ERROR: subquery in FROM must have an alias
LINE 3: FROM (
^
HINT: For example, FROM (SELECT ...) [AS] foo.
SQL state: 42601
Character: 37

The change fixes the sql syntax in merge_data.

(cherry picked from commit 53893f13b2391c005eacf33115e5e50fd558a396)

* Fix calendar view for CronTriggerTimeTable dags (#28411)

Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
(cherry picked from commit 467a5e3ab287013db2a5381ef4a642e912f8b45b)

* Improve provider validation pre-commit (#28516)

(cherry picked from commit e47c472e632effbfe3ddc784788a956c4ca44122)

* Add back join to zombie query that was dropped in #28198 (#28544)

#28198 accidentally dropped a join in a query, leading to this:

    airflow/jobs/scheduler_job.py:1547 SAWarning: SELECT statement has a
    cartesian product between FROM element(s) "dag_run_1", "task_instance",
    "job" and FROM element "dag". Apply join condition(s) between each element to resolve.

(cherry picked from commit a24d18a534ddbcefbcf0d8790d140ff496781f8b)

* Update pre-commit hooks (#28567)

(cherry picked from commit 837e0fe2ea8859ae879d8382142c29a6416f02b9)

* Change Architecture and OperatingSystem classes into Enums (#28627)

Since they are objects already, there is very little overhead in
making them Enums, and it has the nice property of enabling
type hints for the returned values.

(cherry picked from commit 8a15557f6fe73feab0e49f97b295160820ad7cfd)

* Fix minor typo in taskflow.rst (#28656)

Case change to match logging API. getlogger -> getLogger

(cherry picked from commit 068886231ac0759d3ae9dd13fc2b2727d87b2f60)

* Fix description of output redirection for access_log for gunicorn (#28672)

As of gunicorn 19.7.0, the default for access_log is stdout not stderr,
and our documentation had not been updated to reflect that. We are
already past that (the min version of gunicorn is 20.1.0), so our
documentation of the access-log flag was wrong. Having the
access_log on stdout rather than stderr also allows redirecting
the access log to a separate log sink in deployments like K8S.

(cherry picked from commit 675af73ceb5bc8b03d46a7cd903a73f9b8faba6f)

* Handle ConnectionReset exception in Executor cleanup (#28685)

(cherry picked from commit a3de721e2f084913e853aff39d04adc00f0b82ea)

* Fixed typo (#28687)

(cherry picked from commit e598a1b294956448928c82a444e081ff67c6aa47)

* Row-lock TIs to be removed during mapped task expansion (#28689)

Instead of query-update, we row lock the TI to apply the update.
This protects against updating a row that has been updated by another process.

(cherry picked from commit a055d8fd9b42ae662e0c696e29066926b5346f6a)

* Allow XComArgs for external_task_ids of ExternalTaskSensor (#28692)

(cherry picked from commit 7f18fa96e434c64288d801904caf1fcde18e2cbf)

* Fix "airflow tasks render" cli command for mapped task instances (#28698)

The fix was to use the 'template_fields' attr directly since both mapped and unmapped
tasks now have that attribute.
I also had to use ti.task instead of the task from dag.get_task due to this error:
`AttributeError: 'DecoratedMappedOperator' object has no attribute 'templates_dict'` and
I wonder if this is a bug

(cherry picked from commit 1da17be37627385fed7fc06584d72e0abda6a1b5)

* Fix some docs on using sensors with taskflow (#28708)

Also add in testing to ensure that returning bool from taskflow sensors works as expected

(cherry picked from commit 12a065a38d19f4b5698962db67f5fe9ab50d420a)

* Add Niko to committers (#28712)

(cherry picked from commit 56fb1f1b8cd73b4328df5b6fc6d232788b1f7d13)

* Bump json5 from 1.0.1 to 1.0.2 in /airflow/www (#28715)

Bumps [json5](https://github.com/json5/json5) from 1.0.1 to 1.0.2.
- [Release notes](https://github.com/json5/json5/releases)
- [Changelog](https://github.com/json5/json5/blob/main/CHANGELOG.md)
- [Commits](https://github.com/json5/json5/compare/v1.0.1...v1.0.2)

---
updated-dependencies:
- dependency-name: json5
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
(cherry picked from commit e4bc5e54b1f41c991542850045bcfd060bac7395)

* Ignore Blackification commit from Blame (#28719)

(cherry picked from commit 8cb69bb05417075adebef19cd28b2409dbba3f33)

* Limit SQLAlchemy to below 2.0 (#28725)

SQLAlchemy is about to release its 2.0 version, and in version 1.4.46 it
started to warn about deprecated features that are used. This
(nicely) started to fail our builds - so our canary tests caught
it early and gave us a chance to prepare for the 2.0 release and
limit Airflow's dependencies beforehand.

This PR adds the deprecation as "known" and limits SQLAlchemy to
<2.0 (and links to appropriate issues and documentation).

(cherry picked from commit 93fed0cf5eeed5dbea9f261370149206232fca98)

* Clarify about docker compose (#28729)

We got several requests to update the syntax (https://github.com/apache/airflow/pull/28728, https://github.com/apache/airflow/pull/27792, https://github.com/apache/airflow/pull/28194);
let's clarify that this is not a mistake.

(cherry picked from commit df0e4c9ad447377073af1ed60fb0dfad731be059)

* Update CSRF token to expire with session (#28730)

(cherry picked from commit 543e9a592e6b9dc81467c55169725e192fe95e89)

* Clarify that versioned constraints are fixed at release time (#28762)

We received a number of requests to upgrade individual dependencies in
the constraint files (mostly due to those dependencies releasing versions
with vulnerabilities fixed). This is not how our constraints work; their
main purpose is to provide a "consistent installation" mechanism for
anyone who installs airflow from scratch. We are not going to keep
such released versions up-to-date with versions of dependencies released
after the release.

This PR provides additional explanation about that in both constraint
files as well as in reference container images which follow similar
patterns.

(cherry picked from commit 8290ade26deba02ca6cf3d8254981b31cf89ee5b)

* Only patch single label when adopting pod (#28776)

When KubernetesExecutor adopts pods, it was patching the pod with the
pod it retrieved from the k8s api, while just updating a single label.
Normally this works just fine, but there are cases where the pod you
pull from the k8s api can't be used as-is when patching - it results
in a 422 `Forbidden: pod updates may not change fields other than ...`.

Instead we now just pass the single label we need to update to patch,
allowing us to avoid accidentally "updating" other fields.

Closes #24015

(cherry picked from commit 9922953bcd9e11a1412a3528aef938444d62f7fe)
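
Rather than sending back the whole pod object fetched from the API, the patch body can contain just the label being changed. A sketch (the label key and value here are illustrative, not necessarily the ones Airflow uses):

```python
def adoption_patch_body(new_worker_id: str) -> dict:
    """Build a minimal strategic-merge patch body: only the one label
    we actually want to change, so no other pod fields are 'updated'.

    In a real client call this dict would be passed as `body` to
    CoreV1Api.patch_namespaced_pod(name=..., namespace=..., body=...).
    """
    return {"metadata": {"labels": {"airflow-worker": new_worker_id}}}


body = adoption_patch_body("scheduler-job-42")
```

Because a merge patch only touches the keys present in the body, immutable or server-managed pod fields never appear in the request and can't trigger a 422.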

* Fix UIAlert should_show when AUTH_ROLE_PUBLIC set (#28781)

* Fix UIAlert should_show when AUTH_ROLE_PUBLIC set

Co-authored-by: Tzu-ping Chung <uranusjr@gmail.com>
(cherry picked from commit f17e2ba48b59525655a92e04684db664a672918f)

* Remove swagger-ui extra from connexion and install swagger-ui-dist via npm package (#28788)

(cherry picked from commit 35ad16dc0f6b764322b1eb289709e493fbbb0ae0)

* Update dynamic-task-mapping.rst (#28797)

(cherry picked from commit 6ca67ba98ee74c1b42a93f9812ddb8a0e02c041d)

* Fix masking of non-sensitive environment variables (#28802)

Environment variables were hidden even when expose-config was set to
non-sensitive-only. This PR changes it to work like every other source:
items are only hidden when they are sensitive.

(cherry picked from commit 0a8d0ab56689c341e65a36c0287c9d635bae1242)

* Update scheduler docs about low priority tasks (#28831)

Gathered insights from the discussion in https://github.com/apache/airflow/issues/26933 into a paragraph in the scheduler docs to clarify why low-priority tasks are sometimes scheduled before high-priority tasks.

(cherry picked from commit 493b433ad57088a5f5cabc466c949445e500b4c1)

* Fix taskflow.rst duplicated "or" (#28839)

Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>
(cherry picked from commit 7d57f5696885eb2a4cd64d56bf79d6a8e5a5d638)

* Update config hash in Breeze's README.md during reinstallation (#28148)

Previously we updated Breeze's config hash using pre-commit whenever
setup files changed. This has proven to be brittle.

When you locally work and add new dependencies, breeze would keep
reinstalling every time you run it locally - without the README
being updated. You'd have to manually run pre-commit in order to
get it regenerated.

This PR adds a new flow. Whenever you automatically
re-install breeze, the README.md file of the folder from which
you reinstall breeze gets updated with the new hash **just** before
reinstalling. This means that after installation the new hash is
already present in the package, and next time you run breeze it
will match the changed hash of your dependencies.

The only thing left is to commit the changed README to the repo
together with setup.py/cfg changes of yours.

Pre-commit is still run on commit to verify that the hash of
the config files is good.

(cherry picked from commit 5bac5b39ffa415d535d629ddc4992337317a9c0e)

* Add inputimeout as dependency to breeze-cmd-line pre-commit deps (#28299)

(cherry picked from commit 504e2c29ef1ea070291f14d1284de403a433f157)

* Show mapped task groups in graph (#28392)

* Show mapped task groups in graph

* generate correct group tooltip summary

* fix tests

(cherry picked from commit 78b72f4fa07cac009ddd6d43d54627381e3e9c21)

* Only get changelog for core commits (#27900)

(cherry picked from commit bad875b58d7768581f97cd432295ed303d4e4c09)

* Strip provider test details for mssql tests in CI (#27938)

When a big number of providers is changed in a PR and tests
are run in a self-hosted environment, the file path generated for
the mssql data volume was too long, because it contained all the
providers that were changed. However, this is not necessary, because
at most one Providers test type runs during those tests,
and "Providers" is enough to guarantee uniqueness of the volume
name.

This PR strips provider details from the volume name.

(cherry picked from commit 0d0a62de94668530ae64a2a183a8e88718d6069f)

* Make updating breeze deps via breeze easier. (#27958)

In trying to update the deps of breeze, I made a change to `setup.cfg`,
then ran `breeze static-checks update-breeze-readme-config-hash`
and ran into two problems that this fixes:

1. It prevents a self-update loop, where the change to `setup.cfg` was
   detected but the hash in the README hadn't been updated, so breeze
   just came around and tried to reinstall again and again and again.

2. This correctly sets/maintains `argv[0]` for the re-exec'd process
   (which is what `sys.executable` gets set to) so that when we do
   `assert_pre_commit_installed` and try to find the pre-commit version,
   we don't invoke breeze again by mistake!

(cherry picked from commit 67b958c4e3bd720abac3a81cf0348427740f3627)

* Make breeze SVG output more stable (#27960)

Rich 12.6.0 introduces a new feature that lets us control the prefix
used for class names/ids in the generated SVG - which should mean that,
after this change, a single-word change to help text will not
cause the _entire_ SVG to be regenerated.
(cherry picked from commit b5084bcef9c4b0304a1643f891e1b8c80c8b0621)

* better warning messages when running breeze in rosetta (#27874)

(cherry picked from commit 527fbce462429fc9836837378f801eed4e9d194f)

* Fix UnboundLocalError in breeze (#28126)

* Fix UnboundLocalError in breeze

I got the following error after installing breeze:
`UnboundLocalError: local variable 'TimeoutOccurred' referenced before assignment`
It seems that changing the import place fixes the issue.

(cherry picked from commit 82af388de2032d1c44f0c11a1759fcb96b245d0d)

* Check if ghcr.io is reachable before network-bound breeze operations (#28137)

Unfortunately, ghcr.io behaves badly when the token you logged in with
has expired. It refuses to pull the images, even if they are public.

This PR adds an extra check for all network-bound commands that require
ghcr.io access.
(cherry picked from commit c4224e28fc94219cde2c15b5e7993cf76772ad7a)

* Summarize all warnings from all builds (#28151)

(cherry picked from commit 1bb594e99cf0c363eed9736260dcb6201aa010e6)

* Don't check rosetta on i386 (#28169)

(cherry picked from commit 2d86a123da8deea139164094b9bdd31cb063817d)

* Force higher parallelism for waiting for images in CI (#28209)

Default parallelism for public runners is 2 because they have 2
CPUs. However, image pulling is mostly about I/O (CPU is only really
used when images are extracted and when tests are run - a bit).

With parallelism = 2, the 4 image pulls serialize (the first 2 images
are pulled and tested, then the 2 remaining ones).

By setting parallelism to 6 we allow all 4 images to run in
parallel (and we are safe for 3.11, when it is out, to also run in
parallel). That should save ~2 minutes for image pulling.

(cherry picked from commit 1ed01b58752650985d67127acfb19705ca0c967f)

* Improve caching for pre-commits in CI (#28240)

Configuration of caching for pre-commits in CI has been broken:

* full pre-commit cache had `pre-commit-` instead of `pre-commit-full-`
* basic checks nev…