Skip to content

Commit

Permalink
JAMES-1945 Parralelize UID update in CassandraUidProvider
Browse files Browse the repository at this point in the history
  • Loading branch information
chibenwa committed Feb 23, 2017
1 parent f2b7cd7 commit 6d94a8f
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 65 deletions.
Expand Up @@ -19,46 +19,71 @@

package org.apache.james.mailbox.cassandra.mail;

import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
import static com.datastax.driver.core.querybuilder.QueryBuilder.set;
import static com.datastax.driver.core.querybuilder.QueryBuilder.update;
import static org.apache.james.mailbox.cassandra.table.CassandraMessageUidTable.MAILBOX_ID;
import static org.apache.james.mailbox.cassandra.table.CassandraMessageUidTable.NEXT_UID;
import static org.apache.james.mailbox.cassandra.table.CassandraMessageUidTable.TABLE_NAME;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import javax.inject.Inject;

import org.apache.james.backends.cassandra.utils.CassandraConstants;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
import org.apache.james.backends.cassandra.utils.FunctionRunnerWithRetry;
import org.apache.james.backends.cassandra.utils.LightweightTransactionException;
import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.MessageUid;
import org.apache.james.mailbox.cassandra.CassandraId;
import org.apache.james.mailbox.cassandra.table.CassandraMessageUidTable;
import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.store.mail.UidProvider;
import org.apache.james.mailbox.store.mail.model.Mailbox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.james.util.OptionalConverter;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.BuiltStatement;
import com.google.common.base.Throwables;

public class CassandraUidProvider implements UidProvider {
public final static int DEFAULT_MAX_RETRY = 100000;
private static final Logger LOG = LoggerFactory.getLogger(CassandraUidProvider.class);
private static final int DEFAULT_MAX_RETRY = 100000;
private static final String CONDITION = "Condition";

private final Session session;
private final CassandraAsyncExecutor executor;
private final FunctionRunnerWithRetry runner;
private final PreparedStatement insertStatement;
private final PreparedStatement updateStatement;
private final PreparedStatement selectStatement;

public CassandraUidProvider(Session session, int maxRetry) {
this.session = session;
this.executor = new CassandraAsyncExecutor(session);
this.runner = new FunctionRunnerWithRetry(maxRetry);
this.selectStatement = prepareSelect(session);
this.updateStatement = prepareUpdate(session);
this.insertStatement = prepareInsert(session);
}

private PreparedStatement prepareSelect(Session session) {
return session.prepare(select(NEXT_UID)
.from(TABLE_NAME)
.where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID))));
}

private PreparedStatement prepareUpdate(Session session) {
return session.prepare(update(TABLE_NAME)
.onlyIf(eq(NEXT_UID, bindMarker(CONDITION)))
.with(set(NEXT_UID, bindMarker(NEXT_UID)))
.where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID))));
}

private PreparedStatement prepareInsert(Session session) {
return session.prepare(insertInto(TABLE_NAME)
.value(NEXT_UID, MessageUid.MIN_VALUE.asLong())
.value(MAILBOX_ID, bindMarker(MAILBOX_ID))
.ifNotExists());
}

@Inject
Expand All @@ -74,75 +99,67 @@ public MessageUid nextUid(MailboxSession mailboxSession, Mailbox mailbox) throws
@Override
public MessageUid nextUid(MailboxSession session, MailboxId mailboxId) throws MailboxException {
CassandraId cassandraId = (CassandraId) mailboxId;
if (! findHighestUid(cassandraId).isPresent()) {
Optional<MessageUid> optional = tryInsertUid(cassandraId, Optional.empty());
if (optional.isPresent()) {
return optional.get();
}
}
return nextUid(cassandraId)
.join()
.orElseThrow(() -> new MailboxException("Error during Uid update"));
}

try {
return runner.executeAndRetrieveObject(
() -> {
try {
return tryUpdateUid(cassandraId, findHighestUid(cassandraId));
} catch (Exception exception) {
LOG.error("Can not retrieve next Uid", exception);
throw Throwables.propagate(exception);
}
});
} catch (LightweightTransactionException e) {
throw new MailboxException("Error during Uid update", e);
}
public CompletableFuture<Optional<MessageUid>> nextUid(CassandraId cassandraId) {
return findHighestUid(cassandraId)
.thenCompose(optional -> {
if (optional.isPresent()) {
return tryUpdateUid(cassandraId, optional);
}
return tryInsert(cassandraId);
})
.thenCompose(optional -> {
if (optional.isPresent()) {
return CompletableFuture.completedFuture(optional);
}
return runner.executeAsyncAndRetrieveObject(
() -> findHighestUid(cassandraId)
.thenCompose(readUid -> tryUpdateUid(cassandraId, readUid)));
});
}

@Override
public com.google.common.base.Optional<MessageUid> lastUid(MailboxSession mailboxSession, Mailbox mailbox) throws MailboxException {
return findHighestUid((CassandraId) mailbox.getMailboxId());
}

