From 520c8c07a5d98d7ddef2424aec2f8c89a17781d4 Mon Sep 17 00:00:00 2001 From: Yuriy Halytskyy Date: Sun, 6 Dec 2020 15:17:50 +1300 Subject: [PATCH 1/7] Create a folder for each queue when using filesystem transport and add fanout support --- kombu/transport/filesystem.py | 152 ++++++++++++++++++++++------------ 1 file changed, 98 insertions(+), 54 deletions(-) diff --git a/kombu/transport/filesystem.py b/kombu/transport/filesystem.py index d66c42d68..121cb0ff9 100644 --- a/kombu/transport/filesystem.py +++ b/kombu/transport/filesystem.py @@ -65,7 +65,7 @@ def callback(body, message): * Type: Virtual * Supports Direct: Yes * Supports Topic: Yes -* Supports Fanout: No +* Supports Fanout: Yes * Supports Priority: No * Supports TTL: No @@ -94,6 +94,7 @@ def callback(body, message): import uuid from queue import Empty from time import monotonic +from collections import namedtuple from kombu.exceptions import ChannelError from kombu.utils.encoding import bytes_to_str, str_to_bytes @@ -148,43 +149,94 @@ def unlock(file): class Channel(virtual.Channel): """Filesystem Channel.""" + supports_fanout = True + + def get_table(self, exchange): + filename = '{}.exchange'.format(exchange) + filename = os.path.join(self.control_folder,filename) + try: + f = open(filename,'r') + exchange_table = loads(bytes_to_str(f.read())) + result = [tuple(q) for q in exchange_table] + return result + except FileNotFoundError: + return [] + except OSError: + raise ChannelError( + f'Cannot open {filename}') + finally: + f.close() + + def _queue_bind(self, exchange, routing_key, pattern, queue): + filename = '{}.exchange'.format(exchange) + filename = os.path.join(self.control_folder,filename) + try: + d_fileno = os.open(self.control_folder,os.O_RDONLY) + d = namedtuple('Directory',[]) + d.fileno = lambda: d_fileno + + lock(d,LOCK_EX) + if os.path.isfile(filename): + f = open(filename,'r') + exchange_table = loads(bytes_to_str(f.read())) + f.close() + else: + exchange_table = [] + queues = [tuple(q) for q in exchange_table] + queue_val = (routing_key or '', pattern or '', queue or '' ) + if (queue_val not in queues): + exchange_table = [queue_val] + queues + f = open(filename,'wb') + f.write(str_to_bytes(dumps(exchange_table))) + except OSError: + raise ChannelError( + f'Cannot open {filename!r}') + finally: + unlock(d) + if (f): + f.close() + + def _put_fanout(self, exchange, payload, routing_key, **kwargs): + for q in self.get_table(exchange): + self._put(q[2], payload, **kwargs) + def _put(self, queue, payload, **kwargs): """Put `message` onto `queue`.""" - filename = '{}_{}.{}.msg'.format(int(round(monotonic() * 1000)), - uuid.uuid4(), queue) - filename = os.path.join(self.data_folder_out, filename) - + queue_folder = os.path.join(self.data_folder_out, queue) + filename = '{}_{}.msg'.format(int(round(monotonic() * 1000)),uuid.uuid4()) + filename = os.path.join(queue_folder, filename) + f = None # define file descriptor try: + os.makedirs(queue_folder, exist_ok = True) f = open(filename, 'wb') lock(f, LOCK_EX) f.write(str_to_bytes(dumps(payload))) except OSError: - raise ChannelError( - f'Cannot add file {filename!r} to directory') + raise ChannelError(f'Cannot create {filename}') finally: - unlock(f) - f.close() + if (f): + unlock(f) + f.close() def _get(self, queue): """Get next message from `queue`.""" - queue_find = '.' + queue + '.msg' - folder = os.listdir(self.data_folder_in) - folder = sorted(folder) - while len(folder) > 0: - filename = folder.pop(0) - - # only handle message for the requested queue - if filename.find(queue_find) < 0: - continue - + queue_folder = os.path.join(self.data_folder_in, queue) + if (not os.path.exists(queue_folder)): + raise Empty() + + messages = os.listdir(queue_folder) + messages = sorted(messages) + while len(messages) > 0: + filename = messages.pop(0) if self.store_processed: - processed_folder = self.processed_folder + processed_folder = os.path.join(self.processed_folder, queue) + os.makedirs(processed_folder, exist_ok = True) else: processed_folder = tempfile.gettempdir() try: # move the file to the tmp/processed folder - shutil.move(os.path.join(self.data_folder_in, filename), + shutil.move(os.path.join(queue_folder, filename), processed_folder) except OSError: pass # file could be locked, or removed in meantime so ignore @@ -208,43 +260,24 @@ def _purge(self, queue): """Remove all messages from `queue`.""" count = 0 queue_find = '.' + queue + '.msg' + queue_folder = os.path.join(self.data_folder_out, queue) - folder = os.listdir(self.data_folder_in) - while len(folder) > 0: - filename = folder.pop() - try: - # only purge messages for the requested queue - if filename.find(queue_find) < 0: - continue - - filename = os.path.join(self.data_folder_in, filename) - os.remove(filename) - - count += 1 - - except OSError: - # we simply ignore its existence, as it was probably - # processed by another worker - pass - - return count - + try: + count = len(os.listdir(queue_folder)) + shutil.rmtree(queue_folder) + return count + except OSError: + # we simply ignore its existence, as it was probably + # processed by another worker + return 0 + def _size(self, queue): """Return the number of messages in `queue` as an :class:`int`.""" - count = 0 - - queue_find = f'.{queue}.msg' - folder = os.listdir(self.data_folder_in) - while len(folder) > 0: - filename = folder.pop() - - # only handle message for the requested queue - if filename.find(queue_find) < 0: - continue - - count += 1 + try: + return len(os.listdir(self.data_folder_in)) + except OSError: + return 0 - return count @property def transport_options(self): @@ -266,10 +299,21 @@ def store_processed(self): def processed_folder(self): return self.transport_options.get('processed_folder', 'processed') + @cached_property + def control_folder(self): + return self.transport_options.get('control_folder', 'control') + + class Transport(virtual.Transport): """Filesystem Transport.""" + implements = virtual.Transport.implements.extend( + asynchronous=False, + exchange_type=frozenset(['direct', 'topic', 'fanout']) + ) + + Channel = Channel # filesystem backend state is global. global_state = virtual.BrokerState() From 306d9220bc08db2fab6645480f425ade0de451e2 Mon Sep 17 00:00:00 2001 From: Yuriy Halytskyy Date: Sun, 6 Dec 2020 15:53:59 +1300 Subject: [PATCH 2/7] clean up unused variables --- kombu/transport/filesystem.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/kombu/transport/filesystem.py b/kombu/transport/filesystem.py index 121cb0ff9..bdbd404d9 100644 --- a/kombu/transport/filesystem.py +++ b/kombu/transport/filesystem.py @@ -258,8 +258,6 @@ def _get(self, queue): def _purge(self, queue): """Remove all messages from `queue`.""" - count = 0 - queue_find = '.' + queue + '.msg' queue_folder = os.path.join(self.data_folder_out, queue) try: From 4961e261f8927b851eafb1b1b84feed7cc6da1db Mon Sep 17 00:00:00 2001 From: karajan1001 Date: Thu, 3 Mar 2022 19:47:35 +0800 Subject: [PATCH 3/7] Add fanout support to filesystem transport filesystem transport lacks of fanout support. 1. Add fanout support to filesystem transport. 2. Add a unit test for it. --- kombu/transport/filesystem.py | 183 +++++++++++++--------------- t/unit/transport/test_filesystem.py | 153 +++++++++++++++++------ 2 files changed, 203 insertions(+), 133 deletions(-) diff --git a/kombu/transport/filesystem.py b/kombu/transport/filesystem.py index bdbd404d9..0af250056 100644 --- a/kombu/transport/filesystem.py +++ b/kombu/transport/filesystem.py @@ -89,12 +89,14 @@ def callback(body, message): """ import os +import glob import shutil import tempfile import uuid from queue import Empty from time import monotonic from collections import namedtuple +from contextlib import contextmanager from kombu.exceptions import ChannelError from kombu.utils.encoding import bytes_to_str, str_to_bytes @@ -104,10 +106,10 @@ def callback(body, message): from . import virtual VERSION = (1, 0, 0) -__version__ = '.'.join(map(str, VERSION)) +__version__ = ".".join(map(str, VERSION)) # needs win32all to work on Windows -if os.name == 'nt': +if os.name == "nt": import pywintypes import win32con @@ -122,14 +124,15 @@ def callback(body, message): def lock(file, flags): """Create file lock.""" hfile = win32file._get_osfhandle(file.fileno()) - win32file.LockFileEx(hfile, flags, 0, 0xffff0000, __overlapped) + win32file.LockFileEx(hfile, flags, 0, 0xFFFF0000, __overlapped) def unlock(file): """Remove file lock.""" hfile = win32file._get_osfhandle(file.fileno()) - win32file.UnlockFileEx(hfile, 0, 0xffff0000, __overlapped) + win32file.UnlockFileEx(hfile, 0, 0xFFFF0000, __overlapped) -elif os.name == 'posix': + +elif os.name == "posix": import fcntl from fcntl import LOCK_EX, LOCK_NB, LOCK_SH # noqa @@ -141,9 +144,13 @@ def lock(file, flags): def unlock(file): """Remove file lock.""" fcntl.flock(file.fileno(), fcntl.LOCK_UN) + + else: - raise RuntimeError( - 'Filesystem plugin only defined for NT and POSIX platforms') + raise RuntimeError("Filesystem plugin only defined for NT and POSIX platforms") + + +exchange_queue_t = namedtuple("exchange_queue_t", ["routing_key", "pattern", "queue"]) class Channel(virtual.Channel): @@ -151,131 +158,114 @@ class Channel(virtual.Channel): supports_fanout = True + @contextmanager + def _get_exchange_file_obj(self, exchange, mode="rb"): + filename = "{}.exchange".format(exchange) + filename = os.path.join(self.control_folder, filename) + try: + f_obj = open(filename, mode) + if "w" in mode: + lock(f_obj, LOCK_EX) + yield f_obj + except OSError: + raise ChannelError(f"Cannot open {filename}") + finally: + if "w" in mode: + unlock(f_obj) + f_obj.close() + def get_table(self, exchange): - filename = '{}.exchange'.format(exchange) - filename = os.path.join(self.control_folder,filename) try: - f = open(filename,'r') - exchange_table = loads(bytes_to_str(f.read())) - result = [tuple(q) for q in exchange_table] - return result + with self._get_exchange_file_obj(exchange) as f_obj: + exchange_table = loads(bytes_to_str(f_obj.read())) + return [exchange_queue_t(*q) for q in exchange_table] except FileNotFoundError: return [] - except OSError: - raise ChannelError( - f'Cannot open {filename}') - finally: - f.close() def _queue_bind(self, exchange, routing_key, pattern, queue): - filename = '{}.exchange'.format(exchange) - filename = os.path.join(self.control_folder,filename) - try: - d_fileno = os.open(self.control_folder,os.O_RDONLY) - d = namedtuple('Directory',[]) - d.fileno = lambda: d_fileno - - lock(d,LOCK_EX) - if os.path.isfile(filename): - f = open(filename,'r') - exchange_table = loads(bytes_to_str(f.read())) - f.close() - else: - exchange_table = [] - queues = [tuple(q) for q in exchange_table] - queue_val = (routing_key or '', pattern or '', queue or '' ) - if (queue_val not in queues): - exchange_table = [queue_val] + queues - f = open(filename,'wb') - f.write(str_to_bytes(dumps(exchange_table))) - except OSError: - raise ChannelError( - f'Cannot open {filename!r}') - finally: - unlock(d) - if (f): - f.close() + queues = self.get_table(exchange) + queue_val = exchange_queue_t(routing_key or "", pattern or "", queue or "") + if queue_val not in queues: + queues.insert(0, queue_val) + with self._get_exchange_file_obj(exchange, "wb") as f_obj: + f_obj.write(str_to_bytes(dumps(queues))) def _put_fanout(self, exchange, payload, routing_key, **kwargs): for q in self.get_table(exchange): - self._put(q[2], payload, **kwargs) + self._put(q.queue, payload, **kwargs) def _put(self, queue, payload, **kwargs): """Put `message` onto `queue`.""" - queue_folder = os.path.join(self.data_folder_out, queue) - filename = '{}_{}.msg'.format(int(round(monotonic() * 1000)),uuid.uuid4()) - filename = os.path.join(queue_folder, filename) - f = None # define file descriptor + filename = "{}_{}.{}.msg".format( + int(round(monotonic() * 1000)), uuid.uuid4(), queue + ) + filename = os.path.join(self.data_folder_out, filename) + try: - os.makedirs(queue_folder, exist_ok = True) - f = open(filename, 'wb') + f = open(filename, "wb") lock(f, LOCK_EX) f.write(str_to_bytes(dumps(payload))) except OSError: - raise ChannelError(f'Cannot create {filename}') + raise ChannelError(f"Cannot add file {filename!r} to directory") finally: - if (f): - unlock(f) - f.close() + unlock(f) + f.close() def _get(self, queue): """Get next message from `queue`.""" - queue_folder = os.path.join(self.data_folder_in, queue) - if (not os.path.exists(queue_folder)): - raise Empty() - - messages = os.listdir(queue_folder) - messages = sorted(messages) - while len(messages) > 0: - filename = messages.pop(0) + folder = self._ls_queue(queue) + folder = sorted(folder) + while len(folder) > 0: + filename = folder.pop(0) + if self.store_processed: - processed_folder = os.path.join(self.processed_folder, queue) - os.makedirs(processed_folder, exist_ok = True) + processed_folder = self.processed_folder else: processed_folder = tempfile.gettempdir() try: # move the file to the tmp/processed folder - shutil.move(os.path.join(queue_folder, filename), - processed_folder) + shutil.move(filename, processed_folder) except OSError: pass # file could be locked, or removed in meantime so ignore filename = os.path.join(processed_folder, filename) try: - f = open(filename, 'rb') + f = open(filename, "rb") payload = f.read() f.close() if not self.store_processed: os.remove(filename) except OSError: - raise ChannelError( - f'Cannot read file {filename!r} from queue.') + raise ChannelError(f"Cannot read file {filename!r} from queue.") return loads(bytes_to_str(payload)) raise Empty() + def _ls_queue(self, queue): + """List all messages in the `queue`""" + queue_pattern = os.path.join(self.data_folder_in, f"*.{queue}.msg") + return glob.glob(queue_pattern) + def _purge(self, queue): """Remove all messages from `queue`.""" - queue_folder = os.path.join(self.data_folder_out, queue) + count = 0 + queue_messages = self._ls_queue(queue) + for filename in queue_messages: + try: + os.remove(filename) + count += 1 + except OSError: + # we simply ignore its existence, as it was probably + # processed by another worker + pass + + return count - try: - count = len(os.listdir(queue_folder)) - shutil.rmtree(queue_folder) - return count - except OSError: - # we simply ignore its existence, as it was probably - # processed by another worker - return 0 - def _size(self, queue): """Return the number of messages in `queue` as an :class:`int`.""" - try: - return len(os.listdir(self.data_folder_in)) - except OSError: - return 0 - + return self._ls_queue(queue) @property def transport_options(self): @@ -283,45 +273,42 @@ def transport_options(self): @cached_property def data_folder_in(self): - return self.transport_options.get('data_folder_in', 'data_in') + return self.transport_options.get("data_folder_in", "data_in") @cached_property def data_folder_out(self): - return self.transport_options.get('data_folder_out', 'data_out') + return self.transport_options.get("data_folder_out", "data_out") @cached_property def store_processed(self): - return self.transport_options.get('store_processed', False) + return self.transport_options.get("store_processed", False) @cached_property def processed_folder(self): - return self.transport_options.get('processed_folder', 'processed') + return self.transport_options.get("processed_folder", "processed") @cached_property def control_folder(self): - return self.transport_options.get('control_folder', 'control') - + return self.transport_options.get("control_folder", "control") class Transport(virtual.Transport): """Filesystem Transport.""" implements = virtual.Transport.implements.extend( - asynchronous=False, - exchange_type=frozenset(['direct', 'topic', 'fanout']) + asynchronous=False, exchange_type=frozenset(["direct", "topic", "fanout"]) ) - Channel = Channel # filesystem backend state is global. global_state = virtual.BrokerState() default_port = 0 - driver_type = 'filesystem' - driver_name = 'filesystem' + driver_type = "filesystem" + driver_name = "filesystem" def __init__(self, client, **kwargs): super().__init__(client, **kwargs) self.state = self.global_state def driver_version(self): - return 'N/A' + return "N/A" diff --git a/t/unit/transport/test_filesystem.py b/t/unit/transport/test_filesystem.py index a8d1708b2..90fe2332c 100644 --- a/t/unit/transport/test_filesystem.py +++ b/t/unit/transport/test_filesystem.py @@ -8,33 +8,40 @@ @t.skip.if_win32 class test_FilesystemTransport: - def setup(self): self.channels = set() try: data_folder_in = tempfile.mkdtemp() data_folder_out = tempfile.mkdtemp() except Exception: - pytest.skip('filesystem transport: cannot create tempfiles') - self.c = Connection(transport='filesystem', - transport_options={ - 'data_folder_in': data_folder_in, - 'data_folder_out': data_folder_out, - }) + pytest.skip("filesystem transport: cannot create tempfiles") + self.c = Connection( + transport="filesystem", + transport_options={ + "data_folder_in": data_folder_in, + "data_folder_out": data_folder_out, + }, + ) self.channels.add(self.c.default_channel) - self.p = Connection(transport='filesystem', - transport_options={ - 'data_folder_in': data_folder_out, - 'data_folder_out': data_folder_in, - }) + self.p = Connection( + transport="filesystem", + transport_options={ + "data_folder_in": data_folder_out, + "data_folder_out": data_folder_in, + }, + ) self.channels.add(self.p.default_channel) - self.e = Exchange('test_transport_filesystem') - self.q = Queue('test_transport_filesystem', - exchange=self.e, - routing_key='test_transport_filesystem') - self.q2 = Queue('test_transport_filesystem2', - exchange=self.e, - routing_key='test_transport_filesystem2') + self.e = Exchange("test_transport_filesystem") + self.q = Queue( + "test_transport_filesystem", + exchange=self.e, + routing_key="test_transport_filesystem", + ) + self.q2 = Queue( + "test_transport_filesystem2", + exchange=self.e, + routing_key="test_transport_filesystem2", + ) def teardown(self): # make sure we don't attempt to restore messages at shutdown. @@ -54,12 +61,10 @@ def _add_channel(self, channel): def test_produce_consume_noack(self): producer = Producer(self._add_channel(self.p.channel()), self.e) - consumer = Consumer(self._add_channel(self.c.channel()), self.q, - no_ack=True) + consumer = Consumer(self._add_channel(self.c.channel()), self.q, no_ack=True) for i in range(10): - producer.publish({'foo': i}, - routing_key='test_transport_filesystem') + producer.publish({"foo": i}, routing_key="test_transport_filesystem") _received = [] @@ -85,11 +90,9 @@ def test_produce_consume(self): self.q2(consumer_channel).declare() for i in range(10): - producer.publish({'foo': i}, - routing_key='test_transport_filesystem') + producer.publish({"foo": i}, routing_key="test_transport_filesystem") for i in range(10): - producer.publish({'foo': i}, - routing_key='test_transport_filesystem2') + producer.publish({"foo": i}, routing_key="test_transport_filesystem2") _received1 = [] _received2 = [] @@ -116,16 +119,17 @@ def callback2(message_data, message): assert len(_received1) + len(_received2) == 20 # compression - producer.publish({'compressed': True}, - routing_key='test_transport_filesystem', - compression='zlib') + producer.publish( + {"compressed": True}, + routing_key="test_transport_filesystem", + compression="zlib", + ) m = self.q(consumer_channel).get() - assert m.payload == {'compressed': True} + assert m.payload == {"compressed": True} # queue.delete for i in range(10): - producer.publish({'foo': i}, - routing_key='test_transport_filesystem') + producer.publish({"foo": i}, routing_key="test_transport_filesystem") assert self.q(consumer_channel).get() self.q(consumer_channel).delete() self.q(consumer_channel).declare() @@ -133,8 +137,87 @@ def callback2(message_data, message): # queue.purge for i in range(10): - producer.publish({'foo': i}, - routing_key='test_transport_filesystem2') + producer.publish({"foo": i}, routing_key="test_transport_filesystem2") assert self.q2(consumer_channel).get() self.q2(consumer_channel).purge() assert self.q2(consumer_channel).get() is None + + +@t.skip.if_win32 +class test_FilesystemFanout(test_FilesystemTransport): + def setup(self): + try: + data_folder_in = tempfile.mkdtemp() + data_folder_out = tempfile.mkdtemp() + control_folder = tempfile.mkdtemp() + except Exception: + pytest.skip("filesystem transport: cannot create tempfiles") + + self.consume_connection = Connection( + transport="filesystem", + transport_options={ + "data_folder_in": data_folder_in, + "data_folder_out": data_folder_out, + "control_folder": control_folder, + }, + ) + self.consume_channel = self.consume_connection.default_channel + self.produce_connection = Connection( + transport="filesystem", + transport_options={ + "data_folder_in": data_folder_out, + "data_folder_out": data_folder_in, + "control_folder": control_folder, + }, + ) + self.produce_channel = self.produce_connection.default_channel + self.exchange = Exchange("test_transport_filesystem", "fanout") + self.q1 = Queue("queue1", exchange=self.exchange, routing_key="") + self.q2 = Queue("queue2", exchange=self.exchange, routing_key="") + + def test_produce_consume(self): + producer = Producer(self.produce_channel, self.exchange) + consumer1 = Consumer(self.consume_channel, self.q1, no_ack=True) + consumer2 = Consumer(self.consume_channel, self.q2, no_ack=True) + self.q2(self.consume_channel).declare() + + for i in range(10): + producer.publish({"foo": i}) + + _received1 = [] + _received2 = [] + + def callback1(message_data, message): + _received1.append(message) + message.ack() + + def callback2(message_data, message): + _received2.append(message) + message.ack() + + consumer1.register_callback(callback1) + consumer2.register_callback(callback2) + + consumer1.consume() + consumer2.consume() + + while 1: + try: + self.consume_channel.drain_events() + except: + break + + assert len(_received1) + len(_received2) == 20 + + # queue.delete + for i in range(10): + producer.publish({"foo": i}) + assert self.q1(self.consume_channel).get() + self.q1(self.consume_channel).delete() + self.q1(self.consume_channel).declare() + assert self.q1(self.consume_channel).get() is None + + # queue.purge + assert self.q2(self.consume_channel).get() + self.q2(self.consume_channel).purge() + assert self.q2(self.consume_channel).get() is None From ddff81396862de619522115f6174f7aee964bf0a Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 7 Mar 2022 13:25:35 +0000 Subject: [PATCH 4/7] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- kombu/transport/filesystem.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/kombu/transport/filesystem.py b/kombu/transport/filesystem.py index 0af250056..ce2c163fe 100644 --- a/kombu/transport/filesystem.py +++ b/kombu/transport/filesystem.py @@ -88,15 +88,15 @@ def callback(body, message): * ``processed_folder`` - directory where are backed up processed files. """ -import os import glob +import os import shutil import tempfile import uuid -from queue import Empty -from time import monotonic from collections import namedtuple from contextlib import contextmanager +from queue import Empty +from time import monotonic from kombu.exceptions import ChannelError from kombu.utils.encoding import bytes_to_str, str_to_bytes @@ -160,7 +160,7 @@ class Channel(virtual.Channel): @contextmanager def _get_exchange_file_obj(self, exchange, mode="rb"): - filename = "{}.exchange".format(exchange) + filename = f"{exchange}.exchange" filename = os.path.join(self.control_folder, filename) try: f_obj = open(filename, mode) From aab3d6aa177d45ef9f313e6411812b7a612151fe Mon Sep 17 00:00:00 2001 From: karajan1001 Date: Wed, 9 Mar 2022 17:09:39 +0800 Subject: [PATCH 5/7] Remove the refactoring work and make the test passed 1. Remove all of refactoring work 2. make the test pass --- kombu/transport/filesystem.py | 102 +++++++++++++++++--------- t/unit/transport/test_filesystem.py | 109 +++++++++++++++------------- 2 files changed, 126 insertions(+), 85 deletions(-) diff --git a/kombu/transport/filesystem.py b/kombu/transport/filesystem.py index ce2c163fe..b7b284885 100644 --- a/kombu/transport/filesystem.py +++ b/kombu/transport/filesystem.py @@ -88,7 +88,6 @@ def callback(body, message): * ``processed_folder`` - directory where are backed up processed files. """ -import glob import os import shutil import tempfile @@ -106,10 +105,10 @@ def callback(body, message): from . import virtual VERSION = (1, 0, 0) -__version__ = ".".join(map(str, VERSION)) +__version__ = '.'.join(map(str, VERSION)) # needs win32all to work on Windows -if os.name == "nt": +if os.name == 'nt': import pywintypes import win32con @@ -124,15 +123,15 @@ def callback(body, message): def lock(file, flags): """Create file lock.""" hfile = win32file._get_osfhandle(file.fileno()) - win32file.LockFileEx(hfile, flags, 0, 0xFFFF0000, __overlapped) + win32file.LockFileEx(hfile, flags, 0, 0xffff0000, __overlapped) def unlock(file): """Remove file lock.""" hfile = win32file._get_osfhandle(file.fileno()) - win32file.UnlockFileEx(hfile, 0, 0xFFFF0000, __overlapped) + win32file.UnlockFileEx(hfile, 0, 0xffff0000, __overlapped) -elif os.name == "posix": +elif os.name == 'posix': import fcntl from fcntl import LOCK_EX, LOCK_NB, LOCK_SH # noqa @@ -147,10 +146,12 @@ def unlock(file): else: - raise RuntimeError("Filesystem plugin only defined for NT and POSIX platforms") + raise RuntimeError( + 'Filesystem plugin only defined for NT and POSIX platforms') -exchange_queue_t = namedtuple("exchange_queue_t", ["routing_key", "pattern", "queue"]) +exchange_queue_t = namedtuple("exchange_queue_t", + ["routing_key", "pattern", "queue"]) class Channel(virtual.Channel): @@ -162,8 +163,10 @@ class Channel(virtual.Channel): def _get_exchange_file_obj(self, exchange, mode="rb"): filename = f"{exchange}.exchange" filename = os.path.join(self.control_folder, filename) + os.makedirs(self.control_folder, exist_ok=True) + f_obj = open(filename, mode) + try: - f_obj = open(filename, mode) if "w" in mode: lock(f_obj, LOCK_EX) yield f_obj @@ -184,7 +187,8 @@ def get_table(self, exchange): def _queue_bind(self, exchange, routing_key, pattern, queue): queues = self.get_table(exchange) - queue_val = exchange_queue_t(routing_key or "", pattern or "", queue or "") + queue_val = exchange_queue_t(routing_key or "", pattern or "", + queue or "") if queue_val not in queues: queues.insert(0, queue_val) with self._get_exchange_file_obj(exchange, "wb") as f_obj: @@ -196,28 +200,33 @@ def _put_fanout(self, exchange, payload, routing_key, **kwargs): def _put(self, queue, payload, **kwargs): """Put `message` onto `queue`.""" - filename = "{}_{}.{}.msg".format( - int(round(monotonic() * 1000)), uuid.uuid4(), queue - ) + filename = '{}_{}.{}.msg'.format(int(round(monotonic() * 1000)), + uuid.uuid4(), queue) filename = os.path.join(self.data_folder_out, filename) try: - f = open(filename, "wb") + f = open(filename, 'wb') lock(f, LOCK_EX) f.write(str_to_bytes(dumps(payload))) except OSError: - raise ChannelError(f"Cannot add file {filename!r} to directory") + raise ChannelError( + f'Cannot add file {filename!r} to directory') finally: unlock(f) f.close() def _get(self, queue): """Get next message from `queue`.""" - folder = self._ls_queue(queue) + queue_find = '.' + queue + '.msg' + folder = os.listdir(self.data_folder_in) folder = sorted(folder) while len(folder) > 0: filename = folder.pop(0) + # only handle message for the requested queue + if filename.find(queue_find) < 0: + continue + if self.store_processed: processed_folder = self.processed_folder else: @@ -225,37 +234,44 @@ def _get(self, queue): try: # move the file to the tmp/processed folder - shutil.move(filename, processed_folder) + shutil.move(os.path.join(self.data_folder_in, filename), + processed_folder) except OSError: pass # file could be locked, or removed in meantime so ignore filename = os.path.join(processed_folder, filename) try: - f = open(filename, "rb") + f = open(filename, 'rb') payload = f.read() f.close() if not self.store_processed: os.remove(filename) except OSError: - raise ChannelError(f"Cannot read file {filename!r} from queue.") + raise ChannelError( + f'Cannot read file {filename!r} from queue.') return loads(bytes_to_str(payload)) raise Empty() - def _ls_queue(self, queue): - """List all messages in the `queue`""" - queue_pattern = os.path.join(self.data_folder_in, f"*.{queue}.msg") - return glob.glob(queue_pattern) - def _purge(self, queue): """Remove all messages from `queue`.""" count = 0 - queue_messages = self._ls_queue(queue) - for filename in queue_messages: + queue_find = '.' + queue + '.msg' + + folder = os.listdir(self.data_folder_in) + while len(folder) > 0: + filename = folder.pop() try: + # only purge messages for the requested queue + if filename.find(queue_find) < 0: + continue + + filename = os.path.join(self.data_folder_in, filename) os.remove(filename) + count += 1 + except OSError: # we simply ignore its existence, as it was probably # processed by another worker @@ -265,7 +281,20 @@ def _purge(self, queue): def _size(self, queue): """Return the number of messages in `queue` as an :class:`int`.""" - return self._ls_queue(queue) + count = 0 + + queue_find = f'.{queue}.msg' + folder = os.listdir(self.data_folder_in) + while len(folder) > 0: + filename = folder.pop() + + # only handle message for the requested queue + if filename.find(queue_find) < 0: + continue + + count += 1 + + return count @property def transport_options(self): @@ -273,42 +302,43 @@ def transport_options(self): @cached_property def data_folder_in(self): - return self.transport_options.get("data_folder_in", "data_in") + return self.transport_options.get('data_folder_in', 'data_in') @cached_property def data_folder_out(self): - return self.transport_options.get("data_folder_out", "data_out") + return self.transport_options.get('data_folder_out', 'data_out') @cached_property def store_processed(self): - return self.transport_options.get("store_processed", False) + return self.transport_options.get('store_processed', False) @cached_property def processed_folder(self): - return self.transport_options.get("processed_folder", "processed") + return self.transport_options.get('processed_folder', 'processed') @cached_property def control_folder(self): - return self.transport_options.get("control_folder", "control") + return self.transport_options.get('control_folder', 'control') class Transport(virtual.Transport): """Filesystem Transport.""" implements = virtual.Transport.implements.extend( - asynchronous=False, exchange_type=frozenset(["direct", "topic", "fanout"]) + asynchronous=False, + exchange_type=frozenset(['direct', 'topic', 'fanout']) ) Channel = Channel # filesystem backend state is global. global_state = virtual.BrokerState() default_port = 0 - driver_type = "filesystem" - driver_name = "filesystem" + driver_type = 'filesystem' + driver_name = 'filesystem' def __init__(self, client, **kwargs): super().__init__(client, **kwargs) self.state = self.global_state def driver_version(self): - return "N/A" + return 'N/A' diff --git a/t/unit/transport/test_filesystem.py b/t/unit/transport/test_filesystem.py index 90fe2332c..fdf233ae4 100644 --- a/t/unit/transport/test_filesystem.py +++ b/t/unit/transport/test_filesystem.py @@ -1,4 +1,5 @@ import tempfile +from queue import Empty import pytest @@ -8,40 +9,33 @@ @t.skip.if_win32 class test_FilesystemTransport: + def setup(self): self.channels = set() try: data_folder_in = tempfile.mkdtemp() data_folder_out = tempfile.mkdtemp() except Exception: - pytest.skip("filesystem transport: cannot create tempfiles") - self.c = Connection( - transport="filesystem", - transport_options={ - "data_folder_in": data_folder_in, - "data_folder_out": data_folder_out, - }, - ) + pytest.skip('filesystem transport: cannot create tempfiles') + self.c = Connection(transport='filesystem', + transport_options={ + 'data_folder_in': data_folder_in, + 'data_folder_out': data_folder_out, + }) self.channels.add(self.c.default_channel) - self.p = Connection( - transport="filesystem", - transport_options={ - "data_folder_in": data_folder_out, - "data_folder_out": data_folder_in, - }, - ) + self.p = Connection(transport='filesystem', + transport_options={ + 'data_folder_in': data_folder_out, + 'data_folder_out': data_folder_in, + }) self.channels.add(self.p.default_channel) - self.e = Exchange("test_transport_filesystem") - self.q = Queue( - "test_transport_filesystem", - exchange=self.e, - routing_key="test_transport_filesystem", - ) - self.q2 = Queue( - "test_transport_filesystem2", - exchange=self.e, - routing_key="test_transport_filesystem2", - ) + self.e = Exchange('test_transport_filesystem') + self.q = Queue('test_transport_filesystem', + exchange=self.e, + routing_key='test_transport_filesystem') + self.q2 = Queue('test_transport_filesystem2', + exchange=self.e, + routing_key='test_transport_filesystem2') def teardown(self): # make sure we don't attempt to restore messages at shutdown. @@ -61,10 +55,12 @@ def _add_channel(self, channel): def test_produce_consume_noack(self): producer = Producer(self._add_channel(self.p.channel()), self.e) - consumer = Consumer(self._add_channel(self.c.channel()), self.q, no_ack=True) + consumer = Consumer(self._add_channel(self.c.channel()), self.q, + no_ack=True) for i in range(10): - producer.publish({"foo": i}, routing_key="test_transport_filesystem") + producer.publish({'foo': i}, + routing_key='test_transport_filesystem') _received = [] @@ -90,9 +86,11 @@ def test_produce_consume(self): self.q2(consumer_channel).declare() for i in range(10): - producer.publish({"foo": i}, routing_key="test_transport_filesystem") + producer.publish({'foo': i}, + routing_key='test_transport_filesystem') for i in range(10): - producer.publish({"foo": i}, routing_key="test_transport_filesystem2") + producer.publish({'foo': i}, + routing_key='test_transport_filesystem2') _received1 = [] _received2 = [] @@ -119,17 +117,16 @@ def callback2(message_data, message): assert len(_received1) + len(_received2) == 20 # compression - producer.publish( - {"compressed": True}, - routing_key="test_transport_filesystem", - compression="zlib", - ) + producer.publish({'compressed': True}, + routing_key='test_transport_filesystem', + compression='zlib') m = self.q(consumer_channel).get() - assert m.payload == {"compressed": True} + assert m.payload == {'compressed': True} # queue.delete for i in range(10): - producer.publish({"foo": i}, routing_key="test_transport_filesystem") + producer.publish({'foo': i}, + routing_key='test_transport_filesystem') assert self.q(consumer_channel).get() self.q(consumer_channel).delete() self.q(consumer_channel).declare() @@ -137,14 +134,15 @@ def callback2(message_data, message): # queue.purge for i in range(10): - producer.publish({"foo": i}, routing_key="test_transport_filesystem2") + producer.publish({'foo': i}, + routing_key='test_transport_filesystem2') assert self.q2(consumer_channel).get() self.q2(consumer_channel).purge() assert self.q2(consumer_channel).get() is None @t.skip.if_win32 -class test_FilesystemFanout(test_FilesystemTransport): +class test_FilesystemFanout: def setup(self): try: data_folder_in = tempfile.mkdtemp() @@ -153,7 +151,7 @@ def setup(self): except Exception: pytest.skip("filesystem transport: cannot create tempfiles") - self.consume_connection = Connection( + self.consumer_connection = Connection( transport="filesystem", transport_options={ "data_folder_in": data_folder_in, @@ -161,7 +159,7 @@ def setup(self): "control_folder": control_folder, }, ) - self.consume_channel = self.consume_connection.default_channel + self.consume_channel = self.consumer_connection.channel() self.produce_connection = Connection( transport="filesystem", transport_options={ @@ -170,15 +168,28 @@ def setup(self): "control_folder": control_folder, }, ) - self.produce_channel = self.produce_connection.default_channel - self.exchange = Exchange("test_transport_filesystem", "fanout") - self.q1 = Queue("queue1", exchange=self.exchange, routing_key="") - self.q2 = Queue("queue2", exchange=self.exchange, routing_key="") + self.producer_channel = self.produce_connection.channel() + self.exchange = Exchange("filesystem_exchange_fanout", type="fanout") + self.q1 = Queue("queue1", exchange=self.exchange) + self.q2 = Queue("queue2", exchange=self.exchange) + + def teardown(self): + # make sure we don't attempt to restore messages at shutdown. + for channel in [self.producer_channel, self.consumer_connection]: + try: + channel._qos._dirty.clear() + except AttributeError: + pass + try: + channel._qos._delivered.clear() + except AttributeError: + pass def test_produce_consume(self): - producer = Producer(self.produce_channel, self.exchange) - consumer1 = Consumer(self.consume_channel, self.q1, no_ack=True) - consumer2 = Consumer(self.consume_channel, self.q2, no_ack=True) + + producer = Producer(self.producer_channel, self.exchange) + consumer1 = Consumer(self.consume_channel, self.q1) + consumer2 = Consumer(self.consume_channel, self.q2) self.q2(self.consume_channel).declare() for i in range(10): @@ -204,7 +215,7 @@ def callback2(message_data, message): while 1: try: self.consume_channel.drain_events() - except: + except Empty: break assert len(_received1) + len(_received2) == 20 From a3a4a3dac6738d7fa350f87c9db17ede519db8db Mon Sep 17 00:00:00 2001 From: karajan1001 Date: Thu, 10 Mar 2022 20:49:27 +0800 Subject: [PATCH 6/7] Use pathlib for some Path operation --- kombu/transport/filesystem.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/kombu/transport/filesystem.py b/kombu/transport/filesystem.py index b7b284885..c3ee5d1c0 100644 --- a/kombu/transport/filesystem.py +++ b/kombu/transport/filesystem.py @@ -94,6 +94,7 @@ def callback(body, message): import uuid from collections import namedtuple from contextlib import contextmanager +from pathlib import Path from queue import Empty from time import monotonic @@ -161,17 +162,17 @@ class Channel(virtual.Channel): @contextmanager def _get_exchange_file_obj(self, exchange, mode="rb"): - filename = f"{exchange}.exchange" - filename = os.path.join(self.control_folder, filename) - os.makedirs(self.control_folder, exist_ok=True) - f_obj = open(filename, mode) + file = self.control_folder / f"{exchange}.exchange" + if "w" in mode: + self.control_folder.mkdir(exist_ok=True) + f_obj = file.open(mode) try: if "w" in mode: lock(f_obj, LOCK_EX) yield f_obj except OSError: - raise ChannelError(f"Cannot open {filename}") + raise ChannelError(f"Cannot open {file}") finally: if "w" in mode: unlock(f_obj) @@ -318,7 +319,7 @@ def processed_folder(self): @cached_property def control_folder(self): - return self.transport_options.get('control_folder', 'control') + return Path(self.transport_options.get('control_folder', 'control')) class Transport(virtual.Transport): From 11d5dcc3998379a1d1a05f2a1f1f55f368210d4a Mon Sep 17 00:00:00 2001 From: karajan1001 Date: Thu, 10 Mar 2022 20:49:27 +0800 Subject: [PATCH 7/7] Some reviewed changes --- kombu/transport/filesystem.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kombu/transport/filesystem.py b/kombu/transport/filesystem.py index c3ee5d1c0..d29d06b84 100644 --- a/kombu/transport/filesystem.py +++ b/kombu/transport/filesystem.py @@ -86,6 +86,7 @@ def callback(body, message): * ``store_processed`` - if set to True, all processed messages are backed up to ``processed_folder``. * ``processed_folder`` - directory where are backed up processed files. +* ``control_folder`` - directory where are exchange-queue table stored. """ import os @@ -317,7 +318,7 @@ def store_processed(self): def processed_folder(self): return self.transport_options.get('processed_folder', 'processed') - @cached_property + @property def control_folder(self): return Path(self.transport_options.get('control_folder', 'control'))