Description
Describe the bug
In Python, I have to reconnect Client objects a lot, usually because of bugs. client.close and client.shutdown are sufficient to disconnect a Client object. However, a disconnected Client object can't be reconnected, so I instantiate a brand new Client object to replace it.
While doing this, some of my production services started crashing due to file descriptor exhaustion.
It turns out that, if you have called .subscribe at least once, closing the consumer object and closing the client does not close all of the file handles that were opened on the system. The remaining handles leak (a fair number of them: one or two per worker thread per partition, it looks like), consuming resources and pushing the process closer to exhaustion.
This seems to only happen if:
- The topic is partitioned.
- The topic is not automatically created by the subscription, and is pre-existing.
To Reproduce
- Create a partitioned, persistent topic with at least 2 partitions.
- Update the below code to use the name of the topic you created.
- Run the below code.
- Observe the filehandle diff printed. Observe that the number of open filehandles on the system grows over time.
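For the first step, a minimal sketch of creating the partitioned topic from Python through the Pulsar admin REST API. The admin address (http://localhost:8080), the public/default tenant and namespace, and the topic name fd-leak-test are assumptions for illustration, not values from my setup; adjust them to your broker.

```python
import urllib.request


def build_create_partitioned_topic_request(admin_url, tenant, namespace, topic, partitions):
    """Build (but do not send) the admin request that creates a partitioned topic."""
    if partitions < 2:
        # The reproduction steps call for at least 2 partitions.
        raise ValueError("use at least 2 partitions to reproduce the leak")
    url = (
        f"{admin_url.rstrip('/')}/admin/v2/persistent/"
        f"{tenant}/{namespace}/{topic}/partitions"
    )
    return urllib.request.Request(
        url,
        data=str(partitions).encode(),  # the body is the bare partition count
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


def create_partitioned_topic(admin_url, tenant, namespace, topic, partitions=2):
    """Send the request; urllib raises HTTPError on an error response."""
    request = build_create_partitioned_topic_request(
        admin_url, tenant, namespace, topic, partitions
    )
    with urllib.request.urlopen(request) as response:
        return response.status
```

With those assumed values, calling create_partitioned_topic("http://localhost:8080", "public", "default", "fd-leak-test") would correspond to TOPIC_NAME = "persistent://public/default/fd-leak-test" in the reproduction script.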
Expected behavior
- Creating a Consumer object and then calling .close on it should result in net zero file handle changes on the system.
- Creating a Producer object and then calling .close on it should result in net zero file handle changes on the system.
- Creating a Client object and then calling close on it should result in net zero file handle changes on the system.
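The "net zero" expectation above can be written as a check. This is only a sketch: count_open_fds and assert_no_fd_leak are hypothetical helper names, and it assumes /dev/fd is available (it is on macOS and Linux). It compares counts only, so one leaked handle plus one unrelated closed handle would still pass.

```python
import os
from contextlib import contextmanager


def count_open_fds():
    """Count this process's open file descriptors by listing /dev/fd."""
    # Listing the directory may itself use a descriptor; any such extra entry
    # is the same on every call, so it cancels out when counts are compared.
    return len(os.listdir("/dev/fd"))


@contextmanager
def assert_no_fd_leak():
    """Fail if the wrapped block changes the number of open descriptors."""
    before = count_open_fds()
    yield
    after = count_open_fds()
    if after != before:
        raise AssertionError(f"file descriptor count changed: {before} -> {after}")
```

Wrapping a create-then-close cycle in `with assert_no_fd_leak():` after one priming run (so that caches and dylibs are already loaded) should pass if the expectations above hold.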
Code to reproduce
from contextlib import contextmanager
import logging

from pulsar import Client, ConsumerType

TOPIC_NAME = "persistent://your topic name here"


def get_open_filehandles():
    import os
    import subprocess
    # One lsof output line per file handle held by this process.
    lines = subprocess.run(['lsof', '-p', str(os.getpid())], capture_output=True)
    return sorted(lines.stdout.decode().split('\n'))


@contextmanager
def diff_file_handles():
    # Snapshot the open handles before and after the wrapped block, then print the difference.
    initial = set(get_open_filehandles())
    try:
        yield
    finally:
        final = set(get_open_filehandles())
        for fh in final:
            if fh not in initial:
                print("NEW FILEHANDLE", fh)
        for fh in initial:
            if fh not in final:
                print("CLOSED FILEHANDLE", fh)
        print("Handles before:", len(initial), "Handles after:", len(final))


def consume_and_toss():
    # Create a client, subscribe once, then close and discard everything.
    client = Client(
        service_url='pulsar://localhost',
        logger=logging.getLogger(),
        io_threads=1,
        message_listener_threads=1,
    )
    sub = client.subscribe(
        topic=TOPIC_NAME,
        subscription_name='testsub',
        receiver_queue_size=1,
        max_total_receiver_queue_size_across_partitions=1,
        consumer_type=ConsumerType.KeyShared,
        replicate_subscription_state_enabled=False,
    )
    sub.close()
    del sub
    client.shutdown()
    client.close()
    del client


def main():
    # Prime caches, load dylibs:
    consume_and_toss()
    for _ in range(4):
        print("ITERATING")
        with diff_file_handles():
            consume_and_toss()


if __name__ == '__main__':
    main()
Desktop (please complete the following information):
- OS: macOS Monterey.
- Pulsar: 2.9.1 via Homebrew.
- Client: 2.9.1.