Skip to content

Commit

Permalink
MAILBOX-307 Implement setAcl with a Cassandra read before writes
Browse files Browse the repository at this point in the history
  • Loading branch information
chibenwa authored and mbaechler committed Sep 29, 2017
1 parent 998446b commit d5e1cf8
Showing 1 changed file with 34 additions and 41 deletions.
Expand Up @@ -29,10 +29,10 @@
import java.io.IOException; import java.io.IOException;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.function.Function;


import org.apache.james.backends.cassandra.init.CassandraConfiguration; import org.apache.james.backends.cassandra.init.CassandraConfiguration;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
import org.apache.james.backends.cassandra.utils.CassandraConstants;
import org.apache.james.backends.cassandra.utils.FunctionRunnerWithRetry; import org.apache.james.backends.cassandra.utils.FunctionRunnerWithRetry;
import org.apache.james.backends.cassandra.utils.LightweightTransactionException; import org.apache.james.backends.cassandra.utils.LightweightTransactionException;
import org.apache.james.mailbox.cassandra.ids.CassandraId; import org.apache.james.mailbox.cassandra.ids.CassandraId;
Expand All @@ -49,7 +49,6 @@
import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row; import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session; import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.Insert;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;


Expand All @@ -66,7 +65,6 @@ public interface CodeInjector {
private final CassandraAsyncExecutor executor; private final CassandraAsyncExecutor executor;
private final int maxRetry; private final int maxRetry;
private final CodeInjector codeInjector; private final CodeInjector codeInjector;
private final PreparedStatement insertStatement;
private final PreparedStatement conditionalInsertStatement; private final PreparedStatement conditionalInsertStatement;
private final PreparedStatement conditionalUpdateStatement; private final PreparedStatement conditionalUpdateStatement;
private final PreparedStatement readStatement; private final PreparedStatement readStatement;
Expand All @@ -79,12 +77,20 @@ public CassandraACLMapper(Session session, CassandraConfiguration cassandraConfi
this.executor = new CassandraAsyncExecutor(session); this.executor = new CassandraAsyncExecutor(session);
this.maxRetry = cassandraConfiguration.getAclMaxRetry(); this.maxRetry = cassandraConfiguration.getAclMaxRetry();
this.codeInjector = codeInjector; this.codeInjector = codeInjector;
this.insertStatement = session.prepare(insertCqlBase()); this.conditionalInsertStatement = prepareConditionalInsert(session);
this.conditionalInsertStatement = session.prepare(insertCqlBase().ifNotExists());
this.conditionalUpdateStatement = prepareConditionalUpdate(session); this.conditionalUpdateStatement = prepareConditionalUpdate(session);
this.readStatement = prepareReadStatement(session); this.readStatement = prepareReadStatement(session);
} }


private PreparedStatement prepareConditionalInsert(Session session) {
return session.prepare(
insertInto(CassandraACLTable.TABLE_NAME)
.value(CassandraACLTable.ID, bindMarker(CassandraACLTable.ID))
.value(CassandraACLTable.ACL, bindMarker(CassandraACLTable.ACL))
.value(CassandraACLTable.VERSION, INITIAL_VALUE)
.ifNotExists());
}

