Skip to content

Commit

Permalink
JAMES-3544 Implement a distributed UploadRepository
Browse files Browse the repository at this point in the history
  • Loading branch information
chibenwa authored and Arsnael committed Aug 6, 2021
1 parent f5fc8a9 commit a772474
Show file tree
Hide file tree
Showing 7 changed files with 447 additions and 1 deletion.
10 changes: 10 additions & 0 deletions server/data/data-jmap-cassandra/pom.xml
Expand Up @@ -54,6 +54,16 @@
<groupId>${james.groupId}</groupId>
<artifactId>apache-james-mailbox-cassandra</artifactId>
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>blob-memory</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>blob-storage-strategy</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>event-sourcing-event-store-cassandra</artifactId>
Expand Down
@@ -0,0 +1,68 @@
package org.apache.james.jmap.cassandra.upload;

import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST;

import java.io.InputStream;

import org.apache.james.blob.api.BlobStore;
import org.apache.james.blob.api.BucketName;
import org.apache.james.core.Username;
import org.apache.james.jmap.api.model.Upload;
import org.apache.james.jmap.api.model.UploadId;
import org.apache.james.jmap.api.model.UploadMetaData;
import org.apache.james.jmap.api.model.UploadNotFoundException;
import org.apache.james.jmap.api.upload.UploadRepository;
import org.apache.james.mailbox.model.ContentType;
import org.reactivestreams.Publisher;

import com.datastax.driver.core.utils.UUIDs;
import com.google.common.io.CountingInputStream;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class CassandraUploadRepository implements UploadRepository {
private final UploadDAO uploadDAO;
private final BlobStore blobStore;
private final BucketNameGenerator bucketNameGenerator;

public CassandraUploadRepository(UploadDAO uploadDAO, BlobStore blobStore, BucketNameGenerator bucketNameGenerator) {
this.uploadDAO = uploadDAO;
this.blobStore = blobStore;
this.bucketNameGenerator = bucketNameGenerator;
}

@Override
public Publisher<UploadId> upload(InputStream data, ContentType contentType, Username user) {
UploadId uploadId = generateId();
UploadBucketName uploadBucketName = bucketNameGenerator.current();
BucketName bucketName = uploadBucketName.asBucketName();

return Mono.fromCallable(() -> new CountingInputStream(data))
.flatMap(countingInputStream -> Mono.from(blobStore.save(bucketName, countingInputStream, LOW_COST))
.map(blobId -> new UploadDAO.UploadRepresentation(uploadId, bucketName, blobId, contentType, countingInputStream.getCount(), user))
.flatMap(upload -> uploadDAO.save(upload).thenReturn(upload.getId())));
}

@Override
public Publisher<Upload> retrieve(UploadId id, Username user) {
return uploadDAO.retrieve(id)
.filter(upload -> upload.getUser().equals(user))
.map(upload -> Upload.from(
UploadMetaData.from(id, upload.getContentType(), upload.getSize(), upload.getBlobId()),
() -> blobStore.read(upload.getBucketName(), upload.getBlobId(), LOW_COST)))
.switchIfEmpty(Mono.error(() -> new UploadNotFoundException(id)));
}

public Mono<Void> purge() {
return Flux.from(blobStore.listBuckets())
.<UploadBucketName>handle((bucketName, sink) -> UploadBucketName.ofBucket(bucketName).ifPresentOrElse(sink::next, sink::complete))
.filter(bucketNameGenerator.evictionPredicate())
.concatMap(bucket -> blobStore.deleteBucket(bucket.asBucketName()))
.then();
}

private UploadId generateId() {
return UploadId.from(UUIDs.timeBased());
}
}
@@ -0,0 +1,61 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/

package org.apache.james.jmap.cassandra.upload;

import java.time.Duration;

import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;

public class UploadConfiguration {
public static UploadConfiguration SINGLETON = new UploadConfiguration(Duration.ofDays(7));

private final Duration uploadTtlDuration;

public UploadConfiguration(Duration uploadTtlDuration) {
this.uploadTtlDuration = uploadTtlDuration;
}

public Duration getUploadTtlDuration() {
return uploadTtlDuration;
}

@Override
public boolean equals(Object obj) {
if (obj instanceof UploadConfiguration) {
UploadConfiguration other = (UploadConfiguration) obj;
return Objects.equal(uploadTtlDuration, other.uploadTtlDuration);
}
return false;
}

@Override
public int hashCode() {
return Objects.hashCode(uploadTtlDuration);
}

@Override
public String toString() {
return MoreObjects
.toStringHelper(this)
.add("uploadTtlDuration", uploadTtlDuration)
.toString();
}
}
@@ -0,0 +1,166 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/

package org.apache.james.jmap.cassandra.upload;

import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
import static com.datastax.driver.core.querybuilder.QueryBuilder.ttl;
import static org.apache.james.jmap.cassandra.upload.UploadModule.BLOB_ID;
import static org.apache.james.jmap.cassandra.upload.UploadModule.BUCKET_ID;
import static org.apache.james.jmap.cassandra.upload.UploadModule.CONTENT_TYPE;
import static org.apache.james.jmap.cassandra.upload.UploadModule.ID;
import static org.apache.james.jmap.cassandra.upload.UploadModule.SIZE;
import static org.apache.james.jmap.cassandra.upload.UploadModule.TABLE_NAME;
import static org.apache.james.jmap.cassandra.upload.UploadModule.USER;

import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BucketName;
import org.apache.james.core.Username;
import org.apache.james.jmap.api.model.UploadId;
import org.apache.james.mailbox.model.ContentType;

