Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

"Closing the transport connection timed out." caused by race condition #1975

Closed
robin-mader-bis opened this issue Feb 22, 2024 · 9 comments · Fixed by #2085
Closed

"Closing the transport connection timed out." caused by race condition #1975

robin-mader-bis opened this issue Feb 22, 2024 · 9 comments · Fixed by #2085
Labels
agent-python bug community Issues opened by the community

Comments

@robin-mader-bis
Copy link

robin-mader-bis commented Feb 22, 2024

Describe the bug: Occasionally, when using the elasticapm.Client (without a framework), during process shutdown (in the atexit handler), the transport thread will block forever while trying to send data to the APM server and subsequently be killed by the thread manager, after the configured timeout is reached. This causes "Closing the transport connection timed out." to be printed to the command line and the messages remaining in the buffer to be lost.

This seems to be caused by a race condition involving the atexit handler of the elasticapm.Client and the weakref.finalize of urllib3.connectionpool.HTTPConnectionPool (which uses an atexit handler under the hood) which calls _close_pool_connections. A timeline causing this bugs looks like this:

  1. The process is about to shutdown. atexit handlers are called.
  2. _close_pool_connections is called while all connections are in the pool. All existing connections are disposed.
  3. The elasticapm.Client atexit handler is called, sending the "close" event to the transport thread.
  4. The transport thread handles the "close" event, flushing the buffer and trying to send remaining data to the APM server.
  5. urlopen will block the transport thread forever while waiting to get a connection from the connection pool (since the poolmanager uses block=True and no pool timeout is configured, this will block forever, because the only way to get a connection from the pool this way, is for someone else to put a connection into the pool).
  6. The thread manager kills the thread after the configured timeout is reached, printing the error message and loosing all data in the buffer

The reason why this does occur consistently, is because _close_pool_connections will not clean up connections which are currently in use (e.g. connections being used in another thread). If a request is in progress when _close_pool_connections is called, the associated connection "survives" the cleanup and will be added back to the pool afterwards and can be reused by the transport thread (which may be a bug/unintended behavior of urllib3 since it claims HTTPConnectionPool is thread safe).

To Reproduce

The following minimal example reproduces the issue:

import time

import elasticapm

# NOTE: You should be able to remove the "config" argument in your environment
client = elasticapm.Client(
    service_name="<SERVICE>",
    server_url="<APM_SERVER_URL>",
    secret_token="<SECRET_TOKEN>",
    config={
        "SERVER_CA_CERT_FILE": "<INTERNAL_CA_FILE_PATH>",
        "GLOBAL_LABELS": {"Tenant": "<TENANT>"},
    },
)

client.capture_message("Test")

# Give the client time to resolve all internal network requests, ensuring
# that all urllib connections are in the pool when the atexit handlers are called
time.sleep(10)

As is the case with race conditions, you might have to fiddle with the sleep timing a little bit. 10 seconds work quite reliable in my environment, but you may need a few more/less seconds, depending on your environment.

Environment

  • OS: Windows 10
  • Python version: 3.11.7
  • package versions: urllib3==2.2.1
  • APM Server version: 8.11.3
  • Agent version: elastic-apm==6.20.0

Additional context

A workaround for my use case is to use a custom Transport class which uses a non-blocking pool. I don't know enough about the elastic-apm code base to know whether or not this causes issues in other parts of the package, but it seems to resolve the issue for me without causing any other major issues.

from elasticapm.transport.http import Transport

def get_import_string(cls) -> str:
    module = cls.__module__
    if module == "builtins":
        # avoid outputs like 'builtins.str'
        return cls.__qualname__
    return module + "." + cls.__qualname__

class NonBlockingTransport(Transport):
    def __init__(self, *args, **kwargs) -> None:
        super(NonBlockingTransport, self).__init__(*args, **kwargs)
        self._pool_kwargs["block"] = False

# Use like this:
client = elasticapm.Client(
    ...,
    config={
        ...,
        "TRANSPORT_CLASS": get_import_string(NonBlockingTransport),
    },
)
@github-actions github-actions bot added agent-python community Issues opened by the community triage Issues awaiting triage labels Feb 22, 2024
@basepi basepi added bug and removed triage Issues awaiting triage labels Feb 22, 2024
@basepi
Copy link
Contributor

basepi commented Feb 22, 2024

Thanks for the report! This isn't very high priority since it only happens on shutdown and emits an error. But we should definitely get this fixed.

@ngocmac
Copy link

ngocmac commented May 7, 2024

Hello,

I encountered the same problem when trying to monitor a Python script in APM Elastic.
Here is my code:

