Run out of ports when 65000+ messages are sent via 65000+ threads #520

Closed
7 tasks done
manjusfi opened this issue Jan 4, 2023 · 6 comments

manjusfi commented Jan 4, 2023

Issues

GitHub issues are for bugs. If you have questions, please ask them on the discussion board.

Checklist

  • Does your title concisely summarize the problem?
  • Did you include a minimal, reproducible example?
  • What OS are you using?
  • What version of Dramatiq are you using?
  • What did you do?
  • What did you expect would happen?
  • What happened?

What OS are you using?

Happens on both Ubuntu 16.04 and macOS 10.13.3

What version of Dramatiq are you using?

1.13.0

What did you do?

Tried to create 65000+ threads and send one message per thread

What did you expect would happen?

Messages to be dispatched without any error

What happened?

Got an exception: OSError with errno 98 or 99 (on Linux these are EADDRINUSE and EADDRNOTAVAIL)

Author

manjusfi commented Jan 7, 2023

import os
import threading
import traceback
from ssl import PROTOCOL_TLSv1_2, SSLContext
from threading import Thread

import dramatiq
import pika
from dramatiq.brokers.rabbitmq import RabbitmqBroker

# Connection settings come from the environment.
credentials = pika.PlainCredentials(os.environ["RABBITMQ_USERNAME"], os.environ["RABBITMQ_PASSWORD"])
host = os.environ["RABBITMQ_HOST"]
port = int(os.environ["RABBITMQ_PORT"])  # environment variables are strings
ssl_options = pika.SSLOptions(SSLContext(PROTOCOL_TLSv1_2))

broker = RabbitmqBroker(credentials=credentials, host=host, port=port, ssl_options=ssl_options, heartbeat=600)
dramatiq.set_broker(broker)


@dramatiq.actor
def hello_world(input, input2=None):
    print(f"hello world -> {input} {input2}")


def target_func(input, input2, input3="dummy"):
    # Each thread enqueues exactly one message.
    name = threading.current_thread().name
    try:
        print(f"{name} -> invoking async send with input -> {input}")
        hello_world.send(input, input2)
    except Exception:
        print(f"{name} -> Exception in target_func: -> {traceback.format_exc()}")


max_range = 70000
batch_size = 300


def run_batch(threads):
    # Start every thread in the batch, then wait for all of them to finish.
    for t in threads:
        t.start()
    for t in threads:
        t.join()


def repro_issue():
    threads = []
    for i in range(max_range):
        threads.append(Thread(target=target_func, args=[str(i), str(i + 1)]))
        if len(threads) == batch_size:
            run_batch(threads)
            threads = []
    # Run whatever is left over after the last full batch.
    run_batch(threads)


repro_issue()

Owner

Bogdanp commented Jan 8, 2023

Spawning so many OS threads is likely to be counterproductive. Regardless, you can solve your problem by calling broker.close() (or, better, del broker.connection) in a finally block in target_func.
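
A minimal sketch of that suggestion, so it can be run without a RabbitMQ server. `FakeBroker` is a hypothetical stand-in, not Dramatiq's class: it only mimics a broker that keeps one connection per thread and exposes it as a deletable `connection` property. The point is the `finally` block in `target_func`, which releases the calling thread's connection whether or not the send succeeded.

```python
import threading


class FakeBroker:
    """Hypothetical stand-in for a broker that keeps one connection per thread."""

    def __init__(self):
        self._state = threading.local()
        self._lock = threading.Lock()
        self.open_connections = 0

    @property
    def connection(self):
        # Lazily open a connection for the calling thread.
        if getattr(self._state, "connection", None) is None:
            self._state.connection = object()  # placeholder for a real connection
            with self._lock:
                self.open_connections += 1
        return self._state.connection

    @connection.deleter
    def connection(self):
        # Close only the calling thread's connection, if it has one.
        if getattr(self._state, "connection", None) is not None:
            self._state.connection = None
            with self._lock:
                self.open_connections -= 1

    def send(self, message):
        _ = self.connection  # publishing needs this thread's connection
        return message


broker = FakeBroker()


def target_func(message):
    try:
        broker.send(message)
    finally:
        # Release this thread's connection even if send() raised.
        del broker.connection


def run(n_threads=50):
    threads = [threading.Thread(target=target_func, args=(str(i),)) for i in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return broker.open_connections
```

With the cleanup in place, `run()` leaves no connections open once every thread has finished. The trade-off is that each thread opens and closes its own connection.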

Bogdanp closed this as completed Jan 8, 2023
Author

manjusfi commented Jan 8, 2023

I don't think broker.close() would work though as we don't want to close all the connections and channels for every message that is sent. Your comment on spawning so many OS threads is valid though.

Author

manjusfi commented Jan 8, 2023

#523 - PR with a fix

Owner

Bogdanp commented Jan 8, 2023

> I don't think broker.close() would work though as we don't want to close all the connections and channels for every message that is sent. Your comment on spawning so many OS threads is valid though.

del broker.connection (suggested in the edit) would close only the connections used to publish the messages.

Author

manjusfi commented Jan 8, 2023

@Bogdanp - isn't closing a connection for every message sent a very sub-optimal way of handling this issue? The problem stems from Pika connections not being thread-safe. Hence, in my PR I am creating a pool of connections (one per publisher thread), so that the number of Pika connections that dramatiq opens remains almost constant (limited by TCP/IP state management).
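
To illustrate the idea of keeping the connection count roughly constant, here is a rough sketch that bounds the number of publisher threads and reuses one connection per thread. It is not the code from the PR. `get_connection`, `publish`, and `publish_all` are hypothetical names, and the "connection" is a placeholder object rather than a Pika connection. The sketch assumes that a fixed-size `ThreadPoolExecutor` with thread-local connections is an acceptable stand-in for the pooling described above.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

_state = threading.local()
_lock = threading.Lock()
connections_opened = 0


def get_connection():
    """Return the calling thread's connection, opening one on first use."""
    global connections_opened
    if getattr(_state, "connection", None) is None:
        _state.connection = object()  # placeholder for a real connection
        with _lock:
            connections_opened += 1
    return _state.connection


def publish(message):
    # Every publish reuses the connection that belongs to the worker thread.
    get_connection()
    return message


def publish_all(messages, max_workers=8):
    # A fixed number of worker threads means at most max_workers connections.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(publish, messages))


results = publish_all([str(i) for i in range(1000)], max_workers=8)
```

Here 1000 messages are published, but the number of connections opened never exceeds the number of worker threads, regardless of how many messages are sent.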
