Skip to content

Commit

Permalink
MAILBOX-297 Avoid reading message table upon flags updates
Browse files Browse the repository at this point in the history
MailboxMessage metadata (UID, mailboxID, modseq, MessageId and Flags) are enough
  • Loading branch information
chibenwa committed May 29, 2017
1 parent d496bb2 commit 0535d4b
Showing 1 changed file with 17 additions and 22 deletions.
Expand Up @@ -280,7 +280,7 @@ public MessageMetaData add(Mailbox mailbox, MailboxMessage message) throws Mailb
@Override
public Iterator<UpdatedFlags> updateFlags(Mailbox mailbox, FlagsUpdateCalculator flagUpdateCalculator, MessageRange set) throws MailboxException {
CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
return retrieveMessages(retrieveMessageIds(mailboxId, set), FetchType.Metadata, Optional.empty())
return messageIdDAO.retrieveMessages(mailboxId, set)
.join()
.flatMap(message -> updateFlagsOnMessage(mailbox, flagUpdateCalculator, message))
.map((UpdatedFlags updatedFlags) -> indexTableHandler.updateIndexOnFlagsUpdate(mailboxId, updatedFlags)
Expand Down Expand Up @@ -329,26 +329,24 @@ private CompletableFuture<Void> insertIds(MailboxMessage message, CassandraId ma
imapUidDAO.insert(composedMessageIdWithMetaData));
}

private Stream<UpdatedFlags> updateFlagsOnMessage(Mailbox mailbox, FlagsUpdateCalculator flagUpdateCalculator, MailboxMessage message) {
return tryMessageFlagsUpdate(flagUpdateCalculator, mailbox, message)
private Stream<UpdatedFlags> updateFlagsOnMessage(Mailbox mailbox, FlagsUpdateCalculator flagUpdateCalculator, ComposedMessageIdWithMetaData composedMessageIdWithMetaData) {
return tryMessageFlagsUpdate(flagUpdateCalculator, mailbox, composedMessageIdWithMetaData)
.map(Stream::of)
.orElse(handleRetries(mailbox, flagUpdateCalculator, message));
.orElse(handleRetries(mailbox, flagUpdateCalculator, composedMessageIdWithMetaData));
}

private Optional<UpdatedFlags> tryMessageFlagsUpdate(FlagsUpdateCalculator flagUpdateCalculator, Mailbox mailbox, MailboxMessage message) {
private Optional<UpdatedFlags> tryMessageFlagsUpdate(FlagsUpdateCalculator flagUpdateCalculator, Mailbox mailbox, ComposedMessageIdWithMetaData oldMetaData) {
try {
long oldModSeq = message.getModSeq();
Flags oldFlags = message.createFlags();
long oldModSeq = oldMetaData.getModSeq();
Flags oldFlags = oldMetaData.getFlags();
Flags newFlags = flagUpdateCalculator.buildNewFlags(oldFlags);

boolean involveFlagsChanges = !identicalFlags(oldFlags, newFlags);
long newModSeq = generateNewModSeqIfNeeded(mailbox, oldModSeq, involveFlagsChanges);

message.setFlags(newFlags);
message.setModSeq(newModSeq);
if (updateFlags(message, oldModSeq)) {
if (updateFlags(oldMetaData, newFlags, newModSeq)) {
return Optional.of(UpdatedFlags.builder()
.uid(message.getUid())
.uid(oldMetaData.getComposedMessageId().getUid())
.modSeq(newModSeq)
.oldFlags(oldFlags)
.newFlags(newFlags)
Expand All @@ -372,13 +370,13 @@ private boolean identicalFlags(Flags oldFlags, Flags newFlags) {
return oldFlags.equals(newFlags);
}

private boolean updateFlags(MailboxMessage message, long oldModSeq) {
private boolean updateFlags(ComposedMessageIdWithMetaData oldMetadata, Flags newFlags, long newModSeq) {
ComposedMessageIdWithMetaData composedMessageIdWithMetaData = ComposedMessageIdWithMetaData.builder()
.composedMessageId(new ComposedMessageId(message.getMailboxId(), message.getMessageId(), message.getUid()))
.modSeq(message.getModSeq())
.flags(message.createFlags())
.composedMessageId(oldMetadata.getComposedMessageId())
.modSeq(newModSeq)
.flags(newFlags)
.build();
return imapUidDAO.updateMetadata(composedMessageIdWithMetaData, oldModSeq)
return imapUidDAO.updateMetadata(composedMessageIdWithMetaData, oldMetadata.getModSeq())
.thenCompose(success -> Optional.of(success)
.filter(b -> b)
.map((Boolean any) -> messageIdDAO.updateMetadata(composedMessageIdWithMetaData)
Expand All @@ -387,12 +385,12 @@ private boolean updateFlags(MailboxMessage message, long oldModSeq) {
.join();
}

private Stream<UpdatedFlags> handleRetries(Mailbox mailbox, FlagsUpdateCalculator flagUpdateCalculator, MailboxMessage message) {
private Stream<UpdatedFlags> handleRetries(Mailbox mailbox, FlagsUpdateCalculator flagUpdateCalculator, ComposedMessageIdWithMetaData oldMetaData) {
try {
return Stream.of(
new FunctionRunnerWithRetry(maxRetries)
.executeAndRetrieveObject(() -> retryMessageFlagsUpdate(mailbox,
message.getMessageId(),
oldMetaData.getComposedMessageId().getMessageId(),
flagUpdateCalculator)));
} catch (MessageDeletedDuringFlagsUpdateException e) {
mailboxSession.getLog().warn(e.getMessage());
Expand All @@ -413,9 +411,6 @@ private Optional<UpdatedFlags> retryMessageFlagsUpdate(Mailbox mailbox, MessageI
.orElseThrow(MailboxDeleteDuringUpdateException::new);
return tryMessageFlagsUpdate(flagUpdateCalculator,
mailbox,
messageDAO.retrieveMessages(ImmutableList.of(composedMessageIdWithMetaData), FetchType.Metadata, Optional.empty()).join()
.findFirst()
.map(pair -> pair.getLeft().toMailboxMessage(ImmutableList.of()))
.orElseThrow(() -> new MessageDeletedDuringFlagsUpdateException(cassandraId, (CassandraMessageId) messageId)));
composedMessageIdWithMetaData);
}
}

0 comments on commit 0535d4b

Please sign in to comment.