Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pulsar mailqueue implementation (including delay support) #808

Merged
merged 10 commits into from
Jan 7, 2022

Conversation

mbaechler
Copy link
Member

@mbaechler mbaechler commented Dec 23, 2021

This PR is (we hope) our Christmas gift to the Apache James community after almost two years of work.
It brings a robust mailqueue implementation thanks to Pulsar and lifts limitations of current best implementation (the rabbitmq one).

Copy link
Contributor

@chibenwa chibenwa left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks awesome! I am eager to try this ;-)

Few remarks (feel free to ignore them)

  • Do we have some statistics about how this mail queue perform?

  • It would be very nice to have an ADR summurizing this work: why you provided it, what it brings, its architecture, technical details of how Pulsar features are used to implement a mailqueue and the current scope of use (dedicated product...). Maybe with a schema if relevant.

Some architecture remarks from what I could grasp from the review:

  • The remove / browse is done on James side (using an actor model - Akka that do not seem to be distributed) meaning there is implications on the scalability of the size of the queue.

  • The blob store is used to store arriving messages. Which is similar to what had been done with Rabbit. In my experience, (OVH S3 apis) the creation of such blobs is a lateny pain that can easily take 100s of ms. I wonder if a blobstore like S3 is the right place to store short lived objects from the mail queue. Worth sharing feedbacks but I understand there is no really easy solutions.

server/apps/smtp-pulsar-cassandra-app/pom.xml Outdated Show resolved Hide resolved
<artifactId>smtp-relay-1</artifactId>
<packaging>jar</packaging>

<name>Apache James :: Server :: Binaries :: SMTP Relay - C* ES Pular</name>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is ES needed for in an ES relay?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's for metrics reporting, it will probably change in the future.
Feel free to reject this artifact entirely, it's very specific to our needs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eventually I would (personnally) welcome a full migration to Pulsar for the distributed server.

This artifact goes in that direction and thus is IMO welcome.

server/queue/queue-pulsar/pom.xml Outdated Show resolved Hide resolved
@chibenwa
Copy link
Contributor

Also should we propose a task for GSOC to implement a Pulsar based EventBus?

@mbaechler
Copy link
Member Author

Looks awesome! I am eager to try this ;-)

Happy you find it interesting. Thank for the review.

Few remarks (feel free to ignore them)

* Do we have some statistics about how this mail queue perform?

It requires a bunch of hardware to do it properly so no.

* It would be **very** nice to have an ADR summurizing this work: why you provided it, what it brings, its architecture, technical details of how Pulsar features are used to implement a mailqueue and the current scope of use (dedicated product...). Maybe with a schema if relevant.

Of course. We wanted to finally release something so we didn't aim for perfection. I'm not sure we'll find the motivation to write this doc properly. However, we are eager to share what we did, so I propose we do it another way: we could set up a community meeting to discuss this design and take note of that conversation. It probably won't produce a well structured document but the knowledge sharing would at least be done. WDYT?

Some architecture remarks from what I could grasp from the review:

* The remove / browse is done on James side (using an actor model - Akka that do not seem to be distributed) meaning there is implications on the scalability of the size of the queue.

The actor is replicated on every James instance. Each time we dequeue a mail, we ask the local instance if it has been deleted. Actors are kept in sync by distributing deletion commands to them using a Pulsar topic.
We don't foresee any scalability issue as the complexity is bound to the number of deletion, which should be a small amount anyway.
Does it make sense?

* The blob store is used to store arriving messages. Which is similar to what had been done with Rabbit. In my experience, (OVH S3 apis) the creation of such blobs is a lateny pain that can easily take 100s of ms. I wonder if a blobstore like S3 is the right place to store short lived objects from the mail queue. Worth sharing feedbacks but I understand there is no really easy solutions.

I don't have more experience than you regarding object storage. We used the abstraction in place as it's not the problem we tried to solve.

It could make sense to test an implementation where we put emails directly into Pulsar or Bookeeper ... in the future.

@chibenwa
Copy link
Contributor

chibenwa commented Jan 4, 2022

Of course. We wanted to finally release something so we didn't aim for perfection. I'm not sure we'll find the motivation to write this doc properly. However, we are eager to share what we did, so I propose we do it another way: we could set up a community meeting to discuss this design and take note of that conversation. It probably won't produce a well structured document but the knowledge sharing would at least be done. WDYT?

Agree. Feel free to propose a date and link by email. I will take notes and eventually propose such an ADR.

We don't foresee any scalability issue as the complexity is bound to the number of deletion, which should be a small amount anyway.
Does it make sense?

Ok I see.

I think this is rather a strong claim. Eg I ended up to mistakenly configure an open relay and ended up with 30k+ mails in queue and ended up clearing them. Hence the number of deleted emails had an impact.

That being said, I do think this remains a rare operation and that over-engineering is way too easy on the mailQueue component. I'd rather operate a reliable Pulsar implementation that supports only a few deletes rather than an unreliable RabbitMQ...

