JAMES-2082 proposition of a new organisation for blob
Luc DUZAN authored and aduprat committed Jul 10, 2017
1 parent bd69ab9 commit e9979b5
Showing 6 changed files with 60 additions and 233 deletions.
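In short: the previous layout stored one blobs row per chunk (id, position, part), each part value pointing at a blobParts row keyed by a synthetic PartId; the new layout stores a single blobs row per blob whose position column now carries the number of chunks, and keys blobParts directly by blob id and chunk number, so the PartId indirection disappears. A toy in-memory model of the new layout, as an illustration only (the class and map names are mine, not code from the commit):

import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Toy model of the reorganised layout: "blobs" keeps one entry per blob
// holding its chunk count; "blobParts" is keyed by blob id plus chunk
// number, mirroring the two Cassandra tables reworked in this commit.
public class BlobLayoutModel {
    private static final int CHUNK_SIZE = 1024 * 100;

    private final Map<String, Integer> blobs = new HashMap<>();
    private final Map<String, byte[]> blobParts = new HashMap<>();

    public void save(String blobId, byte[] data) {
        int numberOfChunk = (data.length + CHUNK_SIZE - 1) / CHUNK_SIZE;
        for (int chunkNumber = 0; chunkNumber < numberOfChunk; chunkNumber++) {
            int from = chunkNumber * CHUNK_SIZE;
            int to = Math.min(from + CHUNK_SIZE, data.length);
            blobParts.put(blobId + "/" + chunkNumber, Arrays.copyOfRange(data, from, to));
        }
        blobs.put(blobId, numberOfChunk); // one reference row instead of one per part
    }

    public byte[] read(String blobId) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int numberOfChunk = blobs.getOrDefault(blobId, 0);
        for (int chunkNumber = 0; chunkNumber < numberOfChunk; chunkNumber++) {
            byte[] chunk = blobParts.get(blobId + "/" + chunkNumber);
            out.write(chunk, 0, chunk.length);
        }
        return out.toByteArray();
    }
}

A read no longer has to resolve part ids first: the chunk count alone determines which blobParts rows to fetch.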

This file was deleted.

CassandraBlobsDAO.java

@@ -23,36 +23,31 @@
 import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
-import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.BlobParts;
-import static org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.Blobs;
 
 import java.nio.ByteBuffer;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import javax.inject.Inject;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
-import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.mailbox.cassandra.ids.BlobId;
-import org.apache.james.mailbox.cassandra.ids.PartId;
 import org.apache.james.mailbox.cassandra.mail.utils.DataChunker;
-import org.apache.james.util.CompletableFutureUtil;
+import org.apache.james.mailbox.cassandra.table.BlobTable;
+import org.apache.james.mailbox.cassandra.table.BlobTable.BlobParts;
 import org.apache.james.util.FluentFutureStream;
 import org.apache.james.util.OptionalConverter;
 
 import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.github.steveash.guavate.Guavate;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.primitives.Bytes;
 
 public class CassandraBlobsDAO {
 
     public static final int CHUNK_SIZE = 1024 * 100;
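The chunk size is unchanged at 1024 * 100 bytes. A quick sketch of the chunk-count arithmetic that the reworked save path ends up storing in the blobs table (my illustration; it assumes, consistent with the .orElse(0) fallback further down, that an empty payload produces zero chunks):

// Worked example of the chunk arithmetic implied by CHUNK_SIZE = 1024 * 100.
public class ChunkCountCheck {
    private static final int CHUNK_SIZE = 1024 * 100; // 102,400 bytes, as in CassandraBlobsDAO

    // Ceiling division: how many CHUNK_SIZE slices a payload needs.
    static int numberOfChunk(int payloadLength) {
        return (payloadLength + CHUNK_SIZE - 1) / CHUNK_SIZE;
    }

    public static void main(String[] args) {
        System.out.println(numberOfChunk(0));        // 0 (matches the .orElse(0) fallback)
        System.out.println(numberOfChunk(102_400));  // 1, exactly one full chunk
        System.out.println(numberOfChunk(102_401));  // 2, one full chunk plus one byte
        System.out.println(numberOfChunk(256_000));  // 3, a 250 KB payload spans three part rows
    }
}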
@@ -76,26 +71,27 @@ public CassandraBlobsDAO(Session session) {
 
     private PreparedStatement prepareSelect(Session session) {
         return session.prepare(select()
-            .from(Blobs.TABLE_NAME)
-            .where(eq(Blobs.ID, bindMarker(Blobs.ID))));
+            .from(BlobTable.TABLE_NAME)
+            .where(eq(BlobTable.ID, bindMarker(BlobTable.ID))));
     }
 
     private PreparedStatement prepareSelectPart(Session session) {
         return session.prepare(select()
             .from(BlobParts.TABLE_NAME)
-            .where(eq(BlobParts.ID, bindMarker(BlobParts.ID))));
+            .where(eq(BlobTable.ID, bindMarker(BlobTable.ID)))
+            .and(eq(BlobParts.CHUNK_NUMBER, bindMarker(BlobParts.CHUNK_NUMBER))));
     }
 
     private PreparedStatement prepareInsert(Session session) {
-        return session.prepare(insertInto(Blobs.TABLE_NAME)
-            .value(Blobs.ID, bindMarker(Blobs.ID))
-            .value(Blobs.POSITION, bindMarker(Blobs.POSITION))
-            .value(Blobs.PART, bindMarker(Blobs.PART)));
+        return session.prepare(insertInto(BlobTable.TABLE_NAME)
+            .value(BlobTable.ID, bindMarker(BlobTable.ID))
+            .value(BlobTable.NUMBER_OF_CHUNK, bindMarker(BlobTable.NUMBER_OF_CHUNK)));
     }
 
     private PreparedStatement prepareInsertPart(Session session) {
         return session.prepare(insertInto(BlobParts.TABLE_NAME)
-            .value(BlobParts.ID, bindMarker(BlobParts.ID))
+            .value(BlobTable.ID, bindMarker(BlobTable.ID))
+            .value(BlobParts.CHUNK_NUMBER, bindMarker(BlobParts.CHUNK_NUMBER))
             .value(BlobParts.DATA, bindMarker(BlobParts.DATA)));
     }
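For readers who think in CQL, the four prepared statements correspond roughly to the plain statements below (my transcription of the QueryBuilder calls above; the marker names follow the BlobTable constants, and the driver's generated text may differ in detail):

import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;

// Approximate raw-CQL equivalents of the four QueryBuilder statements above.
class PreparedStatementSketch {
    static PreparedStatement select(Session session) {
        return session.prepare("SELECT * FROM blobs WHERE id = :id");
    }

    static PreparedStatement selectPart(Session session) {
        return session.prepare(
            "SELECT * FROM blobParts WHERE id = :id AND chunkNumber = :chunkNumber");
    }

    static PreparedStatement insert(Session session) {
        return session.prepare(
            "INSERT INTO blobs (id, position) VALUES (:id, :position)");
    }

    static PreparedStatement insertPart(Session session) {
        return session.prepare(
            "INSERT INTO blobParts (id, chunkNumber, data) VALUES (:id, :chunkNumber, :data)");
    }
}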


@@ -105,55 +101,57 @@ public CompletableFuture<Optional<BlobId>> save(byte[] data) {
         }
         BlobId blobId = BlobId.forPayload(data);
         return saveBlobParts(data, blobId)
-            .thenCompose(partIds -> saveBlobPartsReferences(blobId, partIds))
+            .thenCompose(numberOfChunk -> saveBlobPartsReferences(blobId, numberOfChunk))
             .thenApply(any -> Optional.of(blobId));
     }
 
-    private CompletableFuture<Stream<Pair<Integer, PartId>>> saveBlobParts(byte[] data, BlobId blobId) {
+    private CompletableFuture<Integer> saveBlobParts(byte[] data, BlobId blobId) {
         return FluentFutureStream.of(
             dataChunker.chunk(data, CHUNK_SIZE)
                 .map(pair -> writePart(pair.getRight(), blobId, pair.getKey())
                     .thenApply(partId -> Pair.of(pair.getKey(), partId))))
-            .completableFuture();
+            .completableFuture()
+            .thenApply(stream ->
+                getLastOfStream(stream)
+                    .map(numOfChunkAndPartId -> numOfChunkAndPartId.getLeft() + 1)
+                    .orElse(0));
+    }
+
+    private static <T> Optional<T> getLastOfStream(Stream<T> stream) {
+        return stream.reduce((first, second) -> second);
     }
 
-    private CompletableFuture<PartId> writePart(ByteBuffer data, BlobId blobId, int position) {
-        PartId partId = PartId.create(blobId, position);
+    private CompletableFuture<Void> writePart(ByteBuffer data, BlobId blobId, int position) {
         return cassandraAsyncExecutor.executeVoid(
             insertPart.bind()
-                .setString(BlobParts.ID, partId.getId())
-                .setBytes(BlobParts.DATA, data))
-            .thenApply(any -> partId);
+                .setString(BlobTable.ID, blobId.getId())
+                .setInt(BlobParts.CHUNK_NUMBER, position)
+                .setBytes(BlobParts.DATA, data));
     }
 
-    private CompletableFuture<Stream<Void>> saveBlobPartsReferences(BlobId blobId, Stream<Pair<Integer, PartId>> stream) {
-        return FluentFutureStream.of(stream.map(pair ->
-            cassandraAsyncExecutor.executeVoid(insert.bind()
-                .setString(Blobs.ID, blobId.getId())
-                .setLong(Blobs.POSITION, pair.getKey())
-                .setString(Blobs.PART, pair.getValue().getId()))))
-            .completableFuture();
+    private CompletableFuture<Void> saveBlobPartsReferences(BlobId blobId, int numberOfChunk) {
+        return cassandraAsyncExecutor.executeVoid(insert.bind()
+            .setString(BlobTable.ID, blobId.getId())
+            .setInt(BlobTable.NUMBER_OF_CHUNK, numberOfChunk));
     }
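saveBlobParts now derives the chunk count by taking the last (chunkNumber, result) pair of the write stream and adding one. The getLastOfStream reduce is a compact idiom worth a standalone illustration (example mine, not commit code):

import java.util.Optional;
import java.util.stream.Stream;

// Standalone illustration of the getLastOfStream idiom used above.
public class LastOfStreamExample {
    static <T> Optional<T> getLastOfStream(Stream<T> stream) {
        return stream.reduce((first, second) -> second);
    }

    public static void main(String[] args) {
        System.out.println(getLastOfStream(Stream.of(0, 1, 2))); // Optional[2] -> 3 chunks
        System.out.println(getLastOfStream(Stream.empty()));     // Optional.empty -> 0 chunks
    }
}

Reducing with (first, second) -> second keeps replacing the accumulator, so on a sequential stream like this one the result is simply the final element, or empty for an empty stream.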


     public CompletableFuture<byte[]> read(BlobId blobId) {
-        return cassandraAsyncExecutor.execute(
+        return cassandraAsyncExecutor.executeSingleRow(
             select.bind()
-                .setString(Blobs.ID, blobId.getId()))
-            .thenApply(this::toPartIds)
+                .setString(BlobTable.ID, blobId.getId()))
             .thenCompose(this::toDataParts)
             .thenApply(this::concatenateDataParts);
     }
 
-    private ImmutableMap<Long, PartId> toPartIds(ResultSet resultSet) {
-        return CassandraUtils.convertToStream(resultSet)
-            .map(row -> Pair.of(row.getLong(Blobs.POSITION), PartId.from(row.getString(Blobs.PART))))
-            .collect(Guavate.toImmutableMap(Pair::getKey, Pair::getValue));
-    }
-
-    private CompletableFuture<Stream<Optional<Row>>> toDataParts(ImmutableMap<Long, PartId> positionToIds) {
-        return CompletableFutureUtil.chainAll(
-            positionToIds.values().stream(),
-            this::readPart);
+    private CompletableFuture<Stream<Optional<Row>>> toDataParts(Optional<Row> blobRowOptional) {
+        return blobRowOptional.map(blobRow -> {
+            BlobId blobId = BlobId.from(blobRow.getString(BlobTable.ID));
+            int numOfChunk = blobRow.getInt(BlobTable.NUMBER_OF_CHUNK);
+            return FluentFutureStream.of(
+                IntStream.range(0, numOfChunk)
+                    .mapToObj(position -> readPart(blobId, position)))
+                .completableFuture();
+        }).orElse(CompletableFuture.completedFuture(Stream.empty()));
     }
 
     private byte[] concatenateDataParts(Stream<Optional<Row>> rows) {
@@ -170,8 +168,10 @@ private byte[] rowToData(Row row) {
Expand All @@ -170,8 +168,10 @@ private byte[] rowToData(Row row) {
return data; return data;
} }


private CompletableFuture<Optional<Row>> readPart(PartId partId) { private CompletableFuture<Optional<Row>> readPart(BlobId blobId, int position) {
return cassandraAsyncExecutor.executeSingleRow(selectPart.bind() return cassandraAsyncExecutor.executeSingleRow(
.setString(BlobParts.ID, partId.getId())); selectPart.bind()
.setString(BlobTable.ID, blobId.getId())
.setInt(BlobParts.CHUNK_NUMBER, position));
} }
} }
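End to end, the reworked DAO can be exercised with a hypothetical round trip like the one below. It assumes a reachable Cassandra node at 127.0.0.1, a keyspace named "james" holding the CassandraBlobModule tables, and the usual package for CassandraBlobsDAO; none of those specifics come from this commit.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Optional;

import org.apache.james.mailbox.cassandra.ids.BlobId;
import org.apache.james.mailbox.cassandra.mail.CassandraBlobsDAO; // assumed package

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

// Hypothetical save/read round trip through the reworked DAO.
public class BlobRoundTrip {
    public static void main(String[] args) {
        try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
             Session session = cluster.connect("james")) {
            CassandraBlobsDAO dao = new CassandraBlobsDAO(session);

            byte[] payload = "hello blob".getBytes(StandardCharsets.UTF_8);
            Optional<BlobId> blobId = dao.save(payload).join(); // writes parts, then the count
            byte[] read = dao.read(blobId.get()).join();        // reads count, then parts 0..n-1

            System.out.println(Arrays.equals(payload, read));   // true
        }
    }
}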
CassandraBlobModule.java

@@ -24,12 +24,11 @@
 import org.apache.james.backends.cassandra.components.CassandraModule;
 import org.apache.james.backends.cassandra.components.CassandraTable;
 import org.apache.james.backends.cassandra.components.CassandraType;
-import org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.BlobParts;
-import org.apache.james.mailbox.cassandra.table.CassandraMessageV2Table.Blobs;
 
 import com.datastax.driver.core.DataType;
 import com.datastax.driver.core.schemabuilder.SchemaBuilder;
 import com.google.common.collect.ImmutableList;
+import org.apache.james.mailbox.cassandra.table.BlobTable;
 
 public class CassandraBlobModule implements CassandraModule {


@@ -38,17 +37,17 @@ public class CassandraBlobModule implements CassandraModule {
 
     public CassandraBlobModule() {
         tables = ImmutableList.of(
-            new CassandraTable(Blobs.TABLE_NAME,
-                SchemaBuilder.createTable(Blobs.TABLE_NAME)
+            new CassandraTable(BlobTable.TABLE_NAME,
+                SchemaBuilder.createTable(BlobTable.TABLE_NAME)
                     .ifNotExists()
-                    .addPartitionKey(Blobs.ID, DataType.text())
-                    .addClusteringColumn(Blobs.POSITION, DataType.bigint())
-                    .addColumn(Blobs.PART, DataType.text())),
-            new CassandraTable(BlobParts.TABLE_NAME,
-                SchemaBuilder.createTable(BlobParts.TABLE_NAME)
+                    .addPartitionKey(BlobTable.ID, DataType.text())
+                    .addClusteringColumn(BlobTable.NUMBER_OF_CHUNK, DataType.cint())),
+            new CassandraTable(BlobTable.BlobParts.TABLE_NAME,
+                SchemaBuilder.createTable(BlobTable.BlobParts.TABLE_NAME)
                     .ifNotExists()
-                    .addPartitionKey(BlobParts.ID, DataType.text())
-                    .addColumn(BlobParts.DATA, DataType.blob())));
+                    .addPartitionKey(BlobTable.ID, DataType.text())
+                    .addClusteringColumn(BlobTable.BlobParts.CHUNK_NUMBER, DataType.cint())
+                    .addColumn(BlobTable.BlobParts.DATA, DataType.blob())));
         types = ImmutableList.of();
     }
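Spelled out as DDL, the two SchemaBuilder definitions above amount to roughly the following (transcription mine; the driver may render the generated statements differently). Note that position, now the chunk count, is declared as a clustering column and therefore ends up in the blobs primary key:

// Approximate CQL produced by the schema definitions above.
final class BlobDdlSketch {
    static final String BLOBS =
        "CREATE TABLE IF NOT EXISTS blobs ("
            + "id text, "
            + "position int, "   // chunk count, see BlobTable.NUMBER_OF_CHUNK
            + "PRIMARY KEY (id, position))";

    static final String BLOB_PARTS =
        "CREATE TABLE IF NOT EXISTS blobParts ("
            + "id text, "
            + "chunkNumber int, "
            + "data blob, "
            + "PRIMARY KEY (id, chunkNumber))";
}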


BlobsTable.java → BlobTable.java

@@ -19,15 +19,14 @@
 
 package org.apache.james.mailbox.cassandra.table;
 
-public interface BlobsTable {
+public interface BlobTable {
     String TABLE_NAME = "blobs";
     String ID = "id";
-    String POSITION = "position";
-    String PART = "part";
+    String NUMBER_OF_CHUNK = "position";
 
     interface BlobParts {
         String TABLE_NAME = "blobParts";
-        String ID = "id";
+        String CHUNK_NUMBER = "chunkNumber";
         String DATA = "data";
     }
 }
CassandraMessageV2Table.java

@@ -52,16 +52,4 @@ interface Attachments {
         String IS_INLINE = "isInline";
     }
 
-    interface Blobs {
-        String TABLE_NAME = "blobs";
-        String ID = "id";
-        String POSITION = "position";
-        String PART = "part";
-    }
-
-    interface BlobParts {
-        String TABLE_NAME = "blobParts";
-        String ID = "id";
-        String DATA = "data";
-    }
 }
