diff --git a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresConfiguration.java b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresConfiguration.java index 29e5d904762..959c346f43e 100644 --- a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresConfiguration.java +++ b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresConfiguration.java @@ -59,6 +59,8 @@ public class PostgresConfiguration { public static final String SSL_MODE_DEFAULT_VALUE = "allow"; public static final String JOOQ_REACTIVE_TIMEOUT = "jooq.reactive.timeout"; public static final Duration JOOQ_REACTIVE_TIMEOUT_DEFAULT_VALUE = Duration.ofSeconds(10); + public static final String ATTACHMENT_STORAGE_ENABLED = "attachment.storage.enabled"; + public static final boolean ATTACHMENT_STORAGE_ENABLED_DEFAULT_VALUE = true; public static class Credential { private final String username; @@ -95,6 +97,7 @@ public static class Builder { private Optional byPassRLSPoolMaxSize = Optional.empty(); private Optional sslMode = Optional.empty(); private Optional jooqReactiveTimeout = Optional.empty(); + private Optional attachmentStorageEnabled = Optional.empty(); public Builder databaseName(String databaseName) { this.databaseName = Optional.of(databaseName); @@ -241,6 +244,16 @@ public Builder jooqReactiveTimeout(Optional jooqReactiveTimeout) { return this; } + public Builder attachmentStorageEnabled(Optional attachmentStorageEnabled) { + this.attachmentStorageEnabled = attachmentStorageEnabled; + return this; + } + + public Builder attachmentStorageEnabled(Boolean attachmentStorageEnabled) { + this.attachmentStorageEnabled = Optional.of(attachmentStorageEnabled); + return this; + } + public PostgresConfiguration build() { Preconditions.checkArgument(username.isPresent() && !username.get().isBlank(), "You need to specify username"); Preconditions.checkArgument(password.isPresent() && !password.get().isBlank(), "You need to specify password"); @@ -251,18 +264,19 @@ public PostgresConfiguration build() { } return new PostgresConfiguration(host.orElse(HOST_DEFAULT_VALUE), - port.orElse(PORT_DEFAULT_VALUE), - databaseName.orElse(DATABASE_NAME_DEFAULT_VALUE), - databaseSchema.orElse(DATABASE_SCHEMA_DEFAULT_VALUE), - new Credential(username.get(), password.get()), - new Credential(byPassRLSUser.orElse(username.get()), byPassRLSPassword.orElse(password.get())), + port.orElse(PORT_DEFAULT_VALUE), + databaseName.orElse(DATABASE_NAME_DEFAULT_VALUE), + databaseSchema.orElse(DATABASE_SCHEMA_DEFAULT_VALUE), + new Credential(username.get(), password.get()), + new Credential(byPassRLSUser.orElse(username.get()), byPassRLSPassword.orElse(password.get())), rowLevelSecurityEnabled.filter(rlsEnabled -> rlsEnabled).map(rlsEnabled -> RowLevelSecurity.ENABLED).orElse(RowLevelSecurity.DISABLED), - poolInitialSize.orElse(POOL_INITIAL_SIZE_DEFAULT_VALUE), - poolMaxSize.orElse(POOL_MAX_SIZE_DEFAULT_VALUE), - byPassRLSPoolInitialSize.orElse(BY_PASS_RLS_POOL_INITIAL_SIZE_DEFAULT_VALUE), - byPassRLSPoolMaxSize.orElse(BY_PASS_RLS_POOL_MAX_SIZE_DEFAULT_VALUE), - SSLMode.fromValue(sslMode.orElse(SSL_MODE_DEFAULT_VALUE)), - jooqReactiveTimeout.orElse(JOOQ_REACTIVE_TIMEOUT_DEFAULT_VALUE)); + poolInitialSize.orElse(POOL_INITIAL_SIZE_DEFAULT_VALUE), + poolMaxSize.orElse(POOL_MAX_SIZE_DEFAULT_VALUE), + byPassRLSPoolInitialSize.orElse(BY_PASS_RLS_POOL_INITIAL_SIZE_DEFAULT_VALUE), + byPassRLSPoolMaxSize.orElse(BY_PASS_RLS_POOL_MAX_SIZE_DEFAULT_VALUE), + SSLMode.fromValue(sslMode.orElse(SSL_MODE_DEFAULT_VALUE)), + jooqReactiveTimeout.orElse(JOOQ_REACTIVE_TIMEOUT_DEFAULT_VALUE), + attachmentStorageEnabled.orElse(ATTACHMENT_STORAGE_ENABLED_DEFAULT_VALUE)); } } @@ -272,23 +286,24 @@ public static Builder builder() { public static PostgresConfiguration from(Configuration propertiesConfiguration) { return builder() - .databaseName(Optional.ofNullable(propertiesConfiguration.getString(DATABASE_NAME))) - .databaseSchema(Optional.ofNullable(propertiesConfiguration.getString(DATABASE_SCHEMA))) - .host(Optional.ofNullable(propertiesConfiguration.getString(HOST))) - .port(propertiesConfiguration.getInt(PORT, PORT_DEFAULT_VALUE)) - .username(Optional.ofNullable(propertiesConfiguration.getString(USERNAME))) - .password(Optional.ofNullable(propertiesConfiguration.getString(PASSWORD))) - .byPassRLSUser(Optional.ofNullable(propertiesConfiguration.getString(BY_PASS_RLS_USERNAME))) - .byPassRLSPassword(Optional.ofNullable(propertiesConfiguration.getString(BY_PASS_RLS_PASSWORD))) - .rowLevelSecurityEnabled(propertiesConfiguration.getBoolean(RLS_ENABLED, false)) - .poolInitialSize(Optional.ofNullable(propertiesConfiguration.getInteger(POOL_INITIAL_SIZE, null))) - .poolMaxSize(Optional.ofNullable(propertiesConfiguration.getInteger(POOL_MAX_SIZE, null))) - .byPassRLSPoolInitialSize(Optional.ofNullable(propertiesConfiguration.getInteger(BY_PASS_RLS_POOL_INITIAL_SIZE, null))) - .byPassRLSPoolMaxSize(Optional.ofNullable(propertiesConfiguration.getInteger(BY_PASS_RLS_POOL_MAX_SIZE, null))) - .sslMode(Optional.ofNullable(propertiesConfiguration.getString(SSL_MODE))) - .jooqReactiveTimeout(Optional.ofNullable(propertiesConfiguration.getString(JOOQ_REACTIVE_TIMEOUT)) - .map(value -> DurationParser.parse(value, ChronoUnit.SECONDS))) - .build(); + .databaseName(Optional.ofNullable(propertiesConfiguration.getString(DATABASE_NAME))) + .databaseSchema(Optional.ofNullable(propertiesConfiguration.getString(DATABASE_SCHEMA))) + .host(Optional.ofNullable(propertiesConfiguration.getString(HOST))) + .port(propertiesConfiguration.getInt(PORT, PORT_DEFAULT_VALUE)) + .username(Optional.ofNullable(propertiesConfiguration.getString(USERNAME))) + .password(Optional.ofNullable(propertiesConfiguration.getString(PASSWORD))) + .byPassRLSUser(Optional.ofNullable(propertiesConfiguration.getString(BY_PASS_RLS_USERNAME))) + .byPassRLSPassword(Optional.ofNullable(propertiesConfiguration.getString(BY_PASS_RLS_PASSWORD))) + .rowLevelSecurityEnabled(propertiesConfiguration.getBoolean(RLS_ENABLED, false)) + .poolInitialSize(Optional.ofNullable(propertiesConfiguration.getInteger(POOL_INITIAL_SIZE, null))) + .poolMaxSize(Optional.ofNullable(propertiesConfiguration.getInteger(POOL_MAX_SIZE, null))) + .byPassRLSPoolInitialSize(Optional.ofNullable(propertiesConfiguration.getInteger(BY_PASS_RLS_POOL_INITIAL_SIZE, null))) + .byPassRLSPoolMaxSize(Optional.ofNullable(propertiesConfiguration.getInteger(BY_PASS_RLS_POOL_MAX_SIZE, null))) + .sslMode(Optional.ofNullable(propertiesConfiguration.getString(SSL_MODE))) + .jooqReactiveTimeout(Optional.ofNullable(propertiesConfiguration.getString(JOOQ_REACTIVE_TIMEOUT)) + .map(value -> DurationParser.parse(value, ChronoUnit.SECONDS))) + .attachmentStorageEnabled(propertiesConfiguration.getBoolean(ATTACHMENT_STORAGE_ENABLED, ATTACHMENT_STORAGE_ENABLED_DEFAULT_VALUE)) + .build(); } private final String host; @@ -304,12 +319,13 @@ public static PostgresConfiguration from(Configuration propertiesConfiguration) private final Integer byPassRLSPoolMaxSize; private final SSLMode sslMode; private final Duration jooqReactiveTimeout; + private final boolean attachmentStorageEnabled; private PostgresConfiguration(String host, int port, String databaseName, String databaseSchema, Credential defaultCredential, Credential byPassRLSCredential, RowLevelSecurity rowLevelSecurity, Integer poolInitialSize, Integer poolMaxSize, Integer byPassRLSPoolInitialSize, Integer byPassRLSPoolMaxSize, - SSLMode sslMode, Duration jooqReactiveTimeout) { + SSLMode sslMode, Duration jooqReactiveTimeout, boolean attachmentStorageEnabled) { this.host = host; this.port = port; this.databaseName = databaseName; @@ -323,6 +339,7 @@ private PostgresConfiguration(String host, int port, String databaseName, String this.byPassRLSPoolMaxSize = byPassRLSPoolMaxSize; this.sslMode = sslMode; this.jooqReactiveTimeout = jooqReactiveTimeout; + this.attachmentStorageEnabled = attachmentStorageEnabled; } public String getHost() { @@ -377,9 +394,13 @@ public Duration getJooqReactiveTimeout() { return jooqReactiveTimeout; } + public boolean isAttachmentStorageEnabled() { + return attachmentStorageEnabled; + } + @Override public final int hashCode() { - return Objects.hash(host, port, databaseName, databaseSchema, defaultCredential, byPassRLSCredential, rowLevelSecurity, poolInitialSize, poolMaxSize, sslMode, jooqReactiveTimeout); + return Objects.hash(host, port, databaseName, databaseSchema, defaultCredential, byPassRLSCredential, rowLevelSecurity, poolInitialSize, poolMaxSize, sslMode, jooqReactiveTimeout, attachmentStorageEnabled); } @Override @@ -388,16 +409,17 @@ public final boolean equals(Object o) { PostgresConfiguration that = (PostgresConfiguration) o; return Objects.equals(this.rowLevelSecurity, that.rowLevelSecurity) - && Objects.equals(this.host, that.host) - && Objects.equals(this.port, that.port) - && Objects.equals(this.defaultCredential, that.defaultCredential) - && Objects.equals(this.byPassRLSCredential, that.byPassRLSCredential) - && Objects.equals(this.databaseName, that.databaseName) - && Objects.equals(this.databaseSchema, that.databaseSchema) - && Objects.equals(this.poolInitialSize, that.poolInitialSize) - && Objects.equals(this.poolMaxSize, that.poolMaxSize) - && Objects.equals(this.sslMode, that.sslMode) - && Objects.equals(this.jooqReactiveTimeout, that.jooqReactiveTimeout); + && Objects.equals(this.host, that.host) + && Objects.equals(this.port, that.port) + && Objects.equals(this.defaultCredential, that.defaultCredential) + && Objects.equals(this.byPassRLSCredential, that.byPassRLSCredential) + && Objects.equals(this.databaseName, that.databaseName) + && Objects.equals(this.databaseSchema, that.databaseSchema) + && Objects.equals(this.poolInitialSize, that.poolInitialSize) + && Objects.equals(this.poolMaxSize, that.poolMaxSize) + && Objects.equals(this.sslMode, that.sslMode) + && Objects.equals(this.jooqReactiveTimeout, that.jooqReactiveTimeout) + && Objects.equals(this.attachmentStorageEnabled, that.attachmentStorageEnabled); } return false; } diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/DeleteMessageListener.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/DeleteMessageListener.java index 5f31c3ba9b7..3edd0090446 100644 --- a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/DeleteMessageListener.java +++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/DeleteMessageListener.java @@ -25,6 +25,7 @@ import jakarta.inject.Named; import org.apache.commons.io.IOUtils; +import org.apache.james.backends.postgres.PostgresConfiguration; import org.apache.james.blob.api.BlobStore; import org.apache.james.core.Username; import org.apache.james.events.Event; @@ -64,6 +65,7 @@ public static class DeleteMessageListenerGroup extends Group { private final PostgresAttachmentDAO.Factory attachmentDAOFactory; private final PostgresThreadDAO.Factory threadDAOFactory; private final EventBus contentDeletionEventBus; + private final PostgresConfiguration postgresConfiguration; @Inject public DeleteMessageListener(BlobStore blobStore, @@ -71,13 +73,15 @@ public DeleteMessageListener(BlobStore blobStore, PostgresMessageDAO.Factory messageDAOFactory, PostgresAttachmentDAO.Factory attachmentDAOFactory, PostgresThreadDAO.Factory threadDAOFactory, - @Named(CONTENT_DELETION) EventBus contentDeletionEventBus) { + @Named(CONTENT_DELETION) EventBus contentDeletionEventBus, + PostgresConfiguration postgresConfiguration) { this.messageDAOFactory = messageDAOFactory; this.mailboxMessageDAOFactory = mailboxMessageDAOFactory; this.blobStore = blobStore; this.attachmentDAOFactory = attachmentDAOFactory; this.threadDAOFactory = threadDAOFactory; this.contentDeletionEventBus = contentDeletionEventBus; + this.postgresConfiguration = postgresConfiguration; } @Override @@ -141,7 +145,7 @@ private Mono handleMessageDeletion(PostgresMessageDAO postgresMessageDAO, .flatMap(msgId -> postgresMessageDAO.retrieveMessage(messageId) .flatMap(messageRepresentation -> dispatchMessageContentDeletionEvent(mailboxId, owner, messageRepresentation)) .then(deleteBodyBlob(msgId, postgresMessageDAO)) - .then(deleteAttachment(msgId, attachmentDAO)) + .then(deleteAttachmentIfEnabled(msgId, attachmentDAO)) .then(threadDAO.deleteSome(owner, msgId)) .then(postgresMessageDAO.deleteByMessageId(msgId))); } @@ -188,4 +192,11 @@ private Mono deleteAttachmentBlobs(PostgresMessageId messageId, PostgresAt .flatMap(blobId -> Mono.from(blobStore.delete(blobStore.getDefaultBucketName(), blobId)), ReactorUtils.DEFAULT_CONCURRENCY) .then(); } + + private Mono deleteAttachmentIfEnabled(PostgresMessageId messageId, PostgresAttachmentDAO attachmentDAO) { + if (postgresConfiguration.isAttachmentStorageEnabled()) { + return deleteAttachment(messageId, attachmentDAO); + } + return Mono.empty(); + } } diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/PostgresMailboxSessionMapperFactory.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/PostgresMailboxSessionMapperFactory.java index d4743b2f6a7..5daa6d52ad5 100644 --- a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/PostgresMailboxSessionMapperFactory.java +++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/PostgresMailboxSessionMapperFactory.java @@ -65,6 +65,8 @@ public class PostgresMailboxSessionMapperFactory extends MailboxSessionMapperFac private final Clock clock; private final RowLevelSecurity rowLevelSecurity; private final AttachmentIdAssignationStrategy attachmentIdAssignationStrategy; + private final boolean attachmentStorageEnabled; + private final PostgresConfiguration postgresConfiguration; @Inject public PostgresMailboxSessionMapperFactory(PostgresExecutor.Factory executorFactory, @@ -78,7 +80,9 @@ public PostgresMailboxSessionMapperFactory(PostgresExecutor.Factory executorFact this.blobIdFactory = blobIdFactory; this.clock = clock; this.rowLevelSecurity = postgresConfiguration.getRowLevelSecurity(); + this.attachmentStorageEnabled = postgresConfiguration.isAttachmentStorageEnabled(); this.attachmentIdAssignationStrategy = attachmentIdAssignationStrategy; + this.postgresConfiguration = postgresConfiguration; } @Override @@ -146,14 +150,17 @@ public PostgresAttachmentMapper getAttachmentMapper(MailboxSession session) { return createAttachmentMapper(session); } + public boolean isAttachmentStorageEnabled() { + return attachmentStorageEnabled; + } + @VisibleForTesting protected DeleteMessageListener deleteMessageListener(EventBus contentDeletionEventBus) { PostgresMessageDAO.Factory postgresMessageDAOFactory = new PostgresMessageDAO.Factory(blobIdFactory, executorFactory); PostgresMailboxMessageDAO.Factory postgresMailboxMessageDAOFactory = new PostgresMailboxMessageDAO.Factory(executorFactory); PostgresAttachmentDAO.Factory attachmentDAOFactory = new PostgresAttachmentDAO.Factory(executorFactory, blobIdFactory); PostgresThreadDAO.Factory threadDAOFactory = new PostgresThreadDAO.Factory(executorFactory); - return new DeleteMessageListener(blobStore, postgresMailboxMessageDAOFactory, postgresMessageDAOFactory, - attachmentDAOFactory, threadDAOFactory, contentDeletionEventBus); + attachmentDAOFactory, threadDAOFactory, contentDeletionEventBus, postgresConfiguration); } } diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/PostgresMessageManager.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/PostgresMessageManager.java index ad2621b4aaf..9de1bfe3a01 100644 --- a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/PostgresMessageManager.java +++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/PostgresMessageManager.java @@ -69,13 +69,24 @@ public PostgresMessageManager(PostgresMailboxSessionMapperFactory mapperFactory, StoreRightManager storeRightManager, ThreadIdGuessingAlgorithm threadIdGuessingAlgorithm, Clock clock, PreDeletionHooks preDeletionHooks) { super(StoreMailboxManager.DEFAULT_NO_MESSAGE_CAPABILITIES, mapperFactory, index, eventBus, locker, mailbox, - quotaManager, quotaRootResolver, batchSizes, storeRightManager, preDeletionHooks, - new MessageStorer.WithAttachment(mapperFactory, messageIdFactory, new MessageFactory.StoreMessageFactory(), mapperFactory, messageParser, threadIdGuessingAlgorithm, clock)); + quotaManager, quotaRootResolver, batchSizes, storeRightManager, preDeletionHooks, + createMessageStorer(mapperFactory, messageIdFactory, messageParser, threadIdGuessingAlgorithm, clock)); this.storeRightManager = storeRightManager; this.mapperFactory = mapperFactory; this.mailbox = mailbox; } + private static MessageStorer createMessageStorer(PostgresMailboxSessionMapperFactory mapperFactory, + MessageId.Factory messageIdFactory, + MessageParser messageParser, + ThreadIdGuessingAlgorithm threadIdGuessingAlgorithm, + Clock clock) { + if (mapperFactory.isAttachmentStorageEnabled()) { + return new MessageStorer.WithAttachment(mapperFactory, messageIdFactory, new MessageFactory.StoreMessageFactory(), mapperFactory, messageParser, threadIdGuessingAlgorithm, clock); + } else { + return new MessageStorer.WithoutAttachment(mapperFactory, messageIdFactory, new MessageFactory.StoreMessageFactory(), threadIdGuessingAlgorithm, clock); + } + } @Override public Flags getPermanentFlags(MailboxSession session) { diff --git a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/PostgresMailboxManagerAttachmentTest.java b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/PostgresMailboxManagerAttachmentTest.java index a3dbc27696b..b2190334b44 100644 --- a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/PostgresMailboxManagerAttachmentTest.java +++ b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/PostgresMailboxManagerAttachmentTest.java @@ -27,6 +27,7 @@ import java.time.Clock; import java.time.Instant; +import org.apache.james.backends.postgres.PostgresConfiguration; import org.apache.james.backends.postgres.PostgresExtension; import org.apache.james.blob.api.BlobId; import org.apache.james.blob.api.BucketName; @@ -82,6 +83,7 @@ public class PostgresMailboxManagerAttachmentTest extends AbstractMailboxManager void beforeAll() throws Exception { BlobId.Factory blobIdFactory = new PlainBlobId.Factory(); DeDuplicationBlobStore blobStore = new DeDuplicationBlobStore(new MemoryBlobStoreDAO(), BucketName.DEFAULT, blobIdFactory); + PostgresConfiguration postgresConfiguration = PostgresConfiguration.builder().username("a").password("a").build(); mapperFactory = new PostgresMailboxSessionMapperFactory(postgresExtension.getExecutorFactory(), Clock.systemUTC(), blobStore, blobIdFactory, postgresExtension.getPostgresConfiguration(), new AttachmentIdAssignationStrategy.Default(new StringBackedAttachmentIdFactory())); @@ -108,7 +110,7 @@ void beforeAll() throws Exception { PostgresThreadDAO.Factory threadDAOFactory = new PostgresThreadDAO.Factory(postgresExtension.getExecutorFactory()); eventBus.register(new DeleteMessageListener(blobStore, postgresMailboxMessageDAOFactory, postgresMessageDAOFactory, - attachmentDAOFactory, threadDAOFactory, eventBus)); + attachmentDAOFactory, threadDAOFactory,eventBus, postgresConfiguration)); mailboxManager = new PostgresMailboxManager(mapperFactory, sessionProvider, messageParser, new PostgresMessageId.Factory(),