-
Notifications
You must be signed in to change notification settings - Fork 463
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ADR] 51. Pulsar MailQueue #829
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
incomplete review : the kids woke up :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @chibenwa for this ADR, it's very good.
src/adr/0051-pulsar-mailqueue.md
Outdated
- The pulsar admin client is used to list existing queues. | ||
- Priorities are not yet supported. | ||
- Only metadata transit through Pulsar. The general purpose James blobStore, backed by a S3 compatible API, is used to | ||
store the underlying email content. Saves on top of object-storage is latency prone and exposed to end SMTP clients. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is latency a problem in async handling?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
enqueue is synchronous
(dequeue is asynchronous)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have seen during the review that pulsar doesn't handle large messages.
Since this is a distributed implementation, it requires a network accessible (and preferably also distributed) storage to allow for enqueue and dequeue to happen on different nodes. I only see two options here :
- a blob store implementation
- a shared distributed filesystem
both of which will incur a latency risk upon enqueue especially for larger blobs
I'm curious if you know of use cases where the enqueue latency is a problem ? can it be mitigated by using enqueueReactive
instead of enqueue
(since that allows multiple enqueues to occur in parallel, reducing the latency impact on the overall enqueue capacity) ?
Such use cases would be a great addition to this ADR since it would help people decide whether this implementation fits their need or not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we could even change enqueue to be reactive, I can't see any reason not too
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We already have a reactive enqueue, the problem lies down to the fact that SMTP implementation is blocking (that's complex to change).
Here I am not arguing against the blobStore abstraction nor am I arguing against a distributed one. I am arguing that some blobStore implementation (like S3 backed with OVH) have a performance profile poorly suited for a queue usage. THe requirements are just very different from long term storage. While long term storage could accomodate higher write latencies (written asynchronously) in order to get cheaper storage costs (think HDD) on a queue it makes more sense to have fast but expensive storage (think SSD). The trade offs are not the same. The optimal technologies to implement the blobStore in both case might not be the same either... And this is an orthogonal problem to reactive enqueues.
src/adr/0051-pulsar-mailqueue.md
Outdated
[[Question]] | ||
``` | ||
https://github.com/apache/james-project/pull/808#discussion_r780170243 | ||
|
||
> Expired filters are removed. | ||
|
||
Given we could get emails with arbitrary delays, invalidating filters likely lead to bugs, as the underlying emails can | ||
be out of order. | ||
|
||
Maybe it is safer to get rid of this optimisation. Correct? | ||
``` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mbaechler @jeantil opinion on this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's worse. Deletes fails on delayed items... Where they are likely the most usefull...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think keeping expired filters in memory is useful.
If we don't expire the filters at all they will continue to apply to all future email which is not how I understood the feature. As I wrote in response to the same comment on the PR:
What we could look into is purging all scheduled messages that have been scheduled before the call to remove(), it requires a bit of rework but should be doable.
With questions answered the ADR is in its final form, ready for a final review. |
src/adr/0051-pulsar-mailqueue.md
Outdated
backing Apache James messaging capabilities. | ||
|
||
To reach this status the following work needs to be under-taken: | ||
- The Pulsar MailQueue need to work on top of a deduplicated blob store. To do this we need to be able to list blobs |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds more like a nice to have than a strong requirement to me. The mailqueue content is short-lived by nature, therfore the cost of duplication would have a much smalled impact that in the mailbox. While I don't know the exact details of the deduplicating implementation and haven't looked at it it is likely to be much more complex.
Also I don't think this is needed for "broader adoption" in general, but maybe for your use cases :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I buy this!
Then we likely need the hability to use diferent blobstores accross the James application:
- Duplicating blob store to back the mail queue
- Deduplicating blob store to back long lived storage
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I recall that one benefit of using also deduplicated storage is that you can append message bodies once accross the full lifecycle of a message, thus including the queue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then we likely need the hability to use diferent blobstores accross the James application:
isn't that easily achieved in the assembly using named injection ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Of course ;-)
src/adr/0051-pulsar-mailqueue.md
Outdated
[ADR 4](0004-distributed-tasks-listing.md) [ADR 5](0005-distributed-task-termination-ackowledgement.md) | ||
[ADR 6](0006-task-serialization.md) [ADR 7](0007-distributed-task-cancellation.md) | ||
[ADR 8](0008-distributed-task-await.md), eventually allowing to drop the RabbitMQ technology all-together. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A nice improvement on the current implementation would be to store the current filter state in a compacted topic instead of storing all the individual filters.
This would improve the current system which has a starting time and a storage impact which increase linearly with the number of filters in the command topic.
src/adr/0051-pulsar-mailqueue.md
Outdated
|
||
Priorities are not yet supported by the current implementation. See [JIRA-XXXX](TODO). | ||
|
||
A bug regarding clear not purging delayed messages had been |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this or the additional implementation of priorities should be in the ADR:
- neither are fundamental limitations of the technology used only implementation details
- neither would have been there if the ADR had been written before the implementation ;)
- this will become incorrect if any of this changes in the future
just to let you know there is an easy fix for this bug (at least it passes the additional constraints proposed by @chibenwa) which I will contribute after reviewing it with @mbaechler
src/adr/0051-pulsar-mailqueue.md
Outdated
- The pulsar admin client is used to list existing queues. | ||
- Priorities are not yet supported. | ||
- Only metadata transit through Pulsar. The general purpose James blobStore, backed by a S3 compatible API, is used to | ||
store the underlying email content. Saves on top of object-storage is latency prone and exposed to end SMTP clients. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have seen during the review that pulsar doesn't handle large messages.
Since this is a distributed implementation, it requires a network accessible (and preferably also distributed) storage to allow for enqueue and dequeue to happen on different nodes. I only see two options here :
- a blob store implementation
- a shared distributed filesystem
both of which will incur a latency risk upon enqueue especially for larger blobs
I'm curious if you know of use cases where the enqueue latency is a problem ? can it be mitigated by using enqueueReactive
instead of enqueue
(since that allows multiple enqueues to occur in parallel, reducing the latency impact on the overall enqueue capacity) ?
Such use cases would be a great addition to this ADR since it would help people decide whether this implementation fits their need or not.
src/adr/0051-pulsar-mailqueue.md
Outdated
|
||
We expect an easier to operate, cheaper, more reliable MailQueue. | ||
|
||
We expect delays being supported as well. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it should be mentioned that by design the implementation doesn't try to offer absolute consistency guarantees
For example consider the remove filters. If we wanted to ensure that the filters have been distributed to all the nodes, we would need to know the topology of the cluster and wait for a confirmation from all the nodes before we return from the remove method in order to implement a fully consistent system.
size is prone to race conditions
src/adr/0051-pulsar-mailqueue.md
Outdated
|
||
- out topic : contains the mail that are ready to be dequeued. | ||
- scheduled topic: emails that are delayed are first enqueued there. | ||
- filter topic: Deletions (name, sender, recipients) prior a given sequence are synchronized between nodes using this topic. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The consumers on out topic and scheduled topic use the same subscription name and shared consumers => each node will therefore act as a worker pool for dequeuing messages from scheduled topic or dequeuing messages from out topic.
The consumers
On filter topic, each consumer uses a unique subscription name and will therefore receive a copy of every messages in the topic. this ensures a full distribution of the filter state to all nodes in the cluster.
src/adr/0051-pulsar-mailqueue.md
Outdated
note that in current versions of pulsar there is a scheduled job that handles scheduled messages, the accuracy of scheduling is limited by the frequency at which this job runs. | ||
|
||
|
||
The size of the mail queue can be simply computed from the out and scheduled topics. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a window where messages go from the scheduled topic to the outtopic where they may be reported twice by the size method since we need to ack the message to remove it from the scheduled topic and we can't do that before the message has been fully published on the out topic. (eventual consistency again)
Applies modifications related to @jeantil review.
ADR for #808
TODO:
epic
and link all JIRAs to it.