Skip to content

Commit

Permalink
JAMES-2082 Add configuration options for blob part size and on the fly migration
Browse files Browse the repository at this point in the history
  • Loading branch information
chibenwa authored and aduprat committed Jul 10, 2017
1 parent 8cb1905 commit 9c46faa
Show file tree
Hide file tree
Showing 15 changed files with 127 additions and 27 deletions.
Expand Up @@ -38,6 +38,8 @@ public class CassandraConfiguration {
public static final int DEFAULT_UID_MAX_RETRY = 100000;
public static final int DEFAULT_ACL_MAX_RETRY = 1000;
public static final int DEFAULT_FETCH_NEXT_PAGE_ADVANCE_IN_ROW = 100;
public static final boolean DEFAULT_ON_THE_FLY_MIGRATION_V1_TO_V2 = false;
public static final int DEFAULT_BLOB_PART_SIZE = 100 * 1024;

public static class Builder {
private Optional<Integer> messageReadChunkSize = Optional.empty();
Expand All @@ -49,6 +51,8 @@ public static class Builder {
private Optional<Integer> uidMaxRetry = Optional.empty();
private Optional<Integer> aclMaxRetry = Optional.empty();
private Optional<Integer> fetchNextPageInAdvanceRow = Optional.empty();
private Optional<Integer> blobPartSize = Optional.empty();
private Optional<Boolean> onTheFlyV1ToV2Migration = Optional.empty();

public Builder messageReadChunkSize(int value) {
Preconditions.checkArgument(value > 0, "messageReadChunkSize needs to be strictly positive");
Expand Down Expand Up @@ -104,6 +108,17 @@ public Builder fetchNextPageInAdvanceRow(int value) {
return this;
}

/**
 * Configures the size, in bytes, of the parts a blob is split into when persisted.
 *
 * @param blobPartSize the part size in bytes; must be strictly positive
 * @return this builder, for chaining
 * @throws IllegalArgumentException when the supplied size is not strictly positive
 */
public Builder blobPartSize(int blobPartSize) {
    Preconditions.checkArgument(blobPartSize > 0, "blobPartSize needs to be strictly positive");
    this.blobPartSize = Optional.of(blobPartSize);
    return this;
}

/**
 * Enables or disables on-the-fly migration of messages from the V1 to the V2 schema.
 *
 * @param enabled whether reads should trigger a V1 to V2 migration
 * @return this builder, for chaining
 */
public Builder onTheFlyV1ToV2Migration(boolean enabled) {
    onTheFlyV1ToV2Migration = Optional.of(enabled);
    return this;
}

public Builder messageReadChunkSize(Optional<Integer> value) {
value.ifPresent(this::messageReadChunkSize);
return this;
Expand Down Expand Up @@ -149,6 +164,16 @@ public Builder fetchNextPageInAdvanceRow(Optional<Integer> value) {
return this;
}

/**
 * Optional variant of {@link #blobPartSize(int)}: applies the value only when present.
 *
 * @param maybeSize an optionally supplied blob part size in bytes
 * @return this builder, for chaining
 */
public Builder blobPartSize(Optional<Integer> maybeSize) {
    maybeSize.ifPresent(size -> blobPartSize(size.intValue()));
    return this;
}

/**
 * Optional variant of {@link #onTheFlyV1ToV2Migration(boolean)}: applies the flag only when present.
 *
 * @param maybeEnabled an optionally supplied migration flag
 * @return this builder, for chaining
 */
public Builder onTheFlyV1ToV2Migration(Optional<Boolean> maybeEnabled) {
    maybeEnabled.ifPresent(enabled -> onTheFlyV1ToV2Migration(enabled.booleanValue()));
    return this;
}

public CassandraConfiguration build() {
return new CassandraConfiguration(aclMaxRetry.orElse(DEFAULT_ACL_MAX_RETRY),
messageReadChunkSize.orElse(DEFAULT_MESSAGE_CHUNK_SIZE_ON_READ),
Expand All @@ -158,7 +183,9 @@ public CassandraConfiguration build() {
flagsUpdateMessageMaxRetry.orElse(DEFAULT_FLAGS_UPDATE_MESSAGE_MAX_RETRY),
modSeqMaxRetry.orElse(DEFAULT_MODSEQ_MAX_RETRY),
uidMaxRetry.orElse(DEFAULT_UID_MAX_RETRY),
fetchNextPageInAdvanceRow.orElse(DEFAULT_FETCH_NEXT_PAGE_ADVANCE_IN_ROW));
fetchNextPageInAdvanceRow.orElse(DEFAULT_FETCH_NEXT_PAGE_ADVANCE_IN_ROW),
blobPartSize.orElse(DEFAULT_BLOB_PART_SIZE),
onTheFlyV1ToV2Migration.orElse(DEFAULT_ON_THE_FLY_MIGRATION_V1_TO_V2));
}
}

Expand All @@ -175,11 +202,13 @@ public static Builder builder() {
private final int uidMaxRetry;
private final int aclMaxRetry;
private final int fetchNextPageInAdvanceRow;
private final int blobPartSize;
private final boolean onTheFlyV1ToV2Migration;

@VisibleForTesting
CassandraConfiguration(int aclMaxRetry, int messageReadChunkSize, int expungeChunkSize, int flagsUpdateChunkSize,
int flagsUpdateMessageIdMaxRetry, int flagsUpdateMessageMaxRetry, int modSeqMaxRetry,
int uidMaxRetry, int fetchNextPageInAdvanceRow) {
int flagsUpdateMessageIdMaxRetry, int flagsUpdateMessageMaxRetry, int modSeqMaxRetry,
int uidMaxRetry, int fetchNextPageInAdvanceRow, int blobPartSize, boolean onTheFlyV1ToV2Migration) {
this.aclMaxRetry = aclMaxRetry;
this.messageReadChunkSize = messageReadChunkSize;
this.expungeChunkSize = expungeChunkSize;
Expand All @@ -189,6 +218,16 @@ public static Builder builder() {
this.uidMaxRetry = uidMaxRetry;
this.fetchNextPageInAdvanceRow = fetchNextPageInAdvanceRow;
this.flagsUpdateChunkSize = flagsUpdateChunkSize;
this.blobPartSize = blobPartSize;
this.onTheFlyV1ToV2Migration = onTheFlyV1ToV2Migration;
}

/**
 * Size, in bytes, of the parts a blob is split into when stored
 * (defaults to {@code DEFAULT_BLOB_PART_SIZE}, i.e. 100 KiB).
 *
 * @return the configured blob part size in bytes
 */
public int getBlobPartSize() {
return blobPartSize;
}

/**
 * Whether messages read from the V1 schema should be migrated to V2 on the fly
 * (defaults to {@code DEFAULT_ON_THE_FLY_MIGRATION_V1_TO_V2}, i.e. disabled).
 *
 * @return true when on-the-fly V1 to V2 migration is enabled
 */
public boolean isOnTheFlyV1ToV2Migration() {
return onTheFlyV1ToV2Migration;
}

public int getFlagsUpdateChunkSize() {
Expand Down Expand Up @@ -240,15 +279,18 @@ public final boolean equals(Object o) {
&& Objects.equals(this.modSeqMaxRetry, that.modSeqMaxRetry)
&& Objects.equals(this.uidMaxRetry, that.uidMaxRetry)
&& Objects.equals(this.flagsUpdateChunkSize, that.flagsUpdateChunkSize)
&& Objects.equals(this.fetchNextPageInAdvanceRow, that.fetchNextPageInAdvanceRow);
&& Objects.equals(this.fetchNextPageInAdvanceRow, that.fetchNextPageInAdvanceRow)
&& Objects.equals(this.blobPartSize, that.blobPartSize)
&& Objects.equals(this.onTheFlyV1ToV2Migration, that.onTheFlyV1ToV2Migration);
}
return false;
}

@Override
public final int hashCode() {
// Keep this field list in sync with equals(); it now also covers the
// blobPartSize and onTheFlyV1ToV2Migration settings added by this change.
return Objects.hash(aclMaxRetry, messageReadChunkSize, expungeChunkSize, flagsUpdateMessageIdMaxRetry,
flagsUpdateMessageMaxRetry, modSeqMaxRetry, uidMaxRetry, fetchNextPageInAdvanceRow, flagsUpdateChunkSize,
blobPartSize, onTheFlyV1ToV2Migration);
}

@Override
Expand All @@ -263,6 +305,8 @@ public String toString() {
.add("fetchNextPageInAdvanceRow", fetchNextPageInAdvanceRow)
.add("flagsUpdateChunkSize", flagsUpdateChunkSize)
.add("uidMaxRetry", uidMaxRetry)
.add("blobPartSize", blobPartSize)
.add("onTheFlyV1ToV2Migration", onTheFlyV1ToV2Migration)
.toString();
}
}
Expand Up @@ -202,6 +202,8 @@ public void builderShouldCreateTheRightObject() {
int flagsUpdateChunkSize = 7;
int messageReadChunkSize = 8;
int expungeChunkSize = 9;
int blobPartSize = 10;
boolean onTheFlyV1ToV2Migration = true;

CassandraConfiguration configuration = CassandraConfiguration.builder()
.aclMaxRetry(aclMaxRetry)
Expand All @@ -213,6 +215,8 @@ public void builderShouldCreateTheRightObject() {
.flagsUpdateChunkSize(flagsUpdateChunkSize)
.messageReadChunkSize(messageReadChunkSize)
.expungeChunkSize(expungeChunkSize)
.blobPartSize(blobPartSize)
.onTheFlyV1ToV2Migration(onTheFlyV1ToV2Migration)
.build();

softly.assertThat(configuration.getAclMaxRetry()).isEqualTo(aclMaxRetry);
Expand All @@ -224,6 +228,8 @@ public void builderShouldCreateTheRightObject() {
softly.assertThat(configuration.getFlagsUpdateChunkSize()).isEqualTo(flagsUpdateChunkSize);
softly.assertThat(configuration.getMessageReadChunkSize()).isEqualTo(messageReadChunkSize);
softly.assertThat(configuration.getExpungeChunkSize()).isEqualTo(expungeChunkSize);
softly.assertThat(configuration.getBlobPartSize()).isEqualTo(blobPartSize);
softly.assertThat(configuration.isOnTheFlyV1ToV2Migration()).isEqualTo(onTheFlyV1ToV2Migration);
}

}
Expand Up @@ -16,4 +16,6 @@ cassandra.retryConnection.minDelay=5000
# fetch.advance.row.count=1000
# chunk.size.flags.update=20
# chunk.size.message.read=100
# chunk.size.expunge=100
# chunk.size.expunge=100
# mailbox.blob.part.size=102400
# migration.v1.v2.on.the.fly=false
Expand Up @@ -25,4 +25,6 @@ cassandra.retryConnection.minDelay=5000
# fetch.advance.row.count=1000
# chunk.size.flags.update=20
# chunk.size.message.read=100
# chunk.size.expunge=100
# chunk.size.expunge=100
# mailbox.blob.part.size=102400
# migration.v1.v2.on.the.fly=false
Expand Up @@ -33,6 +33,7 @@
import javax.inject.Inject;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.backends.cassandra.CassandraConfiguration;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
import org.apache.james.mailbox.cassandra.ids.BlobId;
import org.apache.james.mailbox.cassandra.mail.utils.DataChunker;
Expand All @@ -45,22 +46,23 @@
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.github.steveash.guavate.Guavate;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Bytes;

public class CassandraBlobsDAO {

public static final int CHUNK_SIZE = 1024 * 100;
private final CassandraAsyncExecutor cassandraAsyncExecutor;
private final PreparedStatement insert;
private final PreparedStatement insertPart;
private final PreparedStatement select;
private final PreparedStatement selectPart;
private final DataChunker dataChunker;
private final CassandraConfiguration configuration;

@Inject
public CassandraBlobsDAO(Session session) {
public CassandraBlobsDAO(Session session, CassandraConfiguration cassandraConfiguration) {
this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
this.configuration = cassandraConfiguration;
this.dataChunker = new DataChunker();
this.insert = prepareInsert(session);
this.select = prepareSelect(session);
Expand All @@ -69,6 +71,11 @@ public CassandraBlobsDAO(Session session) {
this.selectPart = prepareSelectPart(session);
}

// Test-only convenience constructor: wires the DAO with the default
// CassandraConfiguration instead of an injected one.
@VisibleForTesting
public CassandraBlobsDAO(Session session) {
this(session, CassandraConfiguration.DEFAULT_CONFIGURATION);
}

private PreparedStatement prepareSelect(Session session) {
return session.prepare(select()
.from(BlobTable.TABLE_NAME)
Expand Down Expand Up @@ -106,8 +113,8 @@ public CompletableFuture<Optional<BlobId>> save(byte[] data) {
}

private CompletableFuture<Integer> saveBlobParts(byte[] data, BlobId blobId) {
return FluentFutureStream.of(
dataChunker.chunk(data, CHUNK_SIZE)
return FluentFutureStream.<Pair<Integer, Void>> of(
dataChunker.chunk(data, configuration.getBlobPartSize())
.map(pair -> writePart(pair.getRight(), blobId, pair.getKey())
.thenApply(partId -> Pair.of(pair.getKey(), partId))))
.completableFuture()
Expand Down
Expand Up @@ -53,6 +53,7 @@

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.backends.cassandra.CassandraConfiguration;
import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
import org.apache.james.mailbox.cassandra.ids.BlobId;
Expand Down Expand Up @@ -82,17 +83,18 @@
import com.datastax.driver.core.UDTValue;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.github.steveash.guavate.Guavate;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Bytes;

public class CassandraMessageDAOV2 {
public static final int CHUNK_SIZE_ON_READ = 100;
public static final long DEFAULT_LONG_VALUE = 0L;
public static final String DEFAULT_OBJECT_VALUE = null;
private static final byte[] EMPTY_BYTE_ARRAY = {};

private final CassandraAsyncExecutor cassandraAsyncExecutor;
private final CassandraTypesProvider typesProvider;
private final CassandraBlobsDAO blobsDAO;
private final CassandraConfiguration configuration;
private final PreparedStatement insert;
private final PreparedStatement delete;
private final PreparedStatement selectMetadata;
Expand All @@ -101,10 +103,11 @@ public class CassandraMessageDAOV2 {
private final PreparedStatement selectBody;

@Inject
public CassandraMessageDAOV2(Session session, CassandraTypesProvider typesProvider, CassandraBlobsDAO blobsDAO) {
public CassandraMessageDAOV2(Session session, CassandraTypesProvider typesProvider, CassandraBlobsDAO blobsDAO, CassandraConfiguration cassandraConfiguration) {
this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
this.typesProvider = typesProvider;
this.blobsDAO = blobsDAO;
this.configuration = cassandraConfiguration;
this.insert = prepareInsert(session);
this.delete = prepareDelete(session);
this.selectMetadata = prepareSelect(session, METADATA);
Expand All @@ -113,6 +116,11 @@ public CassandraMessageDAOV2(Session session, CassandraTypesProvider typesProvid
this.selectBody = prepareSelect(session, BODY);
}

// Test-only convenience constructor: wires the DAO with the default
// CassandraConfiguration instead of an injected one.
@VisibleForTesting
public CassandraMessageDAOV2(Session session, CassandraTypesProvider typesProvider, CassandraBlobsDAO blobsDAO) {
this(session, typesProvider, blobsDAO, CassandraConfiguration.DEFAULT_CONFIGURATION);
}

private PreparedStatement prepareSelect(Session session, String[] fields) {
return session.prepare(select(fields)
.from(TABLE_NAME)
Expand Down Expand Up @@ -196,7 +204,7 @@ private UDTValue toUDT(MessageAttachment messageAttachment) {
public CompletableFuture<Stream<MessageResult>> retrieveMessages(List<ComposedMessageIdWithMetaData> messageIds, FetchType fetchType, Limit limit) {
return CompletableFutureUtil.chainAll(
limit.applyOnStream(messageIds.stream().distinct())
.collect(JamesCollectors.chunker(CHUNK_SIZE_ON_READ)),
.collect(JamesCollectors.chunker(configuration.getMessageReadChunkSize())),
ids -> rowToMessages(fetchType, ids))
.thenApply(stream -> stream.flatMap(Function.identity()));
}
Expand Down
Expand Up @@ -89,7 +89,7 @@ public CassandraMessageIdMapper(MailboxMapper mailboxMapper, CassandraMailboxDAO
this.mailboxSession = mailboxSession;
this.attachmentLoader = new AttachmentLoader(attachmentMapper);
this.cassandraConfiguration = cassandraConfiguration;
this.v1ToV2Migration = new V1ToV2Migration(messageDAOV1, messageDAOV2, attachmentMapper);
this.v1ToV2Migration = new V1ToV2Migration(messageDAOV1, messageDAOV2, attachmentMapper, cassandraConfiguration);
}

@Override
Expand Down
Expand Up @@ -55,7 +55,6 @@
import org.apache.james.mailbox.store.mail.model.Mailbox;
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
import org.apache.james.util.CompletableFutureUtil;
import org.apache.james.util.FluentFutureStream;
import org.apache.james.util.streams.JamesCollectors;
import org.slf4j.Logger;
Expand Down Expand Up @@ -109,7 +108,7 @@ public CassandraMessageMapper(CassandraUidProvider uidProvider, CassandraModSeqP
this.attachmentLoader = new AttachmentLoader(attachmentMapper);
this.applicableFlagDAO = applicableFlagDAO;
this.deletedMessageDAO = deletedMessageDAO;
this.v1ToV2Migration = new V1ToV2Migration(messageDAO, messageDAOV2, attachmentMapper);
this.v1ToV2Migration = new V1ToV2Migration(messageDAO, messageDAOV2, attachmentMapper, cassandraConfiguration);
this.cassandraConfiguration = cassandraConfiguration;
}

Expand Down
Expand Up @@ -21,18 +21,18 @@

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Stream;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.backends.cassandra.CassandraConfiguration;
import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
import org.apache.james.mailbox.cassandra.mail.utils.Limit;
import org.apache.james.mailbox.cassandra.mail.AttachmentLoader;
import org.apache.james.mailbox.cassandra.mail.CassandraAttachmentMapper;
import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAOV2;
import org.apache.james.mailbox.cassandra.mail.MessageAttachmentRepresentation;
import org.apache.james.mailbox.cassandra.mail.MessageWithoutAttachment;
import org.apache.james.mailbox.cassandra.mail.utils.Limit;
import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.store.mail.MessageMapper;
import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
Expand All @@ -47,11 +47,14 @@ public class V1ToV2Migration {
private final CassandraMessageDAO messageDAOV1;
private final CassandraMessageDAOV2 messageDAOV2;
private final AttachmentLoader attachmentLoader;
private final CassandraConfiguration cassandraConfiguration;

/**
 * Builds the migration helper coordinating reads from the V1 message schema
 * and writes to the V2 schema.
 *
 * @param messageDAOV1 DAO for the legacy V1 message table (source, deleted after migration)
 * @param messageDAOV2 DAO for the V2 message table (migration target)
 * @param attachmentMapper used to build the AttachmentLoader that re-attaches attachment payloads
 * @param cassandraConfiguration carries the flag controlling whether on-the-fly migration runs
 */
public V1ToV2Migration(CassandraMessageDAO messageDAOV1, CassandraMessageDAOV2 messageDAOV2,
                       CassandraAttachmentMapper attachmentMapper, CassandraConfiguration cassandraConfiguration) {
this.messageDAOV1 = messageDAOV1;
this.messageDAOV2 = messageDAOV2;
this.attachmentLoader = new AttachmentLoader(attachmentMapper);
this.cassandraConfiguration = cassandraConfiguration;
}

public CompletableFuture<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>>
Expand All @@ -70,19 +73,26 @@ public V1ToV2Migration(CassandraMessageDAO messageDAOV1, CassandraMessageDAOV2 m
// Rebuilds the full message (attachments included) from its V1 representation,
// triggers the V1->V2 migration for it (a no-op when the feature is disabled),
// and hands back the original V1 pair unchanged so the read path is unaffected.
// NOTE(review): findAny().get() assumes addAttachmentToMessages always yields at
// least one element for a single-message input — TODO confirm; an empty stream
// here would throw NoSuchElementException.
private CompletableFuture<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>> performV1ToV2Migration(Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> messageV1) {
return attachmentLoader.addAttachmentToMessages(Stream.of(messageV1), MessageMapper.FetchType.Full)
.thenApply(stream -> stream.findAny().get())
.thenCompose(this::performV1ToV2Migration)
.thenApply(any -> messageV1);
}

// Copies the message into the V2 schema and removes the V1 copy, but only when
// on-the-fly migration is enabled in the configuration; otherwise resolves
// immediately without touching either table.
private CompletableFuture<Void> performV1ToV2Migration(SimpleMailboxMessage message) {
    if (cassandraConfiguration.isOnTheFlyV1ToV2Migration()) {
        return saveInV2FromV1(message)
            .thenCompose(this::deleteInV1);
    }
    return CompletableFuture.completedFuture(null);
}

// Deletes the V1 copy of a successfully migrated message. An empty Optional
// means the V2 save did not happen, so nothing is removed and the future
// resolves immediately.
private CompletableFuture<Void> deleteInV1(Optional<SimpleMailboxMessage> migratedMessage) {
    if (!migratedMessage.isPresent()) {
        return CompletableFuture.completedFuture(null);
    }
    CassandraMessageId messageId = (CassandraMessageId) migratedMessage.get().getMessageId();
    return messageDAOV1.delete(messageId);
}

private CompletionStage<Optional<SimpleMailboxMessage>> saveInV2FromV1(SimpleMailboxMessage message) {
private CompletableFuture<Optional<SimpleMailboxMessage>> saveInV2FromV1(SimpleMailboxMessage message) {
try {
return messageDAOV2.save(message).thenApply(any -> Optional.of(message));
} catch (MailboxException e) {
Expand Down

0 comments on commit 9c46faa

Please sign in to comment.