diff --git a/kombu/transport/filesystem.py b/kombu/transport/filesystem.py index d66c42d68..d29d06b84 100644 --- a/kombu/transport/filesystem.py +++ b/kombu/transport/filesystem.py @@ -65,7 +65,7 @@ def callback(body, message): * Type: Virtual * Supports Direct: Yes * Supports Topic: Yes -* Supports Fanout: No +* Supports Fanout: Yes * Supports Priority: No * Supports TTL: No @@ -86,12 +86,16 @@ def callback(body, message): * ``store_processed`` - if set to True, all processed messages are backed up to ``processed_folder``. * ``processed_folder`` - directory where are backed up processed files. +* ``control_folder`` - directory where are exchange-queue table stored. """ import os import shutil import tempfile import uuid +from collections import namedtuple +from contextlib import contextmanager +from pathlib import Path from queue import Empty from time import monotonic @@ -128,6 +132,7 @@ def unlock(file): hfile = win32file._get_osfhandle(file.fileno()) win32file.UnlockFileEx(hfile, 0, 0xffff0000, __overlapped) + elif os.name == 'posix': import fcntl @@ -140,14 +145,61 @@ def lock(file, flags): def unlock(file): """Remove file lock.""" fcntl.flock(file.fileno(), fcntl.LOCK_UN) + + else: raise RuntimeError( 'Filesystem plugin only defined for NT and POSIX platforms') +exchange_queue_t = namedtuple("exchange_queue_t", + ["routing_key", "pattern", "queue"]) + + class Channel(virtual.Channel): """Filesystem Channel.""" + supports_fanout = True + + @contextmanager + def _get_exchange_file_obj(self, exchange, mode="rb"): + file = self.control_folder / f"{exchange}.exchange" + if "w" in mode: + self.control_folder.mkdir(exist_ok=True) + f_obj = file.open(mode) + + try: + if "w" in mode: + lock(f_obj, LOCK_EX) + yield f_obj + except OSError: + raise ChannelError(f"Cannot open {file}") + finally: + if "w" in mode: + unlock(f_obj) + f_obj.close() + + def get_table(self, exchange): + try: + with self._get_exchange_file_obj(exchange) as f_obj: + exchange_table = loads(bytes_to_str(f_obj.read())) + return [exchange_queue_t(*q) for q in exchange_table] + except FileNotFoundError: + return [] + + def _queue_bind(self, exchange, routing_key, pattern, queue): + queues = self.get_table(exchange) + queue_val = exchange_queue_t(routing_key or "", pattern or "", + queue or "") + if queue_val not in queues: + queues.insert(0, queue_val) + with self._get_exchange_file_obj(exchange, "wb") as f_obj: + f_obj.write(str_to_bytes(dumps(queues))) + + def _put_fanout(self, exchange, payload, routing_key, **kwargs): + for q in self.get_table(exchange): + self._put(q.queue, payload, **kwargs) + def _put(self, queue, payload, **kwargs): """Put `message` onto `queue`.""" filename = '{}_{}.{}.msg'.format(int(round(monotonic() * 1000)), @@ -266,10 +318,19 @@ def store_processed(self): def processed_folder(self): return self.transport_options.get('processed_folder', 'processed') + @property + def control_folder(self): + return Path(self.transport_options.get('control_folder', 'control')) + class Transport(virtual.Transport): """Filesystem Transport.""" + implements = virtual.Transport.implements.extend( + asynchronous=False, + exchange_type=frozenset(['direct', 'topic', 'fanout']) + ) + Channel = Channel # filesystem backend state is global. global_state = virtual.BrokerState() diff --git a/t/unit/transport/test_filesystem.py b/t/unit/transport/test_filesystem.py index a8d1708b2..fdf233ae4 100644 --- a/t/unit/transport/test_filesystem.py +++ b/t/unit/transport/test_filesystem.py @@ -1,4 +1,5 @@ import tempfile +from queue import Empty import pytest @@ -138,3 +139,96 @@ def callback2(message_data, message): assert self.q2(consumer_channel).get() self.q2(consumer_channel).purge() assert self.q2(consumer_channel).get() is None + + +@t.skip.if_win32 +class test_FilesystemFanout: + def setup(self): + try: + data_folder_in = tempfile.mkdtemp() + data_folder_out = tempfile.mkdtemp() + control_folder = tempfile.mkdtemp() + except Exception: + pytest.skip("filesystem transport: cannot create tempfiles") + + self.consumer_connection = Connection( + transport="filesystem", + transport_options={ + "data_folder_in": data_folder_in, + "data_folder_out": data_folder_out, + "control_folder": control_folder, + }, + ) + self.consume_channel = self.consumer_connection.channel() + self.produce_connection = Connection( + transport="filesystem", + transport_options={ + "data_folder_in": data_folder_out, + "data_folder_out": data_folder_in, + "control_folder": control_folder, + }, + ) + self.producer_channel = self.produce_connection.channel() + self.exchange = Exchange("filesystem_exchange_fanout", type="fanout") + self.q1 = Queue("queue1", exchange=self.exchange) + self.q2 = Queue("queue2", exchange=self.exchange) + + def teardown(self): + # make sure we don't attempt to restore messages at shutdown. + for channel in [self.producer_channel, self.consumer_connection]: + try: + channel._qos._dirty.clear() + except AttributeError: + pass + try: + channel._qos._delivered.clear() + except AttributeError: + pass + + def test_produce_consume(self): + + producer = Producer(self.producer_channel, self.exchange) + consumer1 = Consumer(self.consume_channel, self.q1) + consumer2 = Consumer(self.consume_channel, self.q2) + self.q2(self.consume_channel).declare() + + for i in range(10): + producer.publish({"foo": i}) + + _received1 = [] + _received2 = [] + + def callback1(message_data, message): + _received1.append(message) + message.ack() + + def callback2(message_data, message): + _received2.append(message) + message.ack() + + consumer1.register_callback(callback1) + consumer2.register_callback(callback2) + + consumer1.consume() + consumer2.consume() + + while 1: + try: + self.consume_channel.drain_events() + except Empty: + break + + assert len(_received1) + len(_received2) == 20 + + # queue.delete + for i in range(10): + producer.publish({"foo": i}) + assert self.q1(self.consume_channel).get() + self.q1(self.consume_channel).delete() + self.q1(self.consume_channel).declare() + assert self.q1(self.consume_channel).get() is None + + # queue.purge + assert self.q2(self.consume_channel).get() + self.q2(self.consume_channel).purge() + assert self.q2(self.consume_channel).get() is None