@chibenwa
Copy link
Contributor

chibenwa commented Jan 4, 2022

Speaking of blob Store....

-> 1. Are blobs well deleted when emails are dequeued/deleted ?

This is important if the duplicating blobstore is used...

-> 2. The BlobReferenceSource API could also be impelented on top of Pulsar for the deduplicating blob store.

@mbaechler mbaechler force-pushed the JAMES-3687-pulsar-mailqueue-1 branch from 2733d15 to 114a1ca Compare January 4, 2022 08:36
@mbaechler
Copy link
Member Author

I think this is rather a strong claim. Eg I ended up to mistakenly configure an open relay and ended up with 30k+ mails in queue and ended up clearing them. Hence the number of deleted emails had an impact.

The queue scalability is not limited by the amount of emails you delete but rather to the number of delete commands you send.

@chibenwa
Copy link
Contributor

chibenwa commented Jan 4, 2022

The queue scalability is not limited by the amount of emails you delete but rather to the number of delete commands you send.

Ok cool. This means more flexibility. And I imagine you could act on pulsar offset to ignore old deletes that are no longer relevant if needed. Low level, ok, but it would enable an admin to completly leverage this limitation.

@chibenwa
Copy link
Contributor

chibenwa commented Jan 4, 2022

BTW we will resume the build here when https://issues.apache.org/jira/browse/INFRA-22697 gets resolved

CF https://github.com/apache/infrastructure-p6/pull/927

@mbaechler
Copy link
Member Author

Speaking of blob Store....

-> 1. Are blobs well deleted when emails are dequeued/deleted ?

This is important if the duplicating blobstore is used...

No, @jeantil figured this out after we opened this PR. It's not hard to fix, we'll fix that soon.

-> 2. The BlobReferenceSource API could also be impelented on top of Pulsar for the deduplicating blob store.

It looks like pulsar is not ok with big messages: https://www.na2021.pulsar-summit.org/all-talks/how-big-is-too-big%3F-message-size-in-pulsar

Bookkeeper is probably ok, I found this project https://github.com/diennea/blobit that is used to store ... mails on bookeeper.

@chibenwa
Copy link
Contributor

chibenwa commented Jan 4, 2022

It looks like pulsar is not ok with big messages

Yes I suspected that.

Bookkeeper is probably ok, I found this project https://github.com/diennea/blobit that is used to store ... mails on bookeeper.

I recall from a talk when I was back in France, of a music streaming billing plateform using HBase before ingesting data into Cassandra/blob storage. I bet you can get pretty inventive....

The BlobReferenceSource API could also be impelented on top of Pulsar for the deduplicating blob store.

Are you planning to take care of this too? It should be easy...

@mbaechler
Copy link
Member Author

The BlobReferenceSource API could also be impelented on top of Pulsar for the deduplicating blob store.

Are you planning to take care of this too? It should be easy...

No, I don't think so, we'll probably rely on S3-like storage for now, latency doesn't sound like a problem to me for mail delivery.

@chibenwa
Copy link
Contributor

chibenwa commented Jan 4, 2022

No, I don't think so, we'll probably rely on S3-like storage for now, latency doesn't sound like a problem to me for mail delivery.

It is more a question of duplicating blob store VS deduplicating blob store.

If you do not want to code this please make sure you hard code the "duplicating" choice in your artifact.

We should document it being missing so that we could get contributions completing your work (hopefully)...

Copy link
Contributor

@Arsnael Arsnael left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No more comments to add.

Was a big work to review, but looks awesome guys!

@chibenwa
Copy link
Contributor

chibenwa commented Jan 5, 2022

Test Result (2 failures / -16)
    org.apache.james.queue.jms.JMSCacheableMailQueueTest.removeShouldNotDeleteFutureEmailsFromBrowse

    Expecting:
  ["name1", "name2"]
to contain exactly (and in same order):
  ["name2"]
but some elements were not expected:
  ["name1"]


    org.apache.james.queue.jms.JMSCacheableMailQueueTest.removeShouldNotDeleteFutureEmails
    

  
  org.opentest4j.AssertionFailedError: 

expected: "name2"
but was : "name1"

Could you have a look?