if __name__ == "__main__":

    client = Client(config = ES_APM_CONFIGURATION)
    elasticapm.instrument()
    client.capture_message("Start job do_something »)
    client.begin_transaction(transaction_type="script")
    do_something()
    client.end_transaction(name=__name__, result="success")

In ES_APM_CONFIGURATION, I have: SERVICE_NAME, SECRET_TOKEN, SERVER_URL, SERVICE_VERSION, ENABLED, ENVIRONMENT

I tried to add function get_import_string as suggested by @robin-mader-bis but I got an error
AttributeError: 'NonBlockingTransport' object has no attribute '_pool_kwargs'

I just received the message Start job do_something and nothing else. I don’t know how to resolve this problem.

Thanks,

@mortenbohne
Copy link

Hello,

I encountered the same problem when trying to monitor a Python script in APM Elastic. Here is my code:

if __name__ == "__main__":

    client = Client(config = ES_APM_CONFIGURATION)
    elasticapm.instrument()
    client.capture_message("Start job do_something »)
    client.begin_transaction(transaction_type="script")
    do_something()
    client.end_transaction(name=__name__, result="success")

In ES_APM_CONFIGURATION, I have: SERVICE_NAME, SECRET_TOKEN, SERVER_URL, SERVICE_VERSION, ENABLED, ENVIRONMENT

I tried to add function get_import_string as suggested by @robin-mader-bis but I got an error AttributeError: 'NonBlockingTransport' object has no attribute '_pool_kwargs'

I just received the message Start job do_something and nothing else. I don’t know how to resolve this problem.

Thanks,

I had the same issue at first, but make sure the Transport class you are inheriting from is elasticapm.transport.http.Transport (my first attempt was from elasticapm.transport.base.Transport and got the same error as you)

@xrmx
Copy link
Member

xrmx commented Jun 6, 2024

Modified the original poster message to include the import

@fxdgear
Copy link

fxdgear commented Jun 13, 2024

I'm having this same issue as well.

I've implemented the workaround listed in the description, but now I'm seeing some warnings coming from urllib

Connection pool is full, discarding connection: xxx.apm.eastus.aws.elastic-cloud.com. Connection pool size: 1

@WolfEYc
Copy link

WolfEYc commented Jun 17, 2024

+1

@xrmx
Copy link
Member

xrmx commented Jun 20, 2024

@robin-mader-bis It has been suggested to me to set the pools to non-blocking only at shutdown. That could be something like the following untested patch.

diff --git a/elasticapm/transport/http.py b/elasticapm/transport/http.py
index 17ae50ba..fe10fb7f 100644
--- a/elasticapm/transport/http.py
+++ b/elasticapm/transport/http.py
@@ -250,6 +250,18 @@ class Transport(HTTPTransportBase):
             return self._server_ca_cert_file
         return certifi.where() if (certifi and self.client.config.use_certifi) else None
 
+    def close(self) -> None:
+        """
+        Cleans up resources and closes connection
+        :return:
+        """
+        if self._closed or (not self._thread or self._thread.pid != os.getpid()):
+            return
+        # Set the pool to non-blocking to avoid a race condition
+        if self._http:
+            self._http.block = False
+        super().close()
+
 
 def version_string_to_tuple(version):
     if version:

@xrmx
Copy link
Member

xrmx commented Jul 17, 2024

Ok, spent some time on this today, the previous snippet will not work because the urllib3 connection pool weakref.finalize callback will be called before our atexit callback and so the pool would be already gone. Instead we can force the creation of a new one as below:

diff --git a/elasticapm/transport/http.py b/elasticapm/transport/http.py
index 17ae50ba..91e52e7e 100644
--- a/elasticapm/transport/http.py
+++ b/elasticapm/transport/http.py
@@ -32,6 +32,7 @@
 
 import hashlib
 import json
+import os
 import re
 import ssl
 import urllib.parse
@@ -250,6 +251,23 @@ class Transport(HTTPTransportBase):
             return self._server_ca_cert_file
         return certifi.where() if (certifi and self.client.config.use_certifi) else None
 
+    def close(self):
+        if self._closed or (not self._thread or self._thread.pid != os.getpid()):
+            return
+
+        self._closed = True
+        # we are racing against urllib3 ConnectionPool weakref finalizer that leads to having the pool closed
+        # and we hanging trying to send any eventual queued data.
+        # Force the creation of a new PoolManager so that we are always able to flush our queue
+        self._http = None
+        self.queue("close", None)
+        if not self._flushed.wait(timeout=self._max_flush_time_seconds):
+            logger.error("Closing the transport connection timed out.")
+
+        # FIXME: does it make sense to try to cleanup the urrlib3 pool manually?
+
+    stop_thread = close
+
 
 def version_string_to_tuple(version):
     if version:

@pquentin what do you think?

@xrmx
Copy link
Member

xrmx commented Jul 18, 2024

@robin-mader-bis would be great if you can take a look at #2085

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
agent-python bug community Issues opened by the community
Projects
None yet
Development

Successfully merging a pull request may close this issue.

7 participants