Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JAMES-2586 Apply reactor timeout for jOOQ #2185

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@

package org.apache.james.backends.postgres;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Objects;
import java.util.Optional;

import org.apache.commons.configuration2.Configuration;
import org.apache.james.util.DurationParser;

import com.google.common.base.Preconditions;

Expand All @@ -44,6 +47,8 @@ public class PostgresConfiguration {
public static final String RLS_ENABLED = "row.level.security.enabled";
public static final String SSL_MODE = "ssl.mode";
public static final String SSL_MODE_DEFAULT_VALUE = "allow";
public static final String JOOQ_REACTIVE_TIMEOUT = "jooq.reactive.timeout";
public static final Duration JOOQ_REACTIVE_TIMEOUT_DEFAULT_VALUE = Duration.ofSeconds(10);

public static class Credential {
private final String username;
Expand Down Expand Up @@ -75,6 +80,7 @@ public static class Builder {
private Optional<String> nonRLSPassword = Optional.empty();
private Optional<Boolean> rowLevelSecurityEnabled = Optional.empty();
private Optional<String> sslMode = Optional.empty();
private Optional<Duration> jooqReactiveTimeout = Optional.empty();

public Builder databaseName(String databaseName) {
this.databaseName = Optional.of(databaseName);
Expand Down Expand Up @@ -176,6 +182,11 @@ public Builder sslMode(String sslMode) {
return this;
}

public Builder jooqReactiveTimeout(Optional<Duration> jooqReactiveTimeout) {
this.jooqReactiveTimeout = jooqReactiveTimeout;
return this;
}

public PostgresConfiguration build() {
Preconditions.checkArgument(username.isPresent() && !username.get().isBlank(), "You need to specify username");
Preconditions.checkArgument(password.isPresent() && !password.get().isBlank(), "You need to specify password");
Expand All @@ -192,7 +203,8 @@ public PostgresConfiguration build() {
new Credential(username.get(), password.get()),
new Credential(nonRLSUser.orElse(username.get()), nonRLSPassword.orElse(password.get())),
rowLevelSecurityEnabled.orElse(false),
SSLMode.fromValue(sslMode.orElse(SSL_MODE_DEFAULT_VALUE)));
SSLMode.fromValue(sslMode.orElse(SSL_MODE_DEFAULT_VALUE)),
jooqReactiveTimeout.orElse(JOOQ_REACTIVE_TIMEOUT_DEFAULT_VALUE));
}
}

Expand All @@ -212,6 +224,8 @@ public static PostgresConfiguration from(Configuration propertiesConfiguration)
.nonRLSPassword(Optional.ofNullable(propertiesConfiguration.getString(NON_RLS_PASSWORD)))
.rowLevelSecurityEnabled(propertiesConfiguration.getBoolean(RLS_ENABLED, false))
.sslMode(Optional.ofNullable(propertiesConfiguration.getString(SSL_MODE)))
.jooqReactiveTimeout(Optional.ofNullable(propertiesConfiguration.getString(JOOQ_REACTIVE_TIMEOUT))
.map(value -> DurationParser.parse(value, ChronoUnit.SECONDS)))
.build();
}

Expand All @@ -223,10 +237,11 @@ public static PostgresConfiguration from(Configuration propertiesConfiguration)
private final Credential nonRLSCredential;
private final boolean rowLevelSecurityEnabled;
private final SSLMode sslMode;
private final Duration jooqReactiveTimeout;

private PostgresConfiguration(String host, int port, String databaseName, String databaseSchema,
Credential credential, Credential nonRLSCredential, boolean rowLevelSecurityEnabled,
SSLMode sslMode) {
SSLMode sslMode, Duration jooqReactiveTimeout) {
this.host = host;
this.port = port;
this.databaseName = databaseName;
Expand All @@ -235,6 +250,7 @@ private PostgresConfiguration(String host, int port, String databaseName, String
this.nonRLSCredential = nonRLSCredential;
this.rowLevelSecurityEnabled = rowLevelSecurityEnabled;
this.sslMode = sslMode;
this.jooqReactiveTimeout = jooqReactiveTimeout;
}

public String getHost() {
Expand Down Expand Up @@ -269,9 +285,13 @@ public SSLMode getSslMode() {
return sslMode;
}

public Duration getJooqReactiveTimeout() {
return jooqReactiveTimeout;
}

@Override
public final int hashCode() {
return Objects.hash(host, port, databaseName, databaseSchema, credential, nonRLSCredential, rowLevelSecurityEnabled, sslMode);
return Objects.hash(host, port, databaseName, databaseSchema, credential, nonRLSCredential, rowLevelSecurityEnabled, sslMode, jooqReactiveTimeout);
}

@Override
Expand All @@ -286,7 +306,8 @@ public final boolean equals(Object o) {
&& Objects.equals(this.nonRLSCredential, that.nonRLSCredential)
&& Objects.equals(this.databaseName, that.databaseName)
&& Objects.equals(this.databaseSchema, that.databaseSchema)
&& Objects.equals(this.sslMode, that.sslMode);
&& Objects.equals(this.sslMode, that.sslMode)
&& Objects.equals(this.jooqReactiveTimeout, that.jooqReactiveTimeout);
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Predicate;

import javax.inject.Inject;

import org.apache.james.backends.postgres.PostgresConfiguration;
import org.apache.james.core.Domain;
import org.jooq.DSLContext;
import org.jooq.DeleteResultStep;
Expand All @@ -40,6 +42,8 @@
import org.jooq.conf.StatementType;
import org.jooq.impl.DSL;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;

Expand All @@ -55,18 +59,23 @@ public class PostgresExecutor {
public static final String NON_RLS_INJECT = "non_rls";
public static final int MAX_RETRY_ATTEMPTS = 5;
public static final Duration MIN_BACKOFF = Duration.ofMillis(1);
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresExecutor.class);
private static final String JOOQ_TIMEOUT_ERROR_LOG = "Time out executing Postgres query. May need to check either jOOQ reactive issue or Postgres DB performance.";

public static class Factory {

private final JamesPostgresConnectionFactory jamesPostgresConnectionFactory;
private final PostgresConfiguration postgresConfiguration;

@Inject
public Factory(JamesPostgresConnectionFactory jamesPostgresConnectionFactory) {
public Factory(JamesPostgresConnectionFactory jamesPostgresConnectionFactory,
PostgresConfiguration postgresConfiguration) {
this.jamesPostgresConnectionFactory = jamesPostgresConnectionFactory;
this.postgresConfiguration = postgresConfiguration;
}

public PostgresExecutor create(Optional<Domain> domain) {
return new PostgresExecutor(jamesPostgresConnectionFactory.getConnection(domain));
return new PostgresExecutor(jamesPostgresConnectionFactory.getConnection(domain), postgresConfiguration);
}

public PostgresExecutor create() {
Expand All @@ -78,10 +87,14 @@ public PostgresExecutor create() {
private static final Settings SETTINGS = new Settings()
.withRenderFormatted(true)
.withStatementType(StatementType.PREPARED_STATEMENT);

private final Mono<Connection> connection;
private final PostgresConfiguration postgresConfiguration;

private PostgresExecutor(Mono<Connection> connection) {
private PostgresExecutor(Mono<Connection> connection,
PostgresConfiguration postgresConfiguration) {
this.connection = connection;
this.postgresConfiguration = postgresConfiguration;
}

public Mono<DSLContext> dslContext() {
Expand All @@ -91,6 +104,8 @@ public Mono<DSLContext> dslContext() {
public Mono<Void> executeVoid(Function<DSLContext, Mono<?>> queryFunction) {
return dslContext()
.flatMap(queryFunction)
.timeout(postgresConfiguration.getJooqReactiveTimeout())
.doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
.retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
.filter(preparedStatementConflictException()))
.then();
Expand All @@ -99,13 +114,19 @@ public Mono<Void> executeVoid(Function<DSLContext, Mono<?>> queryFunction) {
public Flux<Record> executeRows(Function<DSLContext, Flux<Record>> queryFunction) {
return dslContext()
.flatMapMany(queryFunction)
.timeout(postgresConfiguration.getJooqReactiveTimeout())
.doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
.collectList()
.flatMapIterable(list -> list) // Mitigation fix for https://github.com/jOOQ/jOOQ/issues/16556
.retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
.filter(preparedStatementConflictException()));
}

public Flux<Record> executeDeleteAndReturnList(Function<DSLContext, DeleteResultStep<Record>> queryFunction) {
return dslContext()
.flatMapMany(queryFunction)
.timeout(postgresConfiguration.getJooqReactiveTimeout())
.doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
.collectList()
.flatMapIterable(list -> list) // The convert Flux -> Mono<List> -> Flux to avoid a hanging issue. See: https://github.com/jOOQ/jOOQ/issues/16055
.retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
Expand All @@ -115,6 +136,8 @@ public Flux<Record> executeDeleteAndReturnList(Function<DSLContext, DeleteResult
public Mono<Record> executeRow(Function<DSLContext, Publisher<Record>> queryFunction) {
return dslContext()
.flatMap(queryFunction.andThen(Mono::from))
.timeout(postgresConfiguration.getJooqReactiveTimeout())
.doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
.retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
.filter(preparedStatementConflictException()));
}
Expand All @@ -128,6 +151,8 @@ public Mono<Optional<Record>> executeSingleRowOptional(Function<DSLContext, Publ
public Mono<Integer> executeCount(Function<DSLContext, Mono<Record1<Integer>>> queryFunction) {
return dslContext()
.flatMap(queryFunction)
.timeout(postgresConfiguration.getJooqReactiveTimeout())
.doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
.retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
.filter(preparedStatementConflictException()))
.map(Record1::value1);
Expand All @@ -141,6 +166,8 @@ public Mono<Boolean> executeExists(Function<DSLContext, SelectConditionStep<?>>
public Mono<Integer> executeReturnAffectedRowsCount(Function<DSLContext, Mono<Integer>> queryFunction) {
return dslContext()
.flatMap(queryFunction)
.timeout(postgresConfiguration.getJooqReactiveTimeout())
.doOnError(TimeoutException.class, e -> LOGGER.error(JOOQ_TIMEOUT_ERROR_LOG, e))
.retryWhen(Retry.backoff(MAX_RETRY_ATTEMPTS, MIN_BACKOFF)
.filter(preparedStatementConflictException()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,11 +139,12 @@ private void initPostgresSession() {
.build());

if (rlsEnabled) {
executorFactory = new PostgresExecutor.Factory(new DomainImplPostgresConnectionFactory(connectionFactory));
executorFactory = new PostgresExecutor.Factory(new DomainImplPostgresConnectionFactory(connectionFactory), postgresConfiguration);
} else {
executorFactory = new PostgresExecutor.Factory(new SinglePostgresConnectionFactory(connectionFactory.create()
.cache()
.cast(Connection.class).block()));
.cast(Connection.class).block()),
postgresConfiguration);
}

postgresExecutor = executorFactory.create();
Expand All @@ -153,7 +154,7 @@ private void initPostgresSession() {
.password(postgresConfiguration.getNonRLSCredential().getPassword())
.build())
.flatMap(configuration -> new PostgresqlConnectionFactory(configuration).create().cache())
.map(connection -> new PostgresExecutor.Factory(new SinglePostgresConnectionFactory(connection)).create())
.map(connection -> new PostgresExecutor.Factory(new SinglePostgresConnectionFactory(connection), postgresConfiguration).create())
.block();
} else {
nonRLSPostgresExecutor = postgresExecutor;
Expand Down Expand Up @@ -225,6 +226,10 @@ public PostgresExecutor.Factory getExecutorFactory() {
return executorFactory;
}

public PostgresConfiguration getPostgresConfiguration() {
return postgresConfiguration;
}

private void initTablesAndIndexes() {
postgresTableManager.initializeTables().block();
postgresTableManager.initializeTableIndexes().block();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ private MailboxId generateMailboxId() {
@BeforeEach
public void setUp() {
BlobId.Factory blobIdFactory = new HashBlobId.Factory();
postgresMailboxSessionMapperFactory = new PostgresMailboxSessionMapperFactory(new PostgresExecutor.Factory(new DomainImplPostgresConnectionFactory(postgresExtension.getConnectionFactory())),
postgresMailboxSessionMapperFactory = new PostgresMailboxSessionMapperFactory(new PostgresExecutor.Factory(new DomainImplPostgresConnectionFactory(postgresExtension.getConnectionFactory()),
postgresExtension.getPostgresConfiguration()),
new UpdatableTickingClock(Instant.now()),
new DeDuplicationBlobStore(new MemoryBlobStoreDAO(), BucketName.DEFAULT, blobIdFactory),
blobIdFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ public class PostgresMailboxMapperRowLevelSecurityTest {

@BeforeEach
public void setUp() {
PostgresExecutor.Factory executorFactory = new PostgresExecutor.Factory(new DomainImplPostgresConnectionFactory(postgresExtension.getConnectionFactory()));
PostgresExecutor.Factory executorFactory = new PostgresExecutor.Factory(new DomainImplPostgresConnectionFactory(postgresExtension.getConnectionFactory()),
postgresExtension.getPostgresConfiguration());
mailboxMapperFactory = session -> new PostgresMailboxMapper(new PostgresMailboxDAO(executorFactory.create(session.getUser().getDomainPart())));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ private Mailbox generateMailbox() {
@BeforeEach
public void setUp() {
BlobId.Factory blobIdFactory = new HashBlobId.Factory();
postgresMailboxSessionMapperFactory = new PostgresMailboxSessionMapperFactory(new PostgresExecutor.Factory(new DomainImplPostgresConnectionFactory(postgresExtension.getConnectionFactory())),
postgresMailboxSessionMapperFactory = new PostgresMailboxSessionMapperFactory(new PostgresExecutor.Factory(new DomainImplPostgresConnectionFactory(postgresExtension.getConnectionFactory()),
postgresExtension.getPostgresConfiguration()),
new UpdatableTickingClock(Instant.now()),
new DeDuplicationBlobStore(new MemoryBlobStoreDAO(), BucketName.DEFAULT, blobIdFactory),
blobIdFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public class PostgresSubscriptionMapperRowLevelSecurityTest {

@BeforeEach
public void setUp() {
PostgresExecutor.Factory executorFactory = new PostgresExecutor.Factory(new DomainImplPostgresConnectionFactory(postgresExtension.getConnectionFactory()));
PostgresExecutor.Factory executorFactory = new PostgresExecutor.Factory(new DomainImplPostgresConnectionFactory(postgresExtension.getConnectionFactory()),
postgresExtension.getPostgresConfiguration());
subscriptionMapperFactory = session -> new PostgresSubscriptionMapper(new PostgresSubscriptionDAO(executorFactory.create(session.getUser().getDomainPart())));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,7 @@ row.level.security.enabled=false

# String. Optional, defaults to allow. SSLMode required to connect to the Postgresql db server.
# Check https://www.postgresql.org/docs/current/libpq-ssl.html#LIBPQ-SSL-PROTECTION for a list of supported SSLModes.
ssl.mode=allow
ssl.mode=allow

## Duration. Optional, defaults to 10 second. jOOQ reactive timeout when executing Postgres query. This setting prevent jooq reactive bug from causing hanging issue.
#jooq.reactive.timeout=10second
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,9 @@ PostgresTableManager postgresTableManager(PostgresExecutor postgresExecutor,
@Provides
@Named(PostgresExecutor.NON_RLS_INJECT)
@Singleton
PostgresExecutor.Factory postgresExecutorFactoryWithRLSBypass(@Named(PostgresExecutor.NON_RLS_INJECT) JamesPostgresConnectionFactory singlePostgresConnectionFactory) {
return new PostgresExecutor.Factory(singlePostgresConnectionFactory);
PostgresExecutor.Factory postgresExecutorFactoryWithRLSBypass(@Named(PostgresExecutor.NON_RLS_INJECT) JamesPostgresConnectionFactory singlePostgresConnectionFactory,
PostgresConfiguration postgresConfiguration) {
return new PostgresExecutor.Factory(singlePostgresConnectionFactory, postgresConfiguration);
}

@Provides
Expand Down