Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add fanout to filesystem #1499

Merged
merged 7 commits into from Mar 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
63 changes: 62 additions & 1 deletion kombu/transport/filesystem.py
Expand Up @@ -65,7 +65,7 @@ def callback(body, message):
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: No
* Supports Fanout: Yes
* Supports Priority: No
* Supports TTL: No

Expand All @@ -86,12 +86,16 @@ def callback(body, message):
* ``store_processed`` - if set to True, all processed messages are backed up to
``processed_folder``.
* ``processed_folder`` - directory where are backed up processed files.
* ``control_folder`` - directory where are exchange-queue table stored.
"""

import os
import shutil
import tempfile
import uuid
from collections import namedtuple
from contextlib import contextmanager
from pathlib import Path
from queue import Empty
from time import monotonic

Expand Down Expand Up @@ -128,6 +132,7 @@ def unlock(file):
hfile = win32file._get_osfhandle(file.fileno())
win32file.UnlockFileEx(hfile, 0, 0xffff0000, __overlapped)


elif os.name == 'posix':

import fcntl
Expand All @@ -140,14 +145,61 @@ def lock(file, flags):
def unlock(file):
"""Remove file lock."""
fcntl.flock(file.fileno(), fcntl.LOCK_UN)


else:
raise RuntimeError(
'Filesystem plugin only defined for NT and POSIX platforms')


exchange_queue_t = namedtuple("exchange_queue_t",
["routing_key", "pattern", "queue"])


class Channel(virtual.Channel):
"""Filesystem Channel."""

supports_fanout = True

@contextmanager
def _get_exchange_file_obj(self, exchange, mode="rb"):
file = self.control_folder / f"{exchange}.exchange"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use pathlib for path management. but didn't expand this to other parts.

if "w" in mode:
self.control_folder.mkdir(exist_ok=True)
f_obj = file.open(mode)

try:
if "w" in mode:
lock(f_obj, LOCK_EX)
yield f_obj
except OSError:
raise ChannelError(f"Cannot open {file}")
finally:
if "w" in mode:
unlock(f_obj)
f_obj.close()

def get_table(self, exchange):
try:
with self._get_exchange_file_obj(exchange) as f_obj:
exchange_table = loads(bytes_to_str(f_obj.read()))
return [exchange_queue_t(*q) for q in exchange_table]
matusvalo marked this conversation as resolved.
Show resolved Hide resolved
except FileNotFoundError:
return []

def _queue_bind(self, exchange, routing_key, pattern, queue):
matusvalo marked this conversation as resolved.
Show resolved Hide resolved
queues = self.get_table(exchange)
matusvalo marked this conversation as resolved.
Show resolved Hide resolved
queue_val = exchange_queue_t(routing_key or "", pattern or "",
queue or "")
if queue_val not in queues:
queues.insert(0, queue_val)
with self._get_exchange_file_obj(exchange, "wb") as f_obj:
f_obj.write(str_to_bytes(dumps(queues)))

def _put_fanout(self, exchange, payload, routing_key, **kwargs):
for q in self.get_table(exchange):
self._put(q.queue, payload, **kwargs)

def _put(self, queue, payload, **kwargs):
"""Put `message` onto `queue`."""
filename = '{}_{}.{}.msg'.format(int(round(monotonic() * 1000)),
Expand Down Expand Up @@ -266,10 +318,19 @@ def store_processed(self):
def processed_folder(self):
return self.transport_options.get('processed_folder', 'processed')

@property
def control_folder(self):
return Path(self.transport_options.get('control_folder', 'control'))


class Transport(virtual.Transport):
"""Filesystem Transport."""

implements = virtual.Transport.implements.extend(
asynchronous=False,
exchange_type=frozenset(['direct', 'topic', 'fanout'])
)

Channel = Channel
# filesystem backend state is global.
global_state = virtual.BrokerState()
Expand Down
94 changes: 94 additions & 0 deletions t/unit/transport/test_filesystem.py
@@ -1,4 +1,5 @@
import tempfile
from queue import Empty

import pytest

Expand Down Expand Up @@ -138,3 +139,96 @@ def callback2(message_data, message):
assert self.q2(consumer_channel).get()
self.q2(consumer_channel).purge()
assert self.q2(consumer_channel).get() is None


@t.skip.if_win32
class test_FilesystemFanout:
def setup(self):
try:
data_folder_in = tempfile.mkdtemp()
data_folder_out = tempfile.mkdtemp()
control_folder = tempfile.mkdtemp()
except Exception:
pytest.skip("filesystem transport: cannot create tempfiles")

self.consumer_connection = Connection(
transport="filesystem",
transport_options={
"data_folder_in": data_folder_in,
"data_folder_out": data_folder_out,
"control_folder": control_folder,
},
)
self.consume_channel = self.consumer_connection.channel()
self.produce_connection = Connection(
transport="filesystem",
transport_options={
"data_folder_in": data_folder_out,
"data_folder_out": data_folder_in,
"control_folder": control_folder,
},
)
self.producer_channel = self.produce_connection.channel()
self.exchange = Exchange("filesystem_exchange_fanout", type="fanout")
self.q1 = Queue("queue1", exchange=self.exchange)
self.q2 = Queue("queue2", exchange=self.exchange)

def teardown(self):
# make sure we don't attempt to restore messages at shutdown.
for channel in [self.producer_channel, self.consumer_connection]:
try:
channel._qos._dirty.clear()
except AttributeError:
pass
try:
channel._qos._delivered.clear()
except AttributeError:
pass

def test_produce_consume(self):

producer = Producer(self.producer_channel, self.exchange)
consumer1 = Consumer(self.consume_channel, self.q1)
consumer2 = Consumer(self.consume_channel, self.q2)
self.q2(self.consume_channel).declare()

for i in range(10):
producer.publish({"foo": i})

_received1 = []
_received2 = []

def callback1(message_data, message):
_received1.append(message)
message.ack()

def callback2(message_data, message):
_received2.append(message)
message.ack()

consumer1.register_callback(callback1)
consumer2.register_callback(callback2)

consumer1.consume()
consumer2.consume()

while 1:
try:
self.consume_channel.drain_events()
except Empty:
break

assert len(_received1) + len(_received2) == 20

# queue.delete
for i in range(10):
producer.publish({"foo": i})
assert self.q1(self.consume_channel).get()
self.q1(self.consume_channel).delete()
self.q1(self.consume_channel).declare()
assert self.q1(self.consume_channel).get() is None

# queue.purge
assert self.q2(self.consume_channel).get()
self.q2(self.consume_channel).purge()
assert self.q2(self.consume_channel).get() is None