From a7724744df644fbd1f763be0673d75d190324e68 Mon Sep 17 00:00:00 2001 From: Benoit Tellier Date: Wed, 4 Aug 2021 17:27:23 +0700 Subject: [PATCH] JAMES-3544 Implement a distributed UploadRepository --- server/data/data-jmap-cassandra/pom.xml | 10 ++ .../upload/CassandraUploadRepository.java | 68 +++++++ .../cassandra/upload/UploadConfiguration.java | 61 +++++++ .../jmap/cassandra/upload/UploadDAO.java | 166 ++++++++++++++++++ .../jmap/cassandra/upload/UploadModule.java | 59 +++++++ .../upload/CassandraUploadRepositoryTest.java | 80 +++++++++ .../api/upload/UploadRepositoryContract.scala | 4 +- 7 files changed, 447 insertions(+), 1 deletion(-) create mode 100644 server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepository.java create mode 100644 server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/UploadConfiguration.java create mode 100644 server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/UploadDAO.java create mode 100644 server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/UploadModule.java create mode 100644 server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepositoryTest.java diff --git a/server/data/data-jmap-cassandra/pom.xml b/server/data/data-jmap-cassandra/pom.xml index 567591de2e5..ec5a89899d0 100644 --- a/server/data/data-jmap-cassandra/pom.xml +++ b/server/data/data-jmap-cassandra/pom.xml @@ -54,6 +54,16 @@ ${james.groupId} apache-james-mailbox-cassandra + + ${james.groupId} + blob-memory + test + + + ${james.groupId} + blob-storage-strategy + test + ${james.groupId} event-sourcing-event-store-cassandra diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepository.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepository.java new file mode 
100644
index 00000000000..d32eacef9ba
--- /dev/null
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepository.java
@@ -0,0 +1,108 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.cassandra.upload;
+
+import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST;
+
+import java.io.InputStream;
+
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.core.Username;
+import org.apache.james.jmap.api.model.Upload;
+import org.apache.james.jmap.api.model.UploadId;
+import org.apache.james.jmap.api.model.UploadMetaData;
+import org.apache.james.jmap.api.model.UploadNotFoundException;
+import org.apache.james.jmap.api.upload.UploadRepository;
+import org.apache.james.mailbox.model.ContentType;
+import org.reactivestreams.Publisher;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.google.common.io.CountingInputStream;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * {@link UploadRepository} that records upload metadata through {@link UploadDAO} and stores the
+ * uploaded content in a {@link BlobStore}, in the bucket returned by {@code BucketNameGenerator#current()}.
+ */
+public class CassandraUploadRepository implements UploadRepository {
+    private final UploadDAO uploadDAO;
+    private final BlobStore blobStore;
+    private final BucketNameGenerator bucketNameGenerator;
+
+    public CassandraUploadRepository(UploadDAO uploadDAO, BlobStore blobStore, BucketNameGenerator bucketNameGenerator) {
+        this.uploadDAO = uploadDAO;
+        this.blobStore = blobStore;
+        this.bucketNameGenerator = bucketNameGenerator;
+    }
+
+    /**
+     * Saves the content in the blob store, then records the metadata of the upload.
+     *
+     * @return a publisher emitting the id generated for this upload
+     */
+    @Override
+    public Publisher<UploadId> upload(InputStream data, ContentType contentType, Username user) {
+        UploadId uploadId = generateId();
+        UploadBucketName uploadBucketName = bucketNameGenerator.current();
+        BucketName bucketName = uploadBucketName.asBucketName();
+
+        return Mono.fromCallable(() -> new CountingInputStream(data))
+            .flatMap(countingInputStream -> Mono.from(blobStore.save(bucketName, countingInputStream, LOW_COST))
+                // getCount() is only evaluated once save has emitted the blob id
+                .map(blobId -> new UploadDAO.UploadRepresentation(uploadId, bucketName, blobId, contentType, countingInputStream.getCount(), user))
+                .flatMap(upload -> uploadDAO.save(upload).thenReturn(upload.getId())));
+    }
+
+    /**
+     * Reads back an upload.
+     *
+     * @return a publisher emitting the upload, or failing with {@link UploadNotFoundException} when the
+     *         DAO returns no upload for {@code id} or when the stored upload belongs to another user
+     */
+    @Override
+    public Publisher<Upload> retrieve(UploadId id, Username user) {
+        return uploadDAO.retrieve(id)
+            .filter(upload -> upload.getUser().equals(user))
+            .map(upload -> Upload.from(
+                UploadMetaData.from(id, upload.getContentType(), upload.getSize(), upload.getBlobId()),
+                () -> blobStore.read(upload.getBucketName(), upload.getBlobId(), LOW_COST)))
+            .switchIfEmpty(Mono.error(() -> new UploadNotFoundException(id)));
+    }
+
+    /**
+     * Deletes every upload bucket accepted by the eviction predicate of the bucket name generator.
+     */
+    public Mono<Void> purge() {
+        return Flux.from(blobStore.listBuckets())
+            // Buckets that are not upload buckets are skipped by emitting nothing: completing the sink
+            // here would end the whole sequence and leave the buckets listed afterwards unpurged.
+            .<UploadBucketName>handle((bucketName, sink) -> UploadBucketName.ofBucket(bucketName).ifPresent(sink::next))
+            .filter(bucketNameGenerator.evictionPredicate())
+            .concatMap(bucket -> blobStore.deleteBucket(bucket.asBucketName()))
+            .then();
+    }
+
+    private UploadId generateId() {
+        return UploadId.from(UUIDs.timeBased());
+    }
+}
diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/UploadConfiguration.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/UploadConfiguration.java
new file mode 100644
index 00000000000..675715dc6cf
--- /dev/null
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/UploadConfiguration.java
@@ -0,0 +1,61 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.
You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.cassandra.upload;
+
+import java.time.Duration;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+
+/**
+ * Settings of the Cassandra upload storage; currently only the time-to-live applied to stored uploads.
+ */
+public class UploadConfiguration {
+    // Default configuration: a time-to-live of 7 days. Final so that the shared instance cannot be reassigned.
+    public static final UploadConfiguration SINGLETON = new UploadConfiguration(Duration.ofDays(7));
+
+    private final Duration uploadTtlDuration;
+
+    public UploadConfiguration(Duration uploadTtlDuration) {
+        this.uploadTtlDuration = uploadTtlDuration;
+    }
+
+    public Duration getUploadTtlDuration() {
+        return uploadTtlDuration;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        return obj instanceof UploadConfiguration
+            && Objects.equal(uploadTtlDuration, ((UploadConfiguration) obj).uploadTtlDuration);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(uploadTtlDuration);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("uploadTtlDuration", uploadTtlDuration)
+            .toString();
+    }
+}
diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/UploadDAO.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/UploadDAO.java
new file mode 100644
index 00000000000..07dd8b405e0
--- /dev/null
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/UploadDAO.java
@@ -0,0 +1,166 @@
+/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.jmap.cassandra.upload; + +import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker; +import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; +import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; +import static com.datastax.driver.core.querybuilder.QueryBuilder.select; +import static com.datastax.driver.core.querybuilder.QueryBuilder.ttl; +import static org.apache.james.jmap.cassandra.upload.UploadModule.BLOB_ID; +import static org.apache.james.jmap.cassandra.upload.UploadModule.BUCKET_ID; +import static org.apache.james.jmap.cassandra.upload.UploadModule.CONTENT_TYPE; +import static org.apache.james.jmap.cassandra.upload.UploadModule.ID; +import static org.apache.james.jmap.cassandra.upload.UploadModule.SIZE; +import static org.apache.james.jmap.cassandra.upload.UploadModule.TABLE_NAME; +import static org.apache.james.jmap.cassandra.upload.UploadModule.USER; + +import 
org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.core.Username;
+import org.apache.james.jmap.api.model.UploadId;
+import org.apache.james.mailbox.model.ContentType;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+import reactor.core.publisher.Mono;
+
+/** Cassandra access object for uploads: inserts rows with the configured TTL and reads them back by id. */
+public class UploadDAO {
+    /** Immutable value describing one stored upload. */
+    public static class UploadRepresentation {
+        private final UploadId id;
+        private final BucketName bucketName;
+        private final BlobId blobId;
+        private final ContentType contentType;
+        private final long size;
+        private final Username user;
+
+        public UploadRepresentation(UploadId id, BucketName bucketName, BlobId blobId, ContentType contentType, long size, Username user) {
+            // Zero is a valid size, only negative values are rejected
+            Preconditions.checkArgument(size >= 0, "Size must not be negative");
+            this.id = id;
+            this.bucketName = bucketName;
+            this.blobId = blobId;
+            this.contentType = contentType;
+            this.size = size;
+            this.user = user;
+        }
+
+        public UploadId getId() {
+            return id;
+        }
+
+        public BucketName getBucketName() {
+            return bucketName;
+        }
+
+        public BlobId getBlobId() {
+            return blobId;
+        }
+
+        public ContentType getContentType() {
+            return contentType;
+        }
+
+        public long getSize() {
+            return size;
+        }
+
+        public Username getUser() {
+            return user;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj instanceof UploadRepresentation) {
+                UploadRepresentation other = (UploadRepresentation) obj;
+                return Objects.equal(id, other.id)
+                    && Objects.equal(bucketName, other.bucketName)
+                    && Objects.equal(user, other.user)
+                    && Objects.equal(blobId, other.blobId)
+                    && Objects.equal(contentType, other.contentType)
+                    && Objects.equal(size, other.size);
+            }
+            return false;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(id, bucketName, blobId, contentType, size, user);
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(this)
+                .add("id", id)
+                .add("bucketName", bucketName)
+                .add("blobId", blobId)
+                .add("contentType", contentType)
+                .add("user", user)
+                .add("size", size)
+                .toString();
+        }
+    }
+
+    private final CassandraAsyncExecutor executor;
+    private final BlobId.Factory blobIdFactory;
+    private final PreparedStatement insert;
+    private final PreparedStatement selectOne;
+
+    public UploadDAO(Session session, BlobId.Factory blobIdFactory, UploadConfiguration configuration) {
+        this.executor = new CassandraAsyncExecutor(session);
+        this.blobIdFactory = blobIdFactory;
+        this.insert = session.prepare(insertInto(TABLE_NAME)
+            .value(ID, bindMarker(ID))
+            .value(BUCKET_ID, bindMarker(BUCKET_ID))
+            .value(BLOB_ID, bindMarker(BLOB_ID))
+            .value(SIZE, bindMarker(SIZE))
+            .value(USER, bindMarker(USER))
+            .value(CONTENT_TYPE, bindMarker(CONTENT_TYPE))
+            .using(ttl(Math.toIntExact(configuration.getUploadTtlDuration().getSeconds())))); // throws on overflow instead of truncating
+        this.selectOne = session.prepare(select().from(TABLE_NAME).where(eq(ID, bindMarker(ID))));
+    }
+
+    public Mono<Void> save(UploadRepresentation uploadRepresentation) {
+        return executor.executeVoid(insert.bind()
+            .setUUID(ID, uploadRepresentation.getId().getId())
+            .setString(BUCKET_ID, uploadRepresentation.getBucketName().asString())
+            .setString(BLOB_ID, uploadRepresentation.getBlobId().asString())
+            .setLong(SIZE, uploadRepresentation.getSize())
+            .setString(USER, uploadRepresentation.getUser().asString())
+            .setString(CONTENT_TYPE, uploadRepresentation.getContentType().asString()));
+    }
+
+    public Mono<UploadRepresentation> retrieve(UploadId id) {
+        return executor.executeSingleRow(selectOne.bind().setUUID(ID, id.getId()))
+            .map(row -> new UploadRepresentation(id,
+                BucketName.of(row.getString(BUCKET_ID)),
+                blobIdFactory.from(row.getString(BLOB_ID)),
+                ContentType.of(row.getString(CONTENT_TYPE)),
+                row.getLong(SIZE),
+                Username.of(row.getString(USER))));
+    }
+}
diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/UploadModule.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/UploadModule.java
new file mode 100644
index 00000000000..52a677b63fa
--- /dev/null
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/UploadModule.java
@@ -0,0 +1,59 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.
*
+ ****************************************************************/
+
+package org.apache.james.jmap.cassandra.upload;
+
+import static com.datastax.driver.core.DataType.bigint;
+import static com.datastax.driver.core.DataType.text;
+import static com.datastax.driver.core.DataType.timeuuid;
+import static com.datastax.driver.core.schemabuilder.TableOptions.CompactionOptions.TimeWindowCompactionStrategyOptions.CompactionWindowUnit.DAYS;
+import static org.apache.james.backends.cassandra.utils.CassandraConstants.DEFAULT_CACHED_ROW_PER_PARTITION;
+
+import org.apache.james.backends.cassandra.components.CassandraModule;
+
+import com.datastax.driver.core.schemabuilder.SchemaBuilder;
+
+/** Names and Cassandra definition of the table holding JMAP uploads. */
+public interface UploadModule {
+    String TABLE_NAME = "uploads";
+
+    // Column names; ID is the partition key of the table
+    String ID = "id";
+    String CONTENT_TYPE = "content_type";
+    String SIZE = "size";
+    String BUCKET_ID = "bucket_id";
+    String BLOB_ID = "blob_id";
+    String USER = "user";
+
+    CassandraModule MODULE = CassandraModule.table(TABLE_NAME)
+        .comment("Holds JMAP uploads")
+        .options(tableOptions -> tableOptions
+            // Time window compaction, using windows of 7 days
+            .compactionOptions(SchemaBuilder.timeWindowCompactionStrategy()
+                .compactionWindowSize(7).compactionWindowUnit(DAYS))
+            .caching(SchemaBuilder.KeyCaching.ALL, SchemaBuilder.rows(DEFAULT_CACHED_ROW_PER_PARTITION)))
+        .statement(table -> table
+            .addPartitionKey(ID, timeuuid())
+            .addColumn(CONTENT_TYPE, text())
+            .addColumn(SIZE, bigint())
+            .addColumn(BUCKET_ID, text())
+            .addColumn(BLOB_ID, text())
+            .addColumn(USER, text()))
+        .build();
+}
diff --git a/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepositoryTest.java b/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepositoryTest.java
new file mode 100644
index 00000000000..c4b71cddb61
--- /dev/null
+++ b/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepositoryTest.java
@@ -0,0 +1,80 @@
+/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.jmap.cassandra.upload; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.time.Clock; +import java.time.Duration; + +import org.apache.james.backends.cassandra.CassandraClusterExtension; +import org.apache.james.blob.api.BucketName; +import org.apache.james.blob.api.HashBlobId; +import org.apache.james.blob.memory.MemoryBlobStoreDAO; +import org.apache.james.core.Username; +import org.apache.james.jmap.api.model.UploadId; +import org.apache.james.jmap.api.model.UploadNotFoundException; +import org.apache.james.jmap.api.upload.UploadRepository; +import org.apache.james.jmap.api.upload.UploadRepositoryContract; +import org.apache.james.mailbox.model.ContentType; +import org.apache.james.server.blob.deduplication.DeDuplicationBlobStore; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import com.datastax.driver.core.utils.UUIDs; + +import 
reactor.core.publisher.Mono;
+
+class CassandraUploadRepositoryTest implements UploadRepositoryContract {
+    @RegisterExtension
+    static CassandraClusterExtension cassandra = new CassandraClusterExtension(UploadModule.MODULE);
+    private CassandraUploadRepository testee;
+
+    @BeforeEach
+    void setUp() {
+        UploadDAO uploadDAO = new UploadDAO(cassandra.getCassandraCluster().getConf(), new HashBlobId.Factory(),
+            new UploadConfiguration(Duration.ofSeconds(5)));
+        DeDuplicationBlobStore blobStore = new DeDuplicationBlobStore(new MemoryBlobStoreDAO(), BucketName.of("default"),
+            new HashBlobId.Factory());
+        testee = new CassandraUploadRepository(uploadDAO, blobStore, new BucketNameGenerator(Clock.systemUTC()));
+    }
+
+    @Override
+    public UploadId randomUploadId() {
+        return UploadId.from(UUIDs.timeBased()); // time-based, like the ids generated by the repository
+    }
+
+    @Override
+    public UploadRepository testee() {
+        return testee;
+    }
+
+    @Test
+    void uploadShouldExpire() throws Exception {
+        Username owner = Username.of("bob");
+        UploadId uploadId = Mono.from(testee.upload(data(), ContentType.of("text/plain"), owner)).block();
+
+        // Sleep for longer than the 5 second TTL configured in setUp
+        Thread.sleep(6000);
+        assertThatThrownBy(() -> Mono.from(testee.retrieve(uploadId, owner)).blockOptional())
+            .isInstanceOf(UploadNotFoundException.class);
+    }
+}
\ No newline at end of file
diff --git a/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/upload/UploadRepositoryContract.scala b/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/upload/UploadRepositoryContract.scala index 6757f90d56d..0eaa000f4ab 100644 --- a/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/upload/UploadRepositoryContract.scala +++ b/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/upload/UploadRepositoryContract.scala @@ -42,6 +42,8 @@ trait UploadRepositoryContract { + def randomUploadId(): UploadId = UploadId.from(UUID.randomUUID()) + def testee: UploadRepository def data(): InputStream = IOUtils.toInputStream(DATA_STRING, StandardCharsets.UTF_8) @@ -87,7 +89,7 @@ @Test def retrieveShouldThrowWhenUploadIdIsNotExist(): Unit = { - assertThatThrownBy(() => 
SMono.fromPublisher(testee.retrieve(UploadId.from(UUID.randomUUID()), USER)).block()) + assertThatThrownBy(() => SMono.fromPublisher(testee.retrieve(randomUploadId(), USER)).block()) .isInstanceOf(classOf[UploadNotFoundException]) }