private com.google.common.base.Optional<MessageUid> findHighestUid(CassandraId mailboxId) throws MailboxException {
ResultSet result = session.execute(
select(NEXT_UID)
.from(CassandraMessageUidTable.TABLE_NAME)
.where(eq(CassandraMessageUidTable.MAILBOX_ID, mailboxId.asUuid())));
if (result.isExhausted()) {
return com.google.common.base.Optional.absent();
} else {
return com.google.common.base.Optional.of(MessageUid.of(result.one().getLong(NEXT_UID)));
}
return OptionalConverter.toGuava(findHighestUid((CassandraId) mailbox.getMailboxId()).join());
}

private Optional<MessageUid> tryInsertUid(CassandraId mailboxId, Optional<MessageUid> uid) {
MessageUid nextUid = uid.map(MessageUid::next).orElse(MessageUid.MIN_VALUE);
return transactionalStatementToOptionalUid(nextUid,
insertInto(CassandraMessageUidTable.TABLE_NAME)
.value(NEXT_UID, nextUid.asLong())
.value(CassandraMessageUidTable.MAILBOX_ID, mailboxId.asUuid())
.ifNotExists());
private CompletableFuture<Optional<MessageUid>> findHighestUid(CassandraId mailboxId) {
return executor.executeSingleRow(
selectStatement.bind()
.setUUID(MAILBOX_ID, mailboxId.asUuid()))
.thenApply(optional -> optional.map(row -> MessageUid.of(row.getLong(NEXT_UID))));
}

private Optional<MessageUid> tryUpdateUid(CassandraId mailboxId, com.google.common.base.Optional<MessageUid> uid) {
private CompletableFuture<Optional<MessageUid>> tryUpdateUid(CassandraId mailboxId, Optional<MessageUid> uid) {
if (uid.isPresent()) {
MessageUid nextUid = uid.get().next();
return transactionalStatementToOptionalUid(nextUid,
update(CassandraMessageUidTable.TABLE_NAME)
.onlyIf(eq(NEXT_UID, uid.get().asLong()))
.with(set(NEXT_UID, nextUid.asLong()))
.where(eq(CassandraMessageUidTable.MAILBOX_ID, mailboxId.asUuid())));
return executor.executeReturnApplied(
updateStatement.bind()
.setUUID(MAILBOX_ID, mailboxId.asUuid())
.setLong(CONDITION, uid.get().asLong())
.setLong(NEXT_UID, nextUid.asLong()))
.thenApply(success -> successToUid(nextUid, success));
} else {
return transactionalStatementToOptionalUid(MessageUid.MIN_VALUE,
update(CassandraMessageUidTable.TABLE_NAME)
.onlyIf(eq(NEXT_UID, null))
.with(set(NEXT_UID, MessageUid.MIN_VALUE.asLong()))
.where(eq(CassandraMessageUidTable.MAILBOX_ID, mailboxId.asUuid())));
return tryInsert(mailboxId);
}
}

private Optional<MessageUid> transactionalStatementToOptionalUid(MessageUid uid, BuiltStatement statement) {
if(session.execute(statement).one().getBool(CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED)) {
private CompletableFuture<Optional<MessageUid>> tryInsert(CassandraId mailboxId) {
return executor.executeReturnApplied(
insertStatement.bind()
.setUUID(MAILBOX_ID, mailboxId.asUuid()))
.thenApply(success -> successToUid(MessageUid.MIN_VALUE, success));
}

private Optional<MessageUid> successToUid(MessageUid uid, Boolean success) {
if (success) {
return Optional.of(uid);
}
return Optional.empty();
}

}
Expand Up @@ -34,7 +34,9 @@
import org.apache.james.mailbox.cassandra.mail.CassandraModSeqProvider;
import org.apache.james.mailbox.cassandra.mail.CassandraUidProvider;
import org.apache.james.mailbox.cassandra.modules.CassandraMailboxCounterModule;
import org.apache.james.mailbox.cassandra.modules.CassandraModSeqModule;
import org.apache.james.mailbox.cassandra.modules.CassandraSubscriptionModule;
import org.apache.james.mailbox.cassandra.modules.CassandraUidModule;

/**
* Test Cassandra subscription against some general purpose written code.
Expand All @@ -43,7 +45,10 @@ public class CassandraSubscriptionManagerTest extends AbstractSubscriptionManage

private static final CassandraCluster cassandra = CassandraCluster.create(
new CassandraModuleComposite(
new CassandraSubscriptionModule(), new CassandraMailboxCounterModule()));
new CassandraSubscriptionModule(),
new CassandraMailboxCounterModule(),
new CassandraUidModule(),
new CassandraModSeqModule()));

@Override
public SubscriptionManager createSubscriptionManager() {
Expand Down

0 comments on commit 6d94a8f

Please sign in to comment.