Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JAMES-2586 Fix PostgresEmailGetMethodTest, PostgresEmailQueryMethodTest, PostgresMailboxSetMethodTest #2108

Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.apache.james.mailbox.postgres.user.PostgresSubscriptionMapper;
import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
import org.apache.james.mailbox.store.mail.AnnotationMapper;
import org.apache.james.mailbox.store.mail.AttachmentMapper;
import org.apache.james.mailbox.store.mail.AttachmentMapperFactory;
import org.apache.james.mailbox.store.mail.MailboxMapper;
import org.apache.james.mailbox.store.mail.MessageIdMapper;
Expand Down Expand Up @@ -92,6 +91,7 @@ public MessageIdMapper createMessageIdMapper(MailboxSession session) {
new PostgresMessageDAO(executorFactory.create(session.getUser().getDomainPart()), blobIdFactory),
new PostgresMailboxMessageDAO(executorFactory.create(session.getUser().getDomainPart())),
getModSeqProvider(session),
getAttachmentMapper(session),
blobStore,
blobIdFactory,
clock);
Expand All @@ -118,13 +118,13 @@ public PostgresModSeqProvider getModSeqProvider(MailboxSession session) {
}

@Override
public AttachmentMapper createAttachmentMapper(MailboxSession session) {
public PostgresAttachmentMapper createAttachmentMapper(MailboxSession session) {
PostgresAttachmentDAO postgresAttachmentDAO = new PostgresAttachmentDAO(executorFactory.create(session.getUser().getDomainPart()), blobIdFactory);
return new PostgresAttachmentMapper(postgresAttachmentDAO, blobStore);
}

@Override
public AttachmentMapper getAttachmentMapper(MailboxSession session) {
public PostgresAttachmentMapper getAttachmentMapper(MailboxSession session) {
return createAttachmentMapper(session);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,21 @@

import static org.apache.james.mailbox.postgres.mail.PostgresMessageModule.MessageTable.ATTACHMENT_METADATA;

import java.util.List;
import java.util.Map;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.mailbox.model.AttachmentId;
import org.apache.james.mailbox.model.AttachmentMetadata;
import org.apache.james.mailbox.model.MessageAttachmentMetadata;
import org.apache.james.mailbox.postgres.mail.dto.AttachmentsDTO;
import org.apache.james.mailbox.store.mail.MessageMapper;
import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
import org.apache.james.util.ReactorUtils;
import org.jooq.Record;

import com.google.common.collect.ImmutableMap;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand All @@ -45,10 +52,8 @@ public Flux<Pair<SimpleMailboxMessage.Builder, Record>> addAttachmentToMessage(F
return findMessagePublisher.flatMap(pair -> {
if (fetchType == MessageMapper.FetchType.FULL || fetchType == MessageMapper.FetchType.ATTACHMENTS_METADATA) {
return Mono.fromCallable(() -> pair.getRight().get(ATTACHMENT_METADATA))
.flatMapMany(Flux::fromIterable)
.flatMapSequential(attachmentRepresentation -> attachmentMapper.getAttachmentReactive(attachmentRepresentation.getAttachmentId())
.map(attachment -> constructMessageAttachment(attachment, attachmentRepresentation)))
.collectList()
.map(e -> toMap((AttachmentsDTO) e))
.flatMap(this::getAttachments)
.map(messageAttachmentMetadata -> {
pair.getLeft().addAttachments(messageAttachmentMetadata);
return pair;
Expand All @@ -59,6 +64,17 @@ public Flux<Pair<SimpleMailboxMessage.Builder, Record>> addAttachmentToMessage(F
}, ReactorUtils.DEFAULT_CONCURRENCY);
}

private Map<AttachmentId, MessageRepresentation.AttachmentRepresentation> toMap(AttachmentsDTO attachmentRepresentations) {
return attachmentRepresentations.stream().collect(ImmutableMap.toImmutableMap(MessageRepresentation.AttachmentRepresentation::getAttachmentId, obj -> obj));
}

private Mono<List<MessageAttachmentMetadata>> getAttachments(Map<AttachmentId, MessageRepresentation.AttachmentRepresentation> mapAttachmentIdToAttachmentRepresentation) {
return Mono.fromCallable(mapAttachmentIdToAttachmentRepresentation::keySet)
.flatMapMany(attachmentMapper::getAttachmentsReactive)
.map(attachmentMetadata -> constructMessageAttachment(attachmentMetadata, mapAttachmentIdToAttachmentRepresentation.get(attachmentMetadata.getAttachmentId())))
.collectList();
}

private MessageAttachmentMetadata constructMessageAttachment(AttachmentMetadata attachment, MessageRepresentation.AttachmentRepresentation messageAttachmentRepresentation) {
return MessageAttachmentMetadata.builder()
.attachment(attachment)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.james.mailbox.postgres.mail;

import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST;
import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;

import java.io.InputStream;
import java.util.Collection;
Expand All @@ -39,7 +38,6 @@

import com.github.fge.lambdas.Throwing;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -84,13 +82,15 @@ public Mono<AttachmentMetadata> getAttachmentReactive(AttachmentId attachmentId)
.switchIfEmpty(Mono.error(() -> new AttachmentNotFoundException(attachmentId.getId())));
}

public Flux<AttachmentMetadata> getAttachmentsReactive(Collection<AttachmentId> attachmentIds) {
Preconditions.checkArgument(attachmentIds != null);
return postgresAttachmentDAO.getAttachments(attachmentIds);
}

@Override
public List<AttachmentMetadata> getAttachments(Collection<AttachmentId> attachmentIds) {
Preconditions.checkArgument(attachmentIds != null);
return Flux.fromIterable(attachmentIds)
.flatMap(id -> postgresAttachmentDAO.getAttachment(id)
.map(Pair::getLeft), DEFAULT_CONCURRENCY)
.collect(ImmutableList.toImmutableList())
return getAttachmentsReactive(attachmentIds)
.collectList()
.block();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public interface PostgresAttachmentModule {
interface PostgresAttachmentTable {

Table<Record> TABLE_NAME = DSL.table("attachment");
Field<UUID> ID = DSL.field("id", SQLDataType.UUID.notNull());
Field<String> ID = DSL.field("id", SQLDataType.VARCHAR.notNull());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:-(

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about being clever and reworking AttachmentId?

We could have:

  • An interface with a Factory for AttachmentId
  • Current AttachmentId class be renamed as LegacyAttachmentId and used by Cassandra app, JPA, memory
  • Have a new UuidBackedAttachmentId used in the Postgres app ?

Copy link
Contributor Author

@hungphan227 hungphan227 Mar 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this issue to another ticket linagora#5127 as an enhancement
This PR stay for too long and should be closed soon

Field<String> BLOB_ID = DSL.field("blob_id", SQLDataType.VARCHAR);
Field<String> TYPE = DSL.field("type", SQLDataType.VARCHAR);
Field<UUID> MESSAGE_ID = DSL.field("message_id", SQLDataType.UUID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.io.InputStream;
import java.time.Clock;
import java.util.Collection;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.function.Function;
Expand Down Expand Up @@ -102,10 +103,15 @@ public long size() {
private final BlobStore blobStore;
private final BlobId.Factory blobIdFactory;
private final Clock clock;
private final AttachmentLoader attachmentLoader;

public PostgresMessageIdMapper(PostgresMailboxDAO mailboxDAO, PostgresMessageDAO messageDAO,
PostgresMailboxMessageDAO mailboxMessageDAO, PostgresModSeqProvider modSeqProvider,
BlobStore blobStore, BlobId.Factory blobIdFactory,
public PostgresMessageIdMapper(PostgresMailboxDAO mailboxDAO,
PostgresMessageDAO messageDAO,
PostgresMailboxMessageDAO mailboxMessageDAO,
PostgresModSeqProvider modSeqProvider,
PostgresAttachmentMapper attachmentMapper,
BlobStore blobStore,
BlobId.Factory blobIdFactory,
Clock clock) {
this.mailboxDAO = mailboxDAO;
this.messageDAO = messageDAO;
Expand All @@ -114,6 +120,7 @@ public PostgresMessageIdMapper(PostgresMailboxDAO mailboxDAO, PostgresMessageDAO
this.blobStore = blobStore;
this.blobIdFactory = blobIdFactory;
this.clock = clock;
this.attachmentLoader = new AttachmentLoader(attachmentMapper);;
}

@Override
Expand All @@ -130,16 +137,23 @@ public Publisher<ComposedMessageIdWithMetaData> findMetadata(MessageId messageId

@Override
public Flux<MailboxMessage> findReactive(Collection<MessageId> messageIds, MessageMapper.FetchType fetchType) {
return mailboxMessageDAO.findMessagesByMessageIds(messageIds.stream().map(PostgresMessageId.class::cast).collect(ImmutableList.toImmutableList()),
fetchType)
.flatMap(messageBuilderAndRecord -> {
SimpleMailboxMessage.Builder messageBuilder = messageBuilderAndRecord.getLeft();
if (fetchType == MessageMapper.FetchType.FULL) {
Flux<Pair<SimpleMailboxMessage.Builder, Record>> fetchMessageWithoutFullContentPublisher = mailboxMessageDAO.findMessagesByMessageIds(messageIds.stream().map(PostgresMessageId.class::cast).collect(ImmutableList.toImmutableList()), fetchType);
Flux<Pair<SimpleMailboxMessage.Builder, Record>> fetchMessagePublisher = attachmentLoader.addAttachmentToMessage(fetchMessageWithoutFullContentPublisher, fetchType);

if (fetchType == MessageMapper.FetchType.FULL) {
return fetchMessagePublisher
.flatMap(messageBuilderAndRecord -> {
SimpleMailboxMessage.Builder messageBuilder = messageBuilderAndRecord.getLeft();
return retrieveFullContent(messageBuilderAndRecord.getRight())
.map(headerAndBodyContent -> messageBuilder.content(headerAndBodyContent).build());
}
return Mono.just(messageBuilder.build());
}, ReactorUtils.DEFAULT_CONCURRENCY);
}, ReactorUtils.DEFAULT_CONCURRENCY)
.sort(Comparator.comparing(MailboxMessage::getUid))
.map(message -> message);
} else {
return fetchMessagePublisher
.map(messageBuilderAndBlobId -> messageBuilderAndBlobId.getLeft()
.build());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.james.mailbox.postgres.mail.dao;

import java.util.Collection;
import java.util.Optional;

import javax.inject.Inject;
Expand All @@ -33,6 +34,8 @@
import org.apache.james.mailbox.postgres.PostgresMessageId;
import org.apache.james.mailbox.postgres.mail.PostgresAttachmentModule.PostgresAttachmentTable;

import com.google.common.collect.ImmutableList;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -69,7 +72,7 @@ public Mono<Pair<AttachmentMetadata, BlobId>> getAttachment(AttachmentId attachm
PostgresAttachmentTable.MESSAGE_ID,
PostgresAttachmentTable.SIZE)
.from(PostgresAttachmentTable.TABLE_NAME)
.where(PostgresAttachmentTable.ID.eq(attachmentId.asUUID()))))
.where(PostgresAttachmentTable.ID.eq(attachmentId.getId()))))
.map(row -> Pair.of(
AttachmentMetadata.builder()
.attachmentId(attachmentId)
Expand All @@ -80,9 +83,23 @@ public Mono<Pair<AttachmentMetadata, BlobId>> getAttachment(AttachmentId attachm
blobIdFactory.from(row.get(PostgresAttachmentTable.BLOB_ID))));
}

public Flux<AttachmentMetadata> getAttachments(Collection<AttachmentId> attachmentIds) {
if (attachmentIds.isEmpty()) {
return Flux.empty();
}
return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.selectFrom(PostgresAttachmentTable.TABLE_NAME)
.where(PostgresAttachmentTable.ID.in(attachmentIds.stream().map(AttachmentId::getId).collect(ImmutableList.toImmutableList())))))
.map(row -> AttachmentMetadata.builder()
.attachmentId(AttachmentId.from(row.get(PostgresAttachmentTable.ID)))
.type(row.get(PostgresAttachmentTable.TYPE))
.messageId(PostgresMessageId.Factory.of(row.get(PostgresAttachmentTable.MESSAGE_ID)))
.size(row.get(PostgresAttachmentTable.SIZE))
.build());
}

public Mono<Void> storeAttachment(AttachmentMetadata attachment, BlobId blobId) {
return postgresExecutor.executeVoid(dslContext -> Mono.from(dslContext.insertInto(PostgresAttachmentTable.TABLE_NAME)
.set(PostgresAttachmentTable.ID, attachment.getAttachmentId().asUUID())
.set(PostgresAttachmentTable.ID, attachment.getAttachmentId().getId())
.set(PostgresAttachmentTable.BLOB_ID, blobId.asString())
.set(PostgresAttachmentTable.TYPE, attachment.getType().asString())
.set(PostgresAttachmentTable.MESSAGE_ID, ((PostgresMessageId) attachment.getMessageId()).asUuid())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.apache.james.mailbox.store.StoreMailboxAnnotationManager;
import org.apache.james.mailbox.store.StoreRightManager;
import org.apache.james.mailbox.store.extractor.DefaultTextExtractor;
import org.apache.james.mailbox.store.mail.NaiveThreadIdGuessingAlgorithm;
import org.apache.james.mailbox.store.mail.model.impl.MessageParser;
import org.apache.james.mailbox.store.quota.QuotaComponents;
import org.apache.james.mailbox.store.search.MessageSearchIndex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.james.mailbox.store.StoreMailboxAnnotationManager;
import org.apache.james.mailbox.store.StoreRightManager;
import org.apache.james.mailbox.store.extractor.DefaultTextExtractor;
import org.apache.james.mailbox.store.mail.NaiveThreadIdGuessingAlgorithm;
import org.apache.james.mailbox.store.mail.model.impl.MessageParser;
import org.apache.james.mailbox.store.quota.QuotaComponents;
import org.apache.james.mailbox.store.search.MessageSearchIndex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ void beforeAll() throws Exception {
SessionProviderImpl sessionProvider = new SessionProviderImpl(null, null);
QuotaComponents quotaComponents = QuotaComponents.disabled(sessionProvider, mapperFactory);

MessageIdManager messageIdManager = new StoreMessageIdManager(storeRightManager, mapperFactory
, eventBus, new NoQuotaManager(), mock(QuotaRootResolver.class), PreDeletionHooks.NO_PRE_DELETION_HOOK);
MessageIdManager messageIdManager = new StoreMessageIdManager(storeRightManager, mapperFactory,
eventBus, new NoQuotaManager(), mock(QuotaRootResolver.class), PreDeletionHooks.NO_PRE_DELETION_HOOK);

StoreAttachmentManager storeAttachmentManager = new StoreAttachmentManager(mapperFactory, messageIdManager);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.james.mailbox.model.MessageId;
import org.apache.james.mailbox.postgres.PostgresMailboxId;
import org.apache.james.mailbox.postgres.PostgresMessageId;
import org.apache.james.mailbox.postgres.mail.dao.PostgresAttachmentDAO;
import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxDAO;
import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxMessageDAO;
import org.apache.james.mailbox.postgres.mail.dao.PostgresMessageDAO;
Expand All @@ -46,12 +47,9 @@
import org.apache.james.mailbox.store.mail.MessageMapper;
import org.apache.james.mailbox.store.mail.UidProvider;
import org.apache.james.mailbox.store.mail.model.MapperProvider;
import org.apache.james.mailbox.store.mail.model.MessageUidProvider;
import org.apache.james.server.blob.deduplication.DeDuplicationBlobStore;
import org.apache.james.utils.UpdatableTickingClock;
import org.testcontainers.utility.ThrowingFunction;

import com.github.fge.lambdas.Throwing;
import com.google.common.collect.ImmutableList;

public class PostgresMapperProvider implements MapperProvider {
Expand Down Expand Up @@ -106,7 +104,10 @@ public MessageIdMapper createMessageIdMapper() {
new PostgresMessageDAO(postgresExtension.getPostgresExecutor(), blobIdFactory),
new PostgresMailboxMessageDAO(postgresExtension.getPostgresExecutor()),
new PostgresModSeqProvider(mailboxDAO),
blobStore, blobIdFactory, updatableTickingClock);
new PostgresAttachmentMapper(new PostgresAttachmentDAO(postgresExtension.getPostgresExecutor(), blobIdFactory), blobStore),
blobStore,
blobIdFactory,
updatableTickingClock);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ void blobReferencesShouldReturnAllBlobs() {
SimpleMailboxMessage message = createMessage(messageId1, ThreadId.fromBaseMessageId(messageId1), CONTENT, BODY_START, new PropertyBuilder());
MessageId messageId2 = PostgresMessageId.Factory.of(UUID.randomUUID());
MailboxMessage message2 = createMessage(messageId2, ThreadId.fromBaseMessageId(messageId2), CONTENT_2, BODY_START, new PropertyBuilder());
postgresMessageDAO.insert(message, "1") .block();
postgresMessageDAO.insert(message2, "2") .block();
postgresMessageDAO.insert(message, "1").block();
postgresMessageDAO.insert(message2, "2").block();

assertThat(blobReferenceSource.listReferencedBlobs().collectList().block())
.hasSize(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class PostgresMessageMapperTest extends MessageMapperTest {
static PostgresExtension postgresExtension = PostgresExtension.withoutRowLevelSecurity(PostgresMailboxAggregateModule.MODULE);

private PostgresMapperProvider postgresMapperProvider;

@Override
protected MapperProvider createMapperProvider() {
postgresMapperProvider = new PostgresMapperProvider(postgresExtension);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import static org.assertj.core.api.Assertions.assertThat;

import org.apache.james.backends.postgres.PostgresExtension;
import org.apache.james.backends.postgres.utils.PostgresExecutor;
import org.apache.james.backends.postgres.utils.DomainImplPostgresConnectionFactory;
import org.apache.james.backends.postgres.utils.PostgresExecutor;
import org.apache.james.core.Username;
import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.MailboxSessionUtil;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@
import org.apache.james.jmap.api.pushsubscription.PushDeleteUserDataTaskStep;
import org.apache.james.jmap.api.upload.UploadRepository;
import org.apache.james.jmap.memory.access.MemoryAccessTokenRepository;
import org.apache.james.jmap.memory.projections.MemoryMessageFastViewProjection;
import org.apache.james.jmap.postgres.filtering.PostgresFilteringProjection;
import org.apache.james.jmap.postgres.identity.PostgresCustomIdentityDAO;
import org.apache.james.jmap.postgres.projections.PostgresEmailQueryView;
import org.apache.james.jmap.postgres.projections.PostgresEmailQueryViewManager;
import org.apache.james.jmap.postgres.projections.PostgresMessageFastViewProjection;
import org.apache.james.jmap.postgres.upload.PostgresUploadRepository;
import org.apache.james.mailbox.store.extractor.DefaultTextExtractor;
import org.apache.james.user.api.DeleteUserDataTaskStep;
Expand Down Expand Up @@ -67,8 +67,8 @@ protected void configure() {

bind(DefaultTextExtractor.class).in(Scopes.SINGLETON);

bind(MemoryMessageFastViewProjection.class).in(Scopes.SINGLETON);
bind(MessageFastViewProjection.class).to(MemoryMessageFastViewProjection.class);
bind(PostgresMessageFastViewProjection.class).in(Scopes.SINGLETON);
bind(MessageFastViewProjection.class).to(PostgresMessageFastViewProjection.class);

bind(PostgresEmailQueryView.class).in(Scopes.SINGLETON);
bind(EmailQueryView.class).to(PostgresEmailQueryView.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3769,7 +3769,7 @@ trait EmailGetMethodContract {
| "charset": "UTF-8",
| "size": 8,
| "partId": "1",
| "blobId": "1_1",
| "blobId": "${messageId.serialize}_1",
| "type": "text/plain"
| }
|}""".stripMargin)
Expand Down
Loading