private PreparedStatement prepareConditionalUpdate(Session session) { private PreparedStatement prepareConditionalUpdate(Session session) {
return session.prepare( return session.prepare(
update(CassandraACLTable.TABLE_NAME) update(CassandraACLTable.TABLE_NAME)
Expand All @@ -101,13 +107,6 @@ private PreparedStatement prepareReadStatement(Session session) {
.where(eq(CassandraMailboxTable.ID, bindMarker(CassandraACLTable.ID)))); .where(eq(CassandraMailboxTable.ID, bindMarker(CassandraACLTable.ID))));
} }


private Insert insertCqlBase() {
return insertInto(CassandraACLTable.TABLE_NAME)
.value(CassandraACLTable.ID, bindMarker(CassandraACLTable.ID))
.value(CassandraACLTable.ACL, bindMarker(CassandraACLTable.ACL))
.value(CassandraACLTable.VERSION, INITIAL_VALUE);
}

public CompletableFuture<MailboxACL> getACL(CassandraId cassandraId) { public CompletableFuture<MailboxACL> getACL(CassandraId cassandraId) {
return getStoredACLRow(cassandraId) return getStoredACLRow(cassandraId)
.thenApply(resultSet -> getAcl(cassandraId, resultSet)); .thenApply(resultSet -> getAcl(cassandraId, resultSet));
Expand All @@ -122,51 +121,42 @@ private MailboxACL getAcl(CassandraId cassandraId, ResultSet resultSet) {
} }


public void updateACL(CassandraId cassandraId, MailboxACL.ACLCommand command) throws MailboxException { public void updateACL(CassandraId cassandraId, MailboxACL.ACLCommand command) throws MailboxException {
MailboxACL replacement = MailboxACL.EMPTY.apply(command);

updateAcl(cassandraId, aclWithVersion -> aclWithVersion.apply(command), replacement);
}

public void setACL(CassandraId cassandraId, MailboxACL mailboxACL) throws MailboxException {
updateAcl(cassandraId,
acl -> new ACLWithVersion(acl.version, mailboxACL),
mailboxACL);
}

private void updateAcl(CassandraId cassandraId, Function<ACLWithVersion, ACLWithVersion> aclTransformation, MailboxACL replacement) throws MailboxException {
try { try {
new FunctionRunnerWithRetry(maxRetry) new FunctionRunnerWithRetry(maxRetry)
.execute( .execute(
() -> { () -> {
codeInjector.inject(); codeInjector.inject();
ResultSet resultSet = getAclWithVersion(cassandraId) return getAclWithVersion(cassandraId)
.map(aclWithVersion -> aclWithVersion.apply(command)) .map(aclTransformation)
.map(aclWithVersion -> updateStoredACL(cassandraId, aclWithVersion)) .map(aclWithVersion -> updateStoredACL(cassandraId, aclWithVersion))
.orElseGet(() -> insertACL(cassandraId, applyCommandOnEmptyACL(command))); .orElseGet(() -> insertACL(cassandraId, replacement));
return resultSet.one().getBool(CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED);
}); });
} catch (LightweightTransactionException e) { } catch (LightweightTransactionException e) {
throw new MailboxException("Exception during lightweight transaction", e); throw new MailboxException("Exception during lightweight transaction", e);
} }
} }


public void setACL(CassandraId cassandraId, MailboxACL mailboxACL) {
try {
executor.executeVoid(
insertStatement.bind()
.setUUID(CassandraACLTable.ID, cassandraId.asUuid())
.setString(CassandraACLTable.ACL, MailboxACLJsonConverter.toJson(mailboxACL)))
.join();
} catch (JsonProcessingException e) {
throw Throwables.propagate(e);
}
}

private MailboxACL applyCommandOnEmptyACL(MailboxACL.ACLCommand command) {
try {
return MailboxACL.EMPTY.apply(command);
} catch (UnsupportedRightException exception) {
throw Throwables.propagate(exception);
}
}

private CompletableFuture<ResultSet> getStoredACLRow(CassandraId cassandraId) { private CompletableFuture<ResultSet> getStoredACLRow(CassandraId cassandraId) {
return executor.execute( return executor.execute(
readStatement.bind() readStatement.bind()
.setUUID(CassandraACLTable.ID, cassandraId.asUuid())); .setUUID(CassandraACLTable.ID, cassandraId.asUuid()));
} }


private ResultSet updateStoredACL(CassandraId cassandraId, ACLWithVersion aclWithVersion) { private boolean updateStoredACL(CassandraId cassandraId, ACLWithVersion aclWithVersion) {
try { try {
return executor.execute( return executor.executeReturnApplied(
conditionalUpdateStatement.bind() conditionalUpdateStatement.bind()
.setUUID(CassandraACLTable.ID, cassandraId.asUuid()) .setUUID(CassandraACLTable.ID, cassandraId.asUuid())
.setString(CassandraACLTable.ACL, MailboxACLJsonConverter.toJson(aclWithVersion.mailboxACL)) .setString(CassandraACLTable.ACL, MailboxACLJsonConverter.toJson(aclWithVersion.mailboxACL))
Expand All @@ -178,9 +168,9 @@ private ResultSet updateStoredACL(CassandraId cassandraId, ACLWithVersion aclWit
} }
} }


private ResultSet insertACL(CassandraId cassandraId, MailboxACL acl) { private boolean insertACL(CassandraId cassandraId, MailboxACL acl) {
try { try {
return executor.execute( return executor.executeReturnApplied(
conditionalInsertStatement.bind() conditionalInsertStatement.bind()
.setUUID(CassandraACLTable.ID, cassandraId.asUuid()) .setUUID(CassandraACLTable.ID, cassandraId.asUuid())
.setString(CassandraACLTable.ACL, MailboxACLJsonConverter.toJson(acl))) .setString(CassandraACLTable.ACL, MailboxACLJsonConverter.toJson(acl)))
Expand All @@ -196,7 +186,10 @@ private Optional<ACLWithVersion> getAclWithVersion(CassandraId cassandraId) {
return Optional.empty(); return Optional.empty();
} }
Row row = resultSet.one(); Row row = resultSet.one();
return Optional.of(new ACLWithVersion(row.getLong(CassandraACLTable.VERSION), deserializeACL(cassandraId, row.getString(CassandraACLTable.ACL)))); return Optional.of(
new ACLWithVersion(
row.getLong(CassandraACLTable.VERSION),
deserializeACL(cassandraId, row.getString(CassandraACLTable.ACL))));
} }


private MailboxACL deserializeACL(CassandraId cassandraId, String serializedACL) { private MailboxACL deserializeACL(CassandraId cassandraId, String serializedACL) {
Expand Down

0 comments on commit d5e1cf8

Please sign in to comment.