(I restarted a build to see if this failure is stable....

@chibenwa
Copy link
Contributor

chibenwa commented Jan 6, 2022

The workaround seems to be partially working

org.apache.james.queue.activemq.ActiveMQMailQueueBlobTest.removeByRecipientShouldRemoveSpecificEmailWhenMultipleRecipients

Expecting:
  ["name1", "name2"]
to contain exactly (and in same order):
  ["name2"]
but some elements were not expected:
  ["name1"]

@chibenwa
Copy link
Contributor

chibenwa commented Jan 6, 2022

The JMS fixes are merged and there is now a conflict here.

Shall this PR be rebased?

@jeantil jeantil force-pushed the JAMES-3687-pulsar-mailqueue-1 branch from 083f4d9 to 1b653da Compare January 6, 2022 10:52
@jeantil
Copy link
Contributor

jeantil commented Jan 6, 2022

I just rebased the PR

Comment on lines +78 to +80
binder.bind(BlobStore.class)
.annotatedWith(Names.named(MetricableBlobStore.BLOB_STORE_IMPLEMENTATION))
.to(PassThroughBlobStore.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you do not want to code this please make sure you hard code the "duplicating" choice in your artifact.

isn't that what we do here ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it is ;-)

@mbaechler
Copy link
Member Author

I just rebased the PR

We could actually drop the last commit now. I'll do it eventually if it's not merged already.

@chibenwa
Copy link
Contributor

chibenwa commented Jan 6, 2022

Removes the mime message blobs Dont forget to add the JIRA to this commit message ;-)

- upon acknowledged dequeue
- upon successful filtering after a remove
@jeantil jeantil force-pushed the JAMES-3687-pulsar-mailqueue-1 branch from 1b653da to 00c3db9 Compare January 6, 2022 13:12
@jeantil
Copy link
Contributor

jeantil commented Jan 6, 2022

We could actually drop the last commit now. I'll do it eventually if it's not merged already.

done

Removes the mime message blobs Dont forget to add the JIRA to this commit message ;-)

and done

@jeantil
Copy link
Contributor

jeantil commented Jan 6, 2022

DistributedPOP3ServerTest seemq to be flaky ( cf failure in pipline 9)

@jeantil
Copy link
Contributor

jeantil commented Jan 7, 2022

Also I don't think the flakyness is related to this PR. The failing test is org.apache.james.Pop3ServerContract#deletingAMessageDeletesOnlyOne
If I read it correctly, the test does the following :

As far as I can tell this is an integration test which spins up a real asynchronous server with a real mailqueue backed by a dockerized rabbitmq instance so this test seems to have a nice race condition between how fast the mailqueue consumes messages vs how fast the test reaches its assertion :)

it could probably be made more stable by changing

Awaitility.await().until(() ->
            server.getProbe(MailboxProbeImpl.class)
                .searchMessage(MultimailboxesSearchQuery.from(SearchQuery.builder().build()).build(), USER, 1).size() == 1);

to

Awaitility.await().until(() ->
            server.getProbe(MailboxProbeImpl.class)
                .searchMessage(MultimailboxesSearchQuery.from(SearchQuery.builder().build()).build(), USER, 50).size() == 50);

@chibenwa
Copy link
Contributor

chibenwa commented Jan 7, 2022

I am merging this.

I will try to write an ADR for this (help welcome!)

Also post-merge reviews are welcomed, and could help fill a JIRA of possible improvments for this work.

@chibenwa chibenwa merged commit 81223ef into apache:master Jan 7, 2022
@Arsnael
Copy link
Contributor

Arsnael commented Jan 7, 2022

@jeantil I agree that the flaky test has likely nothing to do with this work.

However, thanks for checking this, I think your suggestion makes sense to render that test stable. Could you create a PR with this fix if you have time? (as you spent time debugging it you should get the credit^^) I would gladly review it!

Comment on lines +443 to +447
override def clear(): Long = {
val count = getSize()
admin.topics().delete(outTopic.name, true)
count
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't clear also get rid of the scheduled topic?

After clearing, can messages still transit through the mailqueue? (I expect it should, but I do not see how this can work if we nuke the out topic... New producers would re-create automatically a missing topic?)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New producers would re-create automatically a missing topic?

yes, n pulsar there is no need to explicitely create topics

Shouldn't clear also get rid of the scheduled topic?

This behavior is not defined by the existing contracts, I'll defer to @mbaechler on which behaviour makes the most sense.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This behavior is not defined by the existing contracts, I'll defer to @mbaechler on which behaviour makes the most sense.

It is a pity. IMO it should.

Comment on lines +271 to +272
* This means that the FilterStage will get slower to start as the number of filter increases, it will also consume
* an increasing amount of RAM until the first mail is processed which will invalidate and purge the expired filters.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given we could get emails with arbitrary delays, invalidating filters likely lead to bugs, as the underlying emails can be out of order.

Maybe it is safer to get rid of this optimisation. Correct?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is the same as the issue you raised on clear. The behavior for scheduled messages is not explicitly defined by the contracts.
Removing the optimization means introducing an ever increasing list of filters which is akin to a memory leak. What we could look into is purging all scheduled messages that have been scheduled before the call to remove(), it requires a bit of rework but should be doable.

Comment on lines +116 to +117
private val outTopic = Topic(s"persistent://${config.namespace.asString}/James-${name.asString()}")
private val scheduledTopic = Topic(s"persistent://${config.namespace.asString}/${name.asString()}-scheduled")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likely a stupid question but why using a separate topic for delayed messages?

My take is that it limits the out-of-order impact and yields better performance.

Do you confirm?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
4 participants