-
-
Notifications
You must be signed in to change notification settings - Fork 928
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce global key prefix for redis transport #912
Conversation
Codecov Report
@@ Coverage Diff @@
## master #912 +/- ##
==========================================
+ Coverage 88.58% 88.59% +<.01%
==========================================
Files 63 63
Lines 6508 6512 +4
Branches 774 774
==========================================
+ Hits 5765 5769 +4
Misses 661 661
Partials 82 82
Continue to review full report at Codecov.
|
@wetneb seems correct. This will require a small unit test here, just to verify functionality. We can then possibly add a simple integration test in Celery test suite. Also, perhaps the same needs to be done for the Celery Redis Backend? |
@georgepsarakis thanks for the quick review. I have added a unit test, I think the rest belongs to the Celery repository. |
@wetneb Can you please address the review from @georgepsarakis so we can merge this? |
@thedrow it is on my todo list, but it might have to wait a bit more. If this is urgently required, feel free to take over the PR. |
I have no clue why one test fails for Python 3.5 - can I leave it to the experts @thedrow @georgepsarakis ? |
I have restarted the failing build |
@@ -757,7 +767,8 @@ def _size(self, queue): | |||
|
|||
def _q_for_pri(self, queue, pri): | |||
pri = self.priority(pri) | |||
return '%s%s%s' % ((queue, self.sep, pri) if pri else (queue, '', '')) | |||
key = '%s%s%s' % ((queue, self.sep, pri) if pri else (queue, '', '')) | |||
return self.global_key_prefix + key |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is redundant, don't the queue names (queue
variable here) already contain self.keyprefix_queue
, which in its turn is prefixed with the global namespace?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@georgepsarakis I was puzzled by this too!
I assume that because this method comes from the base interface, it is just called with the bare queue name without prefix (because prefixes are introduced by the redis implementation).
kombu/kombu/transport/virtual/base.py
Line 361 in 0af7519
def _put(self, queue, message): |
But I might be wrong! If this method is indeed called with the prefixed queue name, then I guess the second test case becomes trivial…
My analysis is that the original code lacked a concatenation with self.keyprefix_queue
here originally. I can change my PR to fix that (instead of just prepending the self.global_key_prefix
). At the moment, the self.keyprefix_queue
seems to be used only for sets, not lists - this is probably unintentional.
c = chan._create_client = Mock() | ||
message = {'hello': 'world'} | ||
chan._put('queue', message) | ||
c().lpush.assert_called_with('fooqueue', dumps(message)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@georgepsarakis the test case for Channel._put
is here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wetneb thanks for addressing the comments and sorry for the late response.
I tried to do an end-to-end test locally using your changes and there are several issues:
- the Consumer does not seem to be able to recognize the queue.
- when messages are not acknowledged two additional keys are added:
unacked
(Hash) andunacked_index
(Sorted Set). You will notice that these are not prefixed, something that is within the scope of this PR.
Would you mind cross-checking the below Producer/Consumer sample:
from kombu import Exchange, Queue, Producer, Connection, Consumer
connection = Connection('redis://')
channel = connection.channel()
producer = Producer(channel)
queue = Queue('a_queue_name', exchange=Exchange('default'))
# Publish a message
producer.publish('message_1', exchange=queue.exchange, declare=[queue])
def printer(message):
print message
# Remove the line below for unacknowledged behavior
message.ack()
with Consumer(connection, [queue], on_message=printer):
connection.drain_events()
Publishing seems to work, the message ends up on a prefixed key (queue), but is not compatible with the Exchange declaration, you can run the following in a Redis CLI in order to inspect:
SMEMBERS _kombu.binding.default
Note that the above example works as expected (message is printed when consuming) with current release.
I did some trials but could not get it to work, so it is likely that I did not get something right. Could you please execute the above example and inspect the Redis keys on each step (using the KEYS command)?
@@ -500,6 +506,10 @@ def __init__(self, *args, **kwargs): | |||
# by default. | |||
self.keyprefix_fanout = '' | |||
|
|||
# Prepend the global key prefix to the two key prefixes in use | |||
self.keyprefix_fanout = self.global_key_prefix + self.keyprefix_fanout |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just an observation: if you notice the comment:
kombu/kombu/transport/redis.py
Lines 499 to 502 in 0af7519
# previous versions did not set a fanout, so cannot enable | |
# by default. | |
self.keyprefix_fanout = '' | |
In the current implementation, the prefix will not be an empty string, thus now a non-empty pattern will be available for subscription:
kombu/kombu/transport/redis.py
Lines 625 to 627 in 0af7519
def _get_publish_topic(self, exchange, routing_key): | |
if routing_key and self.fanout_patterns: | |
return ''.join([self.keyprefix_fanout, exchange, '/', routing_key]) |
Thus if someone uses a global key prefix they will also implicitly use a non-empty fanout prefix. I think if you move the logic for self.keyprefix_fanout
global prefixing under line 503 it would be safer, either way. How does that sound?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmmm, I am not sure I follow the logic. Yes a global key prefix will induce a non-empty fanout prefix. If we move the prefixing under line 503 then by default the global key prefix will not be applied to fanouts by default, so it is not really global, is it?
Compatibility with previous versions is not a concern here: using a global key prefix will make the channel incompatible with any previous versions anyway.
@georgepsarakis okay I have tried to understand what is happening here and I realized that I just don't have the necessary background on kombu and redis to debug this correctly. This change seemed fairly simple intuitively but I just cannot afford to spend hours learning all the bits involved, given that it is no longer critical for me. I have a lot of other things going on and I need to prioritize, so I will stop working on this. I hope someone knowledgeable will come and finish this PR so that the effort is not lost. |
@wetneb that is perfectly understandable, thank you very much for your efforts so far. Sorry for not giving you better guidelines sooner, unfortunately I am not adequately familiar with certain internals either and this change seems to have much broader scope than I anticipated. @thedrow @auvipy any idea why the integration tests did not catch the above issue? Perhaps we should open an issue to review these test cases? I suggest we close this for the time being and leave #853 open. |
Yes I agree. |
This helped me:
|
@georgepsarakis here is a proposal for #853. Do you think this is the right approach?