import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;

import reactor.core.publisher.Mono;

public class UploadDAO {
public static class UploadRepresentation {
private final UploadId id;
private final BucketName bucketName;
private final BlobId blobId;
private final ContentType contentType;
private final long size;
private final Username user;

public UploadRepresentation(UploadId id, BucketName bucketName, BlobId blobId, ContentType contentType, long size, Username user) {
this.user = user;
Preconditions.checkArgument(size >= 0, "Size must be strictly positive");
this.id = id;
this.bucketName = bucketName;
this.blobId = blobId;
this.contentType = contentType;
this.size = size;
}

public UploadId getId() {
return id;
}

public BucketName getBucketName() {
return bucketName;
}

public BlobId getBlobId() {
return blobId;
}

public ContentType getContentType() {
return contentType;
}

public long getSize() {
return size;
}

public Username getUser() {
return user;
}

@Override
public boolean equals(Object obj) {
if (obj instanceof UploadRepresentation) {
UploadRepresentation other = (UploadRepresentation) obj;
return Objects.equal(id, other.id)
&& Objects.equal(bucketName, other.bucketName)
&& Objects.equal(user, other.user)
&& Objects.equal(blobId, other.blobId)
&& Objects.equal(contentType, other.contentType)
&& Objects.equal(size, other.size);
}
return false;
}

@Override
public int hashCode() {
return Objects.hashCode(id, bucketName, blobId, contentType, size, user);
}

@Override
public String toString() {
return MoreObjects
.toStringHelper(this)
.add("id", id)
.add("bucketName", bucketName)
.add("blobId", blobId)
.add("contentType", contentType)
.add("user", user)
.add("size", size)
.toString();
}
}

private final CassandraAsyncExecutor executor;
private final BlobId.Factory blobIdFactory;
private final PreparedStatement insert;
private final PreparedStatement selectOne;

public UploadDAO(Session session, BlobId.Factory blobIdFactory, UploadConfiguration configuration) {
this.executor = new CassandraAsyncExecutor(session);
this.blobIdFactory = blobIdFactory;
this.insert = session.prepare(insertInto(TABLE_NAME)
.value(ID, bindMarker(ID))
.value(BUCKET_ID, bindMarker(BUCKET_ID))
.value(BLOB_ID, bindMarker(BLOB_ID))
.value(SIZE, bindMarker(SIZE))
.value(USER, bindMarker(USER))
.value(CONTENT_TYPE, bindMarker(CONTENT_TYPE))
.using(ttl((int) configuration.getUploadTtlDuration().getSeconds())));
this.selectOne = session.prepare(select().from(TABLE_NAME)
.where(eq(ID, bindMarker(ID))));
}

public Mono<Void> save(UploadRepresentation uploadRepresentation) {
return executor.executeVoid(insert.bind()
.setUUID(ID, uploadRepresentation.getId().getId())
.setString(BUCKET_ID, uploadRepresentation.getBucketName().asString())
.setString(BLOB_ID, uploadRepresentation.getBlobId().asString())
.setLong(SIZE, uploadRepresentation.getSize())
.setString(USER, uploadRepresentation.getUser().asString())
.setString(CONTENT_TYPE, uploadRepresentation.getContentType().asString()));
}

public Mono<UploadRepresentation> retrieve(UploadId id) {
return executor.executeSingleRow(selectOne.bind()
.setUUID(ID, id.getId()))
.map(row -> new UploadRepresentation(id,
BucketName.of(row.getString(BUCKET_ID)),
blobIdFactory.from(row.getString(BLOB_ID)),
ContentType.of(row.getString(CONTENT_TYPE)),
row.getLong(SIZE),
Username.of(row.getString(USER))));
}
}
@@ -0,0 +1,59 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/

package org.apache.james.jmap.cassandra.upload;

import static com.datastax.driver.core.DataType.bigint;
import static com.datastax.driver.core.DataType.text;
import static com.datastax.driver.core.DataType.timeuuid;
import static com.datastax.driver.core.schemabuilder.TableOptions.CompactionOptions.TimeWindowCompactionStrategyOptions.CompactionWindowUnit.DAYS;
import static org.apache.james.backends.cassandra.utils.CassandraConstants.DEFAULT_CACHED_ROW_PER_PARTITION;

import org.apache.james.backends.cassandra.components.CassandraModule;

import com.datastax.driver.core.schemabuilder.SchemaBuilder;

public interface UploadModule {

String TABLE_NAME = "uploads";

String ID = "id";
String CONTENT_TYPE = "content_type";
String SIZE = "size";
String BUCKET_ID = "bucket_id";
String BLOB_ID = "blob_id";
String USER = "user";

CassandraModule MODULE = CassandraModule.table(TABLE_NAME)
.comment("Holds JMAP uploads")
.options(options -> options
.compactionOptions(SchemaBuilder.timeWindowCompactionStrategy()
.compactionWindowSize(7)
.compactionWindowUnit(DAYS))
.caching(SchemaBuilder.KeyCaching.ALL, SchemaBuilder.rows(DEFAULT_CACHED_ROW_PER_PARTITION)))
.statement(statement -> statement
.addPartitionKey(ID, timeuuid())
.addColumn(CONTENT_TYPE, text())
.addColumn(SIZE, bigint())
.addColumn(BUCKET_ID, text())
.addColumn(BLOB_ID, text())
.addColumn(USER, text()))

.build();
}

0 comments on commit a772474

Please sign in to comment.