Skip to content

Commit

Permalink
Merge pull request #448 from apache/master
Browse files Browse the repository at this point in the history
[pull] master from apache:master
  • Loading branch information
pull[bot] committed May 17, 2024
2 parents b23ced4 + a07585e commit 652397f
Show file tree
Hide file tree
Showing 15 changed files with 303 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ public final int hashCode() {
private static final String QUEUE_DELIVERY_LIMIT = "quorum.queues.delivery.limit";
private static final String EVENT_BUS_NOTIFICATION_DURABILITY_ENABLED = "event.bus.notification.durability.enabled";
private static final String EVENT_BUS_PUBLISH_CONFIRM_ENABLED = "event.bus.publish.confirm.enabled";
private static final String EVENT_BUS_PROPAGATE_DISPATCH_ERROR = "event.bus.propagate.dispatch.error";
private static final String TASK_QUEUE_CONSUMER_TIMEOUT = "task.queue.consumer.timeout";
private static final String VHOST = "vhost";

Expand Down Expand Up @@ -411,6 +412,7 @@ public static class Builder {
private Optional<Long> queueTTL;
private Optional<Boolean> eventBusPublishConfirmEnabled;
private Optional<Boolean> eventBusNotificationDurabilityEnabled;
private Optional<Boolean> eventBusPropagateDispatchError;
private Optional<String> vhost;
private Optional<Duration> taskQueueConsumerTimeout;

Expand All @@ -437,6 +439,7 @@ private Builder(URI amqpUri, URI managementUri, ManagementCredentials management
this.eventBusNotificationDurabilityEnabled = Optional.empty();
this.vhost = Optional.empty();
this.taskQueueConsumerTimeout = Optional.empty();
this.eventBusPropagateDispatchError = Optional.empty();
}

public Builder maxRetries(int maxRetries) {
Expand Down Expand Up @@ -524,6 +527,11 @@ public Builder useQuorumQueues(Boolean useQuorumQueues) {
return this;
}

public Builder eventBusPropagateDispatchError(Boolean eventBusPropagateDispatchError) {
this.eventBusPropagateDispatchError = Optional.ofNullable(eventBusPropagateDispatchError);
return this;
}

public Builder useSslManagement(Boolean useSslForManagement) {
this.useSslManagement = Optional.of(useSslForManagement);
return this;
Expand Down Expand Up @@ -580,7 +588,8 @@ public RabbitMQConfiguration build() {
eventBusPublishConfirmEnabled.orElse(true),
eventBusNotificationDurabilityEnabled.orElse(true),
vhost,
taskQueueConsumerTimeout.orElse(DEFAULT_TASK_QUEUE_CONSUMER_TIMEOUT));
taskQueueConsumerTimeout.orElse(DEFAULT_TASK_QUEUE_CONSUMER_TIMEOUT),
eventBusPropagateDispatchError.orElse(true));
}

private List<Host> hostsDefaultingToUri() {
Expand Down Expand Up @@ -655,6 +664,7 @@ public static RabbitMQConfiguration from(Configuration configuration) {
.queueTTL(queueTTL)
.eventBusNotificationDurabilityEnabled(configuration.getBoolean(EVENT_BUS_NOTIFICATION_DURABILITY_ENABLED, null))
.eventBusPublishConfirmEnabled(configuration.getBoolean(EVENT_BUS_PUBLISH_CONFIRM_ENABLED, null))
.eventBusPropagateDispatchError(configuration.getBoolean(EVENT_BUS_PROPAGATE_DISPATCH_ERROR, null))
.vhost(vhost)
.taskQueueConsumerTimeout(taskQueueConsumerTimeout)
.build();
Expand Down Expand Up @@ -730,13 +740,14 @@ private static Optional<SSLKeyStore> getSSLKeyStore(Configuration configuration)
private final boolean eventBusNotificationDurabilityEnabled;
private final Optional<String> vhost;
private final Duration taskQueueConsumerTimeout;
private final boolean eventBusPropagateDispatchError;

private RabbitMQConfiguration(URI uri, URI managementUri, ManagementCredentials managementCredentials, int maxRetries, int minDelayInMs,
int connectionTimeoutInMs, int channelRpcTimeoutInMs, int handshakeTimeoutInMs, int shutdownTimeoutInMs,
int networkRecoveryIntervalInMs, Boolean useSsl, Boolean useSslManagement, SSLConfiguration sslConfiguration,
boolean useQuorumQueues, Optional<Integer> quorumQueueDeliveryLimit, int quorumQueueReplicationFactor, List<Host> hosts, Optional<Long> queueTTL,
boolean eventBusPublishConfirmEnabled, boolean eventBusNotificationDurabilityEnabled,
Optional<String> vhost, Duration taskQueueConsumerTimeout) {
Optional<String> vhost, Duration taskQueueConsumerTimeout, boolean eventBusPropagateDispatchError) {
this.uri = uri;
this.managementUri = managementUri;
this.managementCredentials = managementCredentials;
Expand All @@ -759,6 +770,7 @@ private RabbitMQConfiguration(URI uri, URI managementUri, ManagementCredentials
this.eventBusNotificationDurabilityEnabled = eventBusNotificationDurabilityEnabled;
this.vhost = vhost;
this.taskQueueConsumerTimeout = taskQueueConsumerTimeout;
this.eventBusPropagateDispatchError = eventBusPropagateDispatchError;
}

public URI getUri() {
Expand Down Expand Up @@ -871,6 +883,10 @@ public Optional<Integer> getQuorumQueueDeliveryLimit() {
return quorumQueueDeliveryLimit;
}

public boolean eventBusPropagateDispatchError() {
return eventBusPropagateDispatchError;
}

@Override
public final boolean equals(Object o) {
if (o instanceof RabbitMQConfiguration) {
Expand All @@ -897,7 +913,8 @@ public final boolean equals(Object o) {
&& Objects.equals(this.eventBusPublishConfirmEnabled, that.eventBusPublishConfirmEnabled)
&& Objects.equals(this.eventBusNotificationDurabilityEnabled, that.eventBusNotificationDurabilityEnabled)
&& Objects.equals(this.vhost, that.vhost)
&& Objects.equals(this.taskQueueConsumerTimeout, that.taskQueueConsumerTimeout);
&& Objects.equals(this.taskQueueConsumerTimeout, that.taskQueueConsumerTimeout)
&& Objects.equals(this.eventBusPropagateDispatchError, that.eventBusPropagateDispatchError);
}
return false;
}
Expand All @@ -906,6 +923,6 @@ public final boolean equals(Object o) {
public final int hashCode() {
return Objects.hash(uri, managementUri, maxRetries, minDelayInMs, connectionTimeoutInMs, quorumQueueReplicationFactor, quorumQueueDeliveryLimit, useQuorumQueues, hosts,
channelRpcTimeoutInMs, handshakeTimeoutInMs, shutdownTimeoutInMs, networkRecoveryIntervalInMs, managementCredentials, useSsl, useSslManagement,
sslConfiguration, queueTTL, eventBusPublishConfirmEnabled, eventBusNotificationDurabilityEnabled, vhost, taskQueueConsumerTimeout);
sslConfiguration, queueTTL, eventBusPublishConfirmEnabled, eventBusNotificationDurabilityEnabled, vhost, taskQueueConsumerTimeout, eventBusPropagateDispatchError);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,20 @@ public RabbitMQConfiguration getConfiguration() throws URISyntaxException {
.build();
}

public RabbitMQConfiguration.Builder getConfigurationBuilder() throws URISyntaxException {
return RabbitMQConfiguration.builder()
.amqpUri(amqpUri())
.managementUri(managementUri())
.managementCredentials(DEFAULT_MANAGEMENT_CREDENTIAL)
.maxRetries(MAX_THREE_RETRIES)
.minDelayInMs(MIN_DELAY_OF_TEN_MILLISECONDS)
.connectionTimeoutInMs(CONNECTION_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND)
.channelRpcTimeoutInMs(CHANNEL_RPC_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND)
.handshakeTimeoutInMs(HANDSHAKE_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND)
.shutdownTimeoutInMs(SHUTDOWN_TIMEOUT_OF_ONE_HUNDRED_MILLISECOND)
.networkRecoveryIntervalInMs(NETWORK_RECOVERY_INTERVAL_OF_ONE_HUNDRED_MILLISECOND);
}

public RabbitMQConfiguration withQuorumQueueConfiguration() throws URISyntaxException {
return RabbitMQConfiguration.builder()
.amqpUri(amqpUri())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,32 @@ void fromShouldReturnVhostValueWhenGivenAndNotUriOne() {
.isEqualTo(Optional.of("vhosttest"));
}

@Test
void eventBusPropagateDispatchErrorShouldBeTrueByDefault() {
PropertiesConfiguration configuration = new PropertiesConfiguration();
configuration.addProperty("uri", "amqp://james:james@rabbitmqhost:5672");
configuration.addProperty("management.uri", "http://james:james@rabbitmqhost:15672/api/");
configuration.addProperty("management.user", DEFAULT_USER);
configuration.addProperty("management.password", DEFAULT_PASSWORD_STRING);

assertThat(RabbitMQConfiguration.from(configuration).eventBusPropagateDispatchError())
.isTrue();
}

@Test
void eventBusPropagateDispatchErrorShouldBeDisabledWhenConfiguredFalse() {
PropertiesConfiguration configuration = new PropertiesConfiguration();
configuration.addProperty("uri", "amqp://james:james@rabbitmqhost:5672");
configuration.addProperty("management.uri", "http://james:james@rabbitmqhost:15672/api/");
configuration.addProperty("management.user", DEFAULT_USER);
configuration.addProperty("management.password", DEFAULT_PASSWORD_STRING);

configuration.addProperty("event.bus.propagate.dispatch.error", "false");

assertThat(RabbitMQConfiguration.from(configuration).eventBusPropagateDispatchError())
.isFalse();
}

@Nested
class ManagementCredentialsTest {
@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,14 @@ private Mono<Void> remoteGroupsDispatch(byte[] serializedEvent, Event event) {
event.getEventId().getId(),
ex))
.onErrorResume(ex -> deadLetters.store(dispatchingFailureGroup, event)
.then(Mono.error(ex)));
.then(propagateErrorIfNeeded(ex)));
}

private Mono<Void> propagateErrorIfNeeded(Throwable throwable) {
if (configuration.eventBusPropagateDispatchError()) {
return Mono.error(throwable);
}
return Mono.empty();
}

private Mono<Void> remoteKeysDispatch(byte[] serializedEvent, Set<RegistrationKey> keys) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@
import static org.apache.james.events.EventBusTestFixture.NO_KEYS;
import static org.apache.james.events.EventBusTestFixture.RETRY_BACKOFF_CONFIGURATION;
import static org.apache.james.events.EventBusTestFixture.newListener;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.verify;

import java.util.NoSuchElementException;

import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
import org.apache.james.backends.rabbitmq.RabbitMQExtension;
import org.apache.james.backends.rabbitmq.RabbitMQFixture;
import org.apache.james.events.EventBusTestFixture.TestEventSerializer;
Expand All @@ -44,25 +47,26 @@ class NetworkErrorTest {
.isolationPolicy(RabbitMQExtension.IsolationPolicy.WEAK);

private RabbitMQEventBus eventBus;
private EventDeadLetters eventDeadLetters;

@BeforeEach
void setUp() throws Exception {
MemoryEventDeadLetters memoryEventDeadLetters = new MemoryEventDeadLetters();

eventDeadLetters = new MemoryEventDeadLetters();

EventSerializer eventSerializer = new TestEventSerializer();
RoutingKeyConverter routingKeyConverter = RoutingKeyConverter.forFactories(new EventBusTestFixture.TestRegistrationKeyFactory());

eventBus = new RabbitMQEventBus(new NamingStrategy(new EventBusName("test")), rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(),
eventSerializer, RETRY_BACKOFF_CONFIGURATION, routingKeyConverter,
memoryEventDeadLetters, new RecordingMetricFactory(), rabbitMQExtension.getRabbitChannelPool(),
eventDeadLetters, new RecordingMetricFactory(), rabbitMQExtension.getRabbitChannelPool(),
EventBusId.random(), rabbitMQExtension.getRabbitMQ().getConfiguration());

eventBus.start();
}

@AfterEach
void tearDown() {
rabbitMQExtension.getRabbitMQ().unpause();
eventBus.stop();
}

Expand All @@ -74,7 +78,7 @@ void dispatchShouldWorkAfterNetworkIssuesForOldRegistration() {
rabbitMQExtension.getRabbitMQ().pause();

assertThatThrownBy(() -> eventBus.dispatch(EVENT, NO_KEYS).block())
.getCause()
.cause()
.isInstanceOf(NoSuchElementException.class)
.hasMessageContaining("Timeout waiting for idle object");

Expand All @@ -85,4 +89,37 @@ void dispatchShouldWorkAfterNetworkIssuesForOldRegistration() {
.untilAsserted(() -> verify(listener).event(EVENT));
}

@Test
void dispatchGroupEventsDuringRabbitMQOutageShouldThrowErrorByDefault() {
eventBus.register(newListener(), GROUP_A);

rabbitMQExtension.getRabbitMQ().pause();

assertThatThrownBy(() -> eventBus.dispatch(EVENT, NO_KEYS).block())
.cause()
.isInstanceOf(NoSuchElementException.class)
.hasMessageContaining("Timeout waiting for idle object");
assertThat(eventDeadLetters.containEvents().block()).isTrue();
}

@Test
void dispatchGroupEventsDuringRabbitMQOutageShouldNotThrowErrorWhenDisablePropagateDispatchError() throws Exception {
RabbitMQConfiguration disablePropagateDispatchError = rabbitMQExtension.getRabbitMQ().getConfigurationBuilder()
.eventBusPropagateDispatchError(false)
.build();
eventBus = new RabbitMQEventBus(new NamingStrategy(new EventBusName("test")), rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(),
new TestEventSerializer(), RETRY_BACKOFF_CONFIGURATION, RoutingKeyConverter.forFactories(new EventBusTestFixture.TestRegistrationKeyFactory()),
eventDeadLetters, new RecordingMetricFactory(), rabbitMQExtension.getRabbitChannelPool(),
EventBusId.random(), disablePropagateDispatchError);
eventBus.start();

assertThat(eventDeadLetters.containEvents().block()).isFalse();
eventBus.register(newListener(), GROUP_A);

rabbitMQExtension.getRabbitMQ().pause();

assertThatCode(() -> eventBus.dispatch(EVENT, NO_KEYS).block())
.doesNotThrowAnyException();
assertThat(eventDeadLetters.containEvents().block()).isTrue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
* Handler which check if the authenticated user is the same as the one used as MAIL FROM
*/
public abstract class AbstractSenderAuthIdentifyVerificationHook implements MailHook, RcptHook {
private static final HookResult INVALID_AUTH = HookResult.builder()
protected static final HookResult INVALID_AUTH = HookResult.builder()
.hookReturnCode(HookReturnCode.deny())
.smtpReturnCode(SMTPRetCode.BAD_SEQUENCE)
.smtpDescription(DSNStatus.getStatus(DSNStatus.PERMANENT, DSNStatus.SECURITY_AUTH)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,13 @@ james.s3.metrics.enabled=false
----
To disable the S3 metrics.

== Reactor Stream Prefetch

Prefetch to use in Reactor to stream convertions (S3 => InputStream). Default to 1.
Higher values will tend to block less often at the price of higher memory consumptions.

Ex in `jvm.properties`
----
# james.reactor.inputstream.prefetch=4
----

Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ A coma separated list of hosts, example: hosts=ip1:5672,ip2:5672
| event.bus.notification.durability.enabled
| Whether or not the queue backing notifications should be durable. Optional boolean, defaults to true.

| event.bus.propagate.dispatch.error
| Whether to propagate errors back to the callers when eventbus fails to dispatch group events to RabbitMQ (then store the failed events in the event dead letters).
Optional boolean, defaults to true.

| vhost
| Optional string. This parameter is only a workaround to support invalid URIs containing character like '_'.
You still need to specify the vhost in the uri parameter.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,7 @@ jmx.remote.x.mlet.allow.getMBeansFromURL=false
# Maximum size of a blob. Larger blobs will be rejected.
# Unit supported: K, M, G, default to no unit
#james.blob.aes.blob.max.size=100M

# Prefetch to use in Reactor to stream convertions (S3 => InputStream). Default to 1.
# Higher values will tend to block less often at the price of higher memory consumptions.
# james.reactor.inputstream.prefetch=4
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ public class ReactorUtils {
public static final int DEFAULT_BOUNDED_ELASTIC_QUEUESIZE = Optional.ofNullable(System.getProperty("james.schedulers.defaultBoundedElasticQueueSize"))
.map(Integer::parseInt)
.orElse(100000);
public static final int DEFAULT_INPUT_STREAM_PREFETCH = Optional.ofNullable(System.getProperty("james.reactor.inputstream.prefetch"))
.map(Integer::parseInt)
.orElse(1);
private static final int TTL_SECONDS = 60;
private static final boolean DAEMON = true;
public static final Scheduler BLOCKING_CALL_WRAPPER = Schedulers.newBoundedElastic(DEFAULT_BOUNDED_ELASTIC_SIZE, DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
Expand Down Expand Up @@ -105,7 +108,7 @@ public static <T> BiConsumer<Optional<T>, SynchronousSink<T>> publishIfPresent()
}

public static InputStream toInputStream(Flux<ByteBuffer> byteArrays) {
return new StreamInputStream(byteArrays.toStream(1));
return new StreamInputStream(byteArrays.toStream(DEFAULT_INPUT_STREAM_PREFETCH));
}

public static Flux<ByteBuffer> toChunks(InputStream inputStream, int bufferSize) {
Expand Down
Loading

0 comments on commit 652397f

Please sign in to comment.