Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JAMES-3150 BlobReferenceSource and its implementations #601

Merged
merged 5 commits into from Aug 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,38 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/

package org.apache.james.mailbox.cassandra.mail;

import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BlobReferenceSource;

import reactor.core.publisher.Flux;

public class AttachmentBlobReferenceSource implements BlobReferenceSource {
private final CassandraAttachmentDAOV2 attachmentDAOV2;

public AttachmentBlobReferenceSource(CassandraAttachmentDAOV2 attachmentDAOV2) {
this.attachmentDAOV2 = attachmentDAOV2;
}

@Override
public Flux<BlobId> listReferencedBlobs() {
return attachmentDAOV2.listBlobs();
}
}
Expand Up @@ -49,6 +49,7 @@
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.google.common.base.Preconditions;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class CassandraAttachmentDAOV2 {
Expand Down Expand Up @@ -129,6 +130,7 @@ private static DAOAttachment fromRow(Row row, BlobId.Factory blobIfFactory) {
private final PreparedStatement insertStatement;
private final PreparedStatement deleteStatement;
private final PreparedStatement selectStatement;
private final PreparedStatement listBlobs;
private final ConsistencyLevel consistencyLevel;

@Inject
Expand All @@ -141,6 +143,12 @@ public CassandraAttachmentDAOV2(BlobId.Factory blobIdFactory, Session session,
this.selectStatement = prepareSelect(session);
this.insertStatement = prepareInsert(session);
this.deleteStatement = prepareDelete(session);
this.listBlobs = prepareSelectBlobs(session);
}

private PreparedStatement prepareSelectBlobs(Session session) {
return session.prepare(select(BLOB_ID)
.from(TABLE_NAME));
}

private PreparedStatement prepareDelete(Session session) {
Expand Down Expand Up @@ -189,4 +197,9 @@ public Mono<Void> delete(AttachmentId attachmentId) {
deleteStatement.bind()
.setUUID(ID_AS_UUID, attachmentId.asUUID()));
}

public Flux<BlobId> listBlobs() {
return cassandraAsyncExecutor.executeRows(listBlobs.bind())
.map(row -> blobIdFactory.from(row.getString(BLOB_ID)));
}
}
Expand Up @@ -30,12 +30,14 @@
import static org.apache.james.mailbox.cassandra.table.CassandraMessageIds.MESSAGE_ID;
import static org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table.ATTACHMENTS;
import static org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table.BODY_CONTENT;
import static org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table.BODY_CONTENT_LOWERCASE;
import static org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table.BODY_OCTECTS;
import static org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table.BODY_START_OCTET;
import static org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table.BODY_START_OCTET_LOWERCASE;
import static org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table.FULL_CONTENT_OCTETS;
import static org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table.FULL_CONTENT_OCTETS_LOWERCASE;
import static org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table.HEADER_CONTENT;
import static org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table.HEADER_CONTENT_LOWERCASE;
import static org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table.INTERNAL_DATE;
import static org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table.INTERNAL_DATE_LOWERCASE;
import static org.apache.james.mailbox.cassandra.table.CassandraMessageV3Table.Properties.CONTENT_DESCRIPTION;
Expand Down Expand Up @@ -97,6 +99,7 @@
import com.google.common.primitives.Bytes;
import com.google.common.reflect.TypeToken;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

Expand All @@ -116,6 +119,7 @@ public class CassandraMessageDAOV3 {
private final PreparedStatement insert;
private final PreparedStatement delete;
private final PreparedStatement select;
private final PreparedStatement listBlobs;
private final Cid.CidParser cidParser;
private final ConsistencyLevel consistencyLevel;

Expand All @@ -132,6 +136,7 @@ public CassandraMessageDAOV3(Session session, CassandraTypesProvider typesProvid
this.insert = prepareInsert(session);
this.delete = prepareDelete(session);
this.select = prepareSelect(session);
this.listBlobs = prepareSelectBlobs(session);
this.cidParser = Cid.parser().relaxed();
}

Expand All @@ -141,6 +146,11 @@ private PreparedStatement prepareSelect(Session session) {
.where(eq(MESSAGE_ID, bindMarker(MESSAGE_ID))));
}

private PreparedStatement prepareSelectBlobs(Session session) {
return session.prepare(select(HEADER_CONTENT, BODY_CONTENT)
.from(TABLE_NAME));
}

private PreparedStatement prepareInsert(Session session) {
return session.prepare(update(TABLE_NAME)
.with(set(INTERNAL_DATE, bindMarker(INTERNAL_DATE)))
Expand Down Expand Up @@ -408,4 +418,11 @@ private Mono<byte[]> getContent(BlobId blobId, BlobStore.StoragePolicy storagePo
private BlobId retrieveBlobId(String field, Row row) {
return blobIdFactory.from(row.getString(field));
}

Flux<BlobId> listBlobs() {
return cassandraAsyncExecutor.executeRows(listBlobs.bind())
.flatMapIterable(row -> ImmutableList.of(
blobIdFactory.from(row.getString(HEADER_CONTENT_LOWERCASE)),
blobIdFactory.from(row.getString(BODY_CONTENT_LOWERCASE))));
}
}
@@ -0,0 +1,38 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/

package org.apache.james.mailbox.cassandra.mail;

import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BlobReferenceSource;

import reactor.core.publisher.Flux;

public class MessageBlobReferenceSource implements BlobReferenceSource {
private final CassandraMessageDAOV3 daov3;

public MessageBlobReferenceSource(CassandraMessageDAOV3 daov3) {
this.daov3 = daov3;
}

@Override
public Flux<BlobId> listReferencedBlobs() {
return daov3.listBlobs();
}
}
Expand Up @@ -34,7 +34,7 @@ public interface CassandraMessageV3Table {
String TEXTUAL_LINE_COUNT = "textualLineCount";
String TEXTUAL_LINE_COUNT_LOWERCASE = TEXTUAL_LINE_COUNT.toLowerCase(Locale.US);
String BODY_CONTENT = "bodyContent";
String BODY_CONTENT_LOWERCASE = BODY_START_OCTET.toLowerCase(Locale.US);
String BODY_CONTENT_LOWERCASE = BODY_CONTENT.toLowerCase(Locale.US);
String HEADER_CONTENT = "headerContent";
String HEADER_CONTENT_LOWERCASE = HEADER_CONTENT.toLowerCase(Locale.US);
String ATTACHMENTS = "attachments";
Expand Down
Expand Up @@ -27,6 +27,7 @@
import org.apache.james.backends.cassandra.CassandraCluster;
import org.apache.james.backends.cassandra.CassandraClusterExtension;
import org.apache.james.blob.api.BlobId;
import org.apache.james.blob.api.BlobReferenceSource;
import org.apache.james.blob.api.HashBlobId;
import org.apache.james.mailbox.cassandra.mail.CassandraAttachmentDAOV2.DAOAttachment;
import org.apache.james.mailbox.cassandra.modules.CassandraAttachmentModule;
Expand All @@ -38,19 +39,22 @@

class CassandraAttachmentDAOV2Test {
private static final AttachmentId ATTACHMENT_ID = AttachmentId.from("id1");
private static final AttachmentId ATTACHMENT_ID_2 = AttachmentId.from("id2");
private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();

@RegisterExtension
static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraAttachmentModule.MODULE);

private CassandraAttachmentDAOV2 testee;
private AttachmentBlobReferenceSource blobReferenceSource;

@BeforeEach
void setUp(CassandraCluster cassandra) {
testee = new CassandraAttachmentDAOV2(
BLOB_ID_FACTORY,
cassandra.getConf(),
cassandraCluster.getCassandraConsistenciesConfiguration());
blobReferenceSource = new AttachmentBlobReferenceSource(testee);
}

@Test
Expand Down Expand Up @@ -99,4 +103,58 @@ void getAttachmentShouldNotReturnDeletedAttachments() {

assertThat(actual).isEmpty();
}

@Test
void blobReferencesShouldBeEmptyByDefault() {
assertThat(blobReferenceSource.listReferencedBlobs().collectList().block())
.isEmpty();
}

@Test
void blobReferencesShouldReturnAllValues() {
AttachmentMetadata attachment1 = AttachmentMetadata.builder()
.attachmentId(ATTACHMENT_ID)
.type("application/json")
.size(36)
.build();
BlobId blobId1 = BLOB_ID_FACTORY.from("blobId");
DAOAttachment daoAttachment1 = CassandraAttachmentDAOV2.from(attachment1, blobId1);
testee.storeAttachment(daoAttachment1).block();

AttachmentMetadata attachment2 = AttachmentMetadata.builder()
.attachmentId(ATTACHMENT_ID_2)
.type("application/json")
.size(36)
.build();
BlobId blobId2 = BLOB_ID_FACTORY.from("blobId");
DAOAttachment daoAttachment2 = CassandraAttachmentDAOV2.from(attachment2, blobId2);
testee.storeAttachment(daoAttachment2).block();

assertThat(blobReferenceSource.listReferencedBlobs().collectList().block())
.containsOnly(blobId1, blobId2);
}

@Test
void blobReferencesShouldReturnDuplicates() {
AttachmentMetadata attachment1 = AttachmentMetadata.builder()
.attachmentId(ATTACHMENT_ID)
.type("application/json")
.size(36)
.build();
BlobId blobId = BLOB_ID_FACTORY.from("blobId");
DAOAttachment daoAttachment1 = CassandraAttachmentDAOV2.from(attachment1, blobId);
testee.storeAttachment(daoAttachment1).block();

AttachmentMetadata attachment2 = AttachmentMetadata.builder()
.attachmentId(ATTACHMENT_ID_2)
.type("application/json")
.size(36)
.build();
DAOAttachment daoAttachment2 = CassandraAttachmentDAOV2.from(attachment2, blobId);
testee.storeAttachment(daoAttachment2).block();

assertThat(blobReferenceSource.listReferencedBlobs().collectList().block())
.hasSize(2)
.containsOnly(blobId);
}
}
Expand Up @@ -49,6 +49,7 @@
import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mailbox.model.ThreadId;
import org.apache.james.mailbox.store.mail.MessageMapper;
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
import org.apache.james.metrics.tests.RecordingMetricFactory;
Expand All @@ -65,6 +66,7 @@ class CassandraMessageDAOV3Test {
private static final int BODY_START = 16;
private static final CassandraId MAILBOX_ID = CassandraId.timeBased();
private static final String CONTENT = "Subject: Test7 \n\nBody7\n.\n";
private static final String CONTENT_2 = "Subject: Test3 \n\nBody23\n.\n";
private static final MessageUid messageUid = MessageUid.of(1);
private static final List<MessageAttachmentMetadata> NO_ATTACHMENT = ImmutableList.of();

Expand All @@ -81,13 +83,16 @@ class CassandraMessageDAOV3Test {

private SimpleMailboxMessage message;
private CassandraMessageId messageId;
private CassandraMessageId messageId2;
private ThreadId threadId;
private ComposedMessageIdWithMetaData messageIdWithMetadata;
private MessageBlobReferenceSource blobReferenceSource;

@BeforeEach
void setUp(CassandraCluster cassandra) {
CassandraMessageId.Factory messageIdFactory = new CassandraMessageId.Factory();
messageId = messageIdFactory.generate();
messageId2 = messageIdFactory.generate();
threadId = ThreadId.fromBaseMessageId(messageId);
BlobStore blobStore = CassandraBlobStoreFactory.forTesting(cassandra.getConf(), new RecordingMetricFactory())
.passthrough();
Expand All @@ -105,6 +110,7 @@ void setUp(CassandraCluster cassandra) {
.modSeq(ModSeq.of(1))
.threadId(threadId)
.build();
blobReferenceSource = new MessageBlobReferenceSource(testee);
}

@Test
Expand Down Expand Up @@ -177,6 +183,23 @@ void saveShouldStoreMessageWithHeaderContent() throws Exception {
.isEqualTo(CONTENT.substring(0, BODY_START));
}

@Test
void blobReferencesShouldBeEmptyByDefault() {
assertThat(blobReferenceSource.listReferencedBlobs().collectList().block())
.isEmpty();
}

@Test
void blobReferencesShouldReturnAllBlobs() throws Exception {
message = createMessage(messageId, threadId, CONTENT, BODY_START, new PropertyBuilder(), NO_ATTACHMENT);
MailboxMessage message2 = createMessage(messageId2, threadId, CONTENT_2, BODY_START, new PropertyBuilder(), NO_ATTACHMENT);
testee.save(message).block();
testee.save(message2).block();

assertThat(blobReferenceSource.listReferencedBlobs().collectList().block())
.hasSize(4);
}

private SimpleMailboxMessage createMessage(MessageId messageId, ThreadId threadId, String content, int bodyStart, PropertyBuilder propertyBuilder, Collection<MessageAttachmentMetadata> attachments) {
return SimpleMailboxMessage.builder()
.messageId(messageId)
Expand Down
@@ -0,0 +1,27 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/

package org.apache.james.blob.api;

import org.reactivestreams.Publisher;

@FunctionalInterface
public interface BlobReferenceSource {
Publisher<BlobId> listReferencedBlobs();
}