diff --git a/server/data/data-jmap-cassandra/pom.xml b/server/data/data-jmap-cassandra/pom.xml
index 567591de2e5..ec5a89899d0 100644
--- a/server/data/data-jmap-cassandra/pom.xml
+++ b/server/data/data-jmap-cassandra/pom.xml
@@ -54,6 +54,16 @@
${james.groupId}
apache-james-mailbox-cassandra
+
+ ${james.groupId}
+ blob-memory
+ test
+
+
+ ${james.groupId}
+ blob-storage-strategy
+ test
+
${james.groupId}
event-sourcing-event-store-cassandra
diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepository.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepository.java
new file mode 100644
index 00000000000..d32eacef9ba
--- /dev/null
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepository.java
@@ -0,0 +1,68 @@
+package org.apache.james.jmap.cassandra.upload;
+
+import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST;
+
+import java.io.InputStream;
+
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.core.Username;
+import org.apache.james.jmap.api.model.Upload;
+import org.apache.james.jmap.api.model.UploadId;
+import org.apache.james.jmap.api.model.UploadMetaData;
+import org.apache.james.jmap.api.model.UploadNotFoundException;
+import org.apache.james.jmap.api.upload.UploadRepository;
+import org.apache.james.mailbox.model.ContentType;
+import org.reactivestreams.Publisher;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.google.common.io.CountingInputStream;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class CassandraUploadRepository implements UploadRepository {
+ private final UploadDAO uploadDAO;
+ private final BlobStore blobStore;
+ private final BucketNameGenerator bucketNameGenerator;
+
+ public CassandraUploadRepository(UploadDAO uploadDAO, BlobStore blobStore, BucketNameGenerator bucketNameGenerator) {
+ this.uploadDAO = uploadDAO;
+ this.blobStore = blobStore;
+ this.bucketNameGenerator = bucketNameGenerator;
+ }
+
+ @Override
+ public Publisher upload(InputStream data, ContentType contentType, Username user) {
+ UploadId uploadId = generateId();
+ UploadBucketName uploadBucketName = bucketNameGenerator.current();
+ BucketName bucketName = uploadBucketName.asBucketName();
+
+ return Mono.fromCallable(() -> new CountingInputStream(data))
+ .flatMap(countingInputStream -> Mono.from(blobStore.save(bucketName, countingInputStream, LOW_COST))
+ .map(blobId -> new UploadDAO.UploadRepresentation(uploadId, bucketName, blobId, contentType, countingInputStream.getCount(), user))
+ .flatMap(upload -> uploadDAO.save(upload).thenReturn(upload.getId())));
+ }
+
+ @Override
+ public Publisher retrieve(UploadId id, Username user) {
+ return uploadDAO.retrieve(id)
+ .filter(upload -> upload.getUser().equals(user))
+ .map(upload -> Upload.from(
+ UploadMetaData.from(id, upload.getContentType(), upload.getSize(), upload.getBlobId()),
+ () -> blobStore.read(upload.getBucketName(), upload.getBlobId(), LOW_COST)))
+ .switchIfEmpty(Mono.error(() -> new UploadNotFoundException(id)));
+ }
+
+ public Mono purge() {
+ return Flux.from(blobStore.listBuckets())
+ .handle((bucketName, sink) -> UploadBucketName.ofBucket(bucketName).ifPresentOrElse(sink::next, sink::complete))
+ .filter(bucketNameGenerator.evictionPredicate())
+ .concatMap(bucket -> blobStore.deleteBucket(bucket.asBucketName()))
+ .then();
+ }
+
+ private UploadId generateId() {
+ return UploadId.from(UUIDs.timeBased());
+ }
+}
diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/UploadConfiguration.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/UploadConfiguration.java
new file mode 100644
index 00000000000..675715dc6cf
--- /dev/null
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/UploadConfiguration.java
@@ -0,0 +1,61 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.jmap.cassandra.upload;
+
+import java.time.Duration;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+
+public class UploadConfiguration {
+ public static UploadConfiguration SINGLETON = new UploadConfiguration(Duration.ofDays(7));
+
+ private final Duration uploadTtlDuration;
+
+ public UploadConfiguration(Duration uploadTtlDuration) {
+ this.uploadTtlDuration = uploadTtlDuration;
+ }
+
+ public Duration getUploadTtlDuration() {
+ return uploadTtlDuration;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof UploadConfiguration) {
+ UploadConfiguration other = (UploadConfiguration) obj;
+ return Objects.equal(uploadTtlDuration, other.uploadTtlDuration);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(uploadTtlDuration);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects
+ .toStringHelper(this)
+ .add("uploadTtlDuration", uploadTtlDuration)
+ .toString();
+ }
+}
diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/UploadDAO.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/UploadDAO.java
new file mode 100644
index 00000000000..07dd8b405e0
--- /dev/null
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/UploadDAO.java
@@ -0,0 +1,166 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.jmap.cassandra.upload;
+
+import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.ttl;
+import static org.apache.james.jmap.cassandra.upload.UploadModule.BLOB_ID;
+import static org.apache.james.jmap.cassandra.upload.UploadModule.BUCKET_ID;
+import static org.apache.james.jmap.cassandra.upload.UploadModule.CONTENT_TYPE;
+import static org.apache.james.jmap.cassandra.upload.UploadModule.ID;
+import static org.apache.james.jmap.cassandra.upload.UploadModule.SIZE;
+import static org.apache.james.jmap.cassandra.upload.UploadModule.TABLE_NAME;
+import static org.apache.james.jmap.cassandra.upload.UploadModule.USER;
+
+import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.core.Username;
+import org.apache.james.jmap.api.model.UploadId;
+import org.apache.james.mailbox.model.ContentType;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+import reactor.core.publisher.Mono;
+
+public class UploadDAO {
+ public static class UploadRepresentation {
+ private final UploadId id;
+ private final BucketName bucketName;
+ private final BlobId blobId;
+ private final ContentType contentType;
+ private final long size;
+ private final Username user;
+
+ public UploadRepresentation(UploadId id, BucketName bucketName, BlobId blobId, ContentType contentType, long size, Username user) {
+ this.user = user;
+ Preconditions.checkArgument(size >= 0, "Size must be strictly positive");
+ this.id = id;
+ this.bucketName = bucketName;
+ this.blobId = blobId;
+ this.contentType = contentType;
+ this.size = size;
+ }
+
+ public UploadId getId() {
+ return id;
+ }
+
+ public BucketName getBucketName() {
+ return bucketName;
+ }
+
+ public BlobId getBlobId() {
+ return blobId;
+ }
+
+ public ContentType getContentType() {
+ return contentType;
+ }
+
+ public long getSize() {
+ return size;
+ }
+
+ public Username getUser() {
+ return user;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof UploadRepresentation) {
+ UploadRepresentation other = (UploadRepresentation) obj;
+ return Objects.equal(id, other.id)
+ && Objects.equal(bucketName, other.bucketName)
+ && Objects.equal(user, other.user)
+ && Objects.equal(blobId, other.blobId)
+ && Objects.equal(contentType, other.contentType)
+ && Objects.equal(size, other.size);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(id, bucketName, blobId, contentType, size, user);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects
+ .toStringHelper(this)
+ .add("id", id)
+ .add("bucketName", bucketName)
+ .add("blobId", blobId)
+ .add("contentType", contentType)
+ .add("user", user)
+ .add("size", size)
+ .toString();
+ }
+ }
+
+ private final CassandraAsyncExecutor executor;
+ private final BlobId.Factory blobIdFactory;
+ private final PreparedStatement insert;
+ private final PreparedStatement selectOne;
+
+ public UploadDAO(Session session, BlobId.Factory blobIdFactory, UploadConfiguration configuration) {
+ this.executor = new CassandraAsyncExecutor(session);
+ this.blobIdFactory = blobIdFactory;
+ this.insert = session.prepare(insertInto(TABLE_NAME)
+ .value(ID, bindMarker(ID))
+ .value(BUCKET_ID, bindMarker(BUCKET_ID))
+ .value(BLOB_ID, bindMarker(BLOB_ID))
+ .value(SIZE, bindMarker(SIZE))
+ .value(USER, bindMarker(USER))
+ .value(CONTENT_TYPE, bindMarker(CONTENT_TYPE))
+ .using(ttl((int) configuration.getUploadTtlDuration().getSeconds())));
+ this.selectOne = session.prepare(select().from(TABLE_NAME)
+ .where(eq(ID, bindMarker(ID))));
+ }
+
+ public Mono save(UploadRepresentation uploadRepresentation) {
+ return executor.executeVoid(insert.bind()
+ .setUUID(ID, uploadRepresentation.getId().getId())
+ .setString(BUCKET_ID, uploadRepresentation.getBucketName().asString())
+ .setString(BLOB_ID, uploadRepresentation.getBlobId().asString())
+ .setLong(SIZE, uploadRepresentation.getSize())
+ .setString(USER, uploadRepresentation.getUser().asString())
+ .setString(CONTENT_TYPE, uploadRepresentation.getContentType().asString()));
+ }
+
+ public Mono retrieve(UploadId id) {
+ return executor.executeSingleRow(selectOne.bind()
+ .setUUID(ID, id.getId()))
+ .map(row -> new UploadRepresentation(id,
+ BucketName.of(row.getString(BUCKET_ID)),
+ blobIdFactory.from(row.getString(BLOB_ID)),
+ ContentType.of(row.getString(CONTENT_TYPE)),
+ row.getLong(SIZE),
+ Username.of(row.getString(USER))));
+ }
+}
diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/UploadModule.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/UploadModule.java
new file mode 100644
index 00000000000..52a677b63fa
--- /dev/null
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/UploadModule.java
@@ -0,0 +1,59 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.jmap.cassandra.upload;
+
+import static com.datastax.driver.core.DataType.bigint;
+import static com.datastax.driver.core.DataType.text;
+import static com.datastax.driver.core.DataType.timeuuid;
+import static com.datastax.driver.core.schemabuilder.TableOptions.CompactionOptions.TimeWindowCompactionStrategyOptions.CompactionWindowUnit.DAYS;
+import static org.apache.james.backends.cassandra.utils.CassandraConstants.DEFAULT_CACHED_ROW_PER_PARTITION;
+
+import org.apache.james.backends.cassandra.components.CassandraModule;
+
+import com.datastax.driver.core.schemabuilder.SchemaBuilder;
+
+public interface UploadModule {
+
+ String TABLE_NAME = "uploads";
+
+ String ID = "id";
+ String CONTENT_TYPE = "content_type";
+ String SIZE = "size";
+ String BUCKET_ID = "bucket_id";
+ String BLOB_ID = "blob_id";
+ String USER = "user";
+
+ CassandraModule MODULE = CassandraModule.table(TABLE_NAME)
+ .comment("Holds JMAP uploads")
+ .options(options -> options
+ .compactionOptions(SchemaBuilder.timeWindowCompactionStrategy()
+ .compactionWindowSize(7)
+ .compactionWindowUnit(DAYS))
+ .caching(SchemaBuilder.KeyCaching.ALL, SchemaBuilder.rows(DEFAULT_CACHED_ROW_PER_PARTITION)))
+ .statement(statement -> statement
+ .addPartitionKey(ID, timeuuid())
+ .addColumn(CONTENT_TYPE, text())
+ .addColumn(SIZE, bigint())
+ .addColumn(BUCKET_ID, text())
+ .addColumn(BLOB_ID, text())
+ .addColumn(USER, text()))
+
+ .build();
+}
diff --git a/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepositoryTest.java b/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepositoryTest.java
new file mode 100644
index 00000000000..c4b71cddb61
--- /dev/null
+++ b/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepositoryTest.java
@@ -0,0 +1,80 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.jmap.cassandra.upload;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.time.Clock;
+import java.time.Duration;
+
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.blob.memory.MemoryBlobStoreDAO;
+import org.apache.james.core.Username;
+import org.apache.james.jmap.api.model.UploadId;
+import org.apache.james.jmap.api.model.UploadNotFoundException;
+import org.apache.james.jmap.api.upload.UploadRepository;
+import org.apache.james.jmap.api.upload.UploadRepositoryContract;
+import org.apache.james.mailbox.model.ContentType;
+import org.apache.james.server.blob.deduplication.DeDuplicationBlobStore;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import com.datastax.driver.core.utils.UUIDs;
+
+import reactor.core.publisher.Mono;
+
+class CassandraUploadRepositoryTest implements UploadRepositoryContract {
+ @RegisterExtension
+ static CassandraClusterExtension cassandra = new CassandraClusterExtension(UploadModule.MODULE);
+ private CassandraUploadRepository testee;
+
+ @BeforeEach
+ void setUp() {
+ testee = new CassandraUploadRepository(new UploadDAO(cassandra.getCassandraCluster().getConf(),
+ new HashBlobId.Factory(),
+ new UploadConfiguration(Duration.ofSeconds(5))),
+ new DeDuplicationBlobStore(new MemoryBlobStoreDAO(), BucketName.of("default"), new HashBlobId.Factory()),
+ new BucketNameGenerator(Clock.systemUTC()));
+ }
+
+ @Override
+ public UploadId randomUploadId() {
+ return UploadId.from(UUIDs.timeBased());
+ }
+
+ @Override
+ public UploadRepository testee() {
+ return testee;
+ }
+
+ @Test
+ void uploadShouldExpire() throws Exception {
+ Username bob = Username.of("bob");
+ UploadId id = Mono.from(testee.upload(data(), ContentType.of("text/plain"), bob)).block();
+
+ Thread.sleep(6000);
+
+ assertThatThrownBy(() -> Mono.from(testee.retrieve(id, bob)).blockOptional())
+ .isInstanceOf(UploadNotFoundException.class);
+ }
+}
\ No newline at end of file
diff --git a/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/upload/UploadRepositoryContract.scala b/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/upload/UploadRepositoryContract.scala
index 6757f90d56d..0eaa000f4ab 100644
--- a/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/upload/UploadRepositoryContract.scala
+++ b/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/upload/UploadRepositoryContract.scala
@@ -42,6 +42,8 @@
trait UploadRepositoryContract {
+ def randomUploadId(): UploadId = UploadId.from(UUID.randomUUID())
+
def testee: UploadRepository
def data(): InputStream = IOUtils.toInputStream(DATA_STRING, StandardCharsets.UTF_8)
@@ -87,7 +89,7 @@
@Test
def retrieveShouldThrowWhenUploadIdIsNotExist(): Unit = {
- assertThatThrownBy(() => SMono.fromPublisher(testee.retrieve(UploadId.from(UUID.randomUUID()), USER)).block())
+ assertThatThrownBy(() => SMono.fromPublisher(testee.retrieve(randomUploadId(), USER)).block())
.isInstanceOf(classOf[UploadNotFoundException])
}