Immediate fallback for MultiDatacenterMessageProducer (#1861)
moscicky committed May 27, 2024
1 parent 7c2e3ff commit 8bfb454
Showing 15 changed files with 706 additions and 181 deletions.
@@ -33,23 +33,51 @@ public ExecutorService getExecutorService(String name, int size, boolean monitor
return monitoringEnabled ? monitor(name, executor) : executor;
}

public ScheduledExecutorService getScheduledExecutorService(
String name, int size, boolean monitoringEnabled
) {

ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(name + "-scheduled-executor-%d").build();
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(size, threadFactory);
return monitoringEnabled ? monitor(name, executor) : executor;
public class ScheduledExecutorServiceBuilder {
final String name;
final int size;
boolean monitoringEnabled = false;
boolean removeOnCancel = false;

public ScheduledExecutorServiceBuilder(String name, int size) {
this.name = name;
this.size = size;
}

public ScheduledExecutorServiceBuilder withMonitoringEnabled(boolean monitoringEnabled) {
this.monitoringEnabled = monitoringEnabled;
return this;
}

public ScheduledExecutorServiceBuilder withRemoveOnCancel(boolean removeOnCancel) {
this.removeOnCancel = removeOnCancel;
return this;
}

public ScheduledExecutorService create() {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(name + "-scheduled-executor-%d").build();
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(size, threadFactory);
executor.setRemoveOnCancelPolicy(removeOnCancel);
return monitoringEnabled ? monitor(name, executor) : executor;
}

private ScheduledExecutorService monitor(String threadPoolName, ScheduledExecutorService executor) {
return metricsFacade.executor().monitor(executor, threadPoolName);
}
}

private ExecutorService monitor(String threadPoolName, ExecutorService executor) {
return metricsFacade.executor().monitor(executor, threadPoolName);
public ScheduledExecutorServiceBuilder scheduledExecutorBuilder(
String name, int size
) {
return new ScheduledExecutorServiceBuilder(name, size);
}

private ScheduledExecutorService monitor(String threadPoolName, ScheduledExecutorService executor) {
private ExecutorService monitor(String threadPoolName, ExecutorService executor) {
return metricsFacade.executor().monitor(executor, threadPoolName);
}


/**
* Copy of {@link java.util.concurrent.Executors#newFixedThreadPool(int, java.util.concurrent.ThreadFactory)}
* with configurable queue capacity.
@@ -45,7 +45,8 @@ class InstrumentedExecutorServiceFactoryMetricsTest extends Specification {
def "should record metrics for scheduled executor service (monitoring enabled: #monitoringEnabled)"() {
given:
ScheduledExecutorService executor = factory.getScheduledExecutorService("test-scheduled-executor", 10, monitoringEnabled)
ScheduledExecutorService executor = factory.scheduledExecutorBuilder("test-scheduled-executor", 10)
.withMonitoringEnabled(monitoringEnabled).create()
when:
ScheduledFuture<?> task = executor.schedule({ println("scheduled task executed") }, 1, SECONDS)
@@ -262,11 +262,11 @@ public EndpointAddressResolver interpolatingEndpointAddressResolver(UriInterpola
@Bean
public FutureAsyncTimeout futureAsyncTimeoutFactory(InstrumentedExecutorServiceFactory executorFactory,
SenderAsyncTimeoutProperties senderAsyncTimeoutProperties) {
ScheduledExecutorService timeoutExecutorService = executorFactory.getScheduledExecutorService(
"async-timeout",
senderAsyncTimeoutProperties.getThreadPoolSize(),
senderAsyncTimeoutProperties.isThreadPoolMonitoringEnabled()
);
ScheduledExecutorService timeoutExecutorService = executorFactory.scheduledExecutorBuilder(
"async-timeout",
senderAsyncTimeoutProperties.getThreadPoolSize()
).withMonitoringEnabled(senderAsyncTimeoutProperties.isThreadPoolMonitoringEnabled())
.create();
return new FutureAsyncTimeout(timeoutExecutorService);
}
}
@@ -16,8 +16,6 @@ public class FailFastKafkaProducerProperties {

private FallbackSchedulerProperties fallbackScheduler = new FallbackSchedulerProperties();

private ChaosSchedulerProperties chaosScheduler = new ChaosSchedulerProperties();

public Duration getSpeculativeSendDelay() {
return speculativeSendDelay;
}
@@ -50,14 +48,6 @@ public void setRemote(KafkaProducerParameters remote) {
this.remote = remote;
}

public ChaosSchedulerProperties getChaosScheduler() {
return chaosScheduler;
}

public void setChaosScheduler(ChaosSchedulerProperties chaosScheduler) {
this.chaosScheduler = chaosScheduler;
}

public static class FallbackSchedulerProperties {

private int threadPoolSize = 16;
@@ -80,27 +70,4 @@ public void setThreadPoolMonitoringEnabled(boolean threadPoolMonitoringEnabled)
this.threadPoolMonitoringEnabled = threadPoolMonitoringEnabled;
}
}

public static class ChaosSchedulerProperties {

private int threadPoolSize = 16;

private boolean threadPoolMonitoringEnabled = false;

public int getThreadPoolSize() {
return threadPoolSize;
}

public void setThreadPoolSize(int threadPoolSize) {
this.threadPoolSize = threadPoolSize;
}

public boolean isThreadPoolMonitoringEnabled() {
return threadPoolMonitoringEnabled;
}

public void setThreadPoolMonitoringEnabled(boolean threadPoolMonitoringEnabled) {
this.threadPoolMonitoringEnabled = threadPoolMonitoringEnabled;
}
}
}
@@ -4,6 +4,44 @@

import java.time.Duration;


/**
Kafka producer maintains a single connection to each broker, over which produce requests are sent.
When a produce request's duration exceeds requestTimeout, the producer closes the connection to the broker
that the request was sent to. This causes all in-flight requests that were sent to that broker to be cancelled.
The number of in-flight requests is configured by the maxInflightRequestsPerConnection property.
Let's assume that we have requestTimeout set to 500ms, maxInflightRequestsPerConnection set to 5,
and the following in-flight batches in the producer are being sent to broker1:
batchId | time spent in send buffer (socket)
------------------------------------
batch1  | 10ms
batch2  | 200ms
batch3  | 300ms
batch4  | 400ms
batch5  | 501ms
Batch5 exceeded the requestTimeout, so the producer will close the connection to broker1. This causes batch5 to be marked
as failed but also causes batches 1-4 to be retried. This has the following consequences:
1. Batches 1-4 will probably get duplicated - even though they were cancelled, they were probably already sent to the broker,
just not yet ACKed. The retry causes them to be sent once again, resulting in duplicates.
2. On retry, batches 1-4 will have a smaller time budget to complete. Part of their budget was already spent
in the send buffer, and retryBackoff will be applied to them. With little time left to complete on retry, they can be
timed out again, potentially resulting in a vicious circle.
3. The connection to the broker must be re-established, which takes time.
To avoid the problems described above, we set requestTimeout and deliveryTimeout much higher than the
maximum frontend request duration (frontend.handlers.maxPublishRequestDuration). This means that when
maxPublishRequestDuration is exceeded for a message we received, the client will receive a 5xx even though the
corresponding message is still being processed in the producer. The message will eventually be ACKed by Kafka, so upon client-side
retry the message will be duplicated. This, however, would likely also happen if the message were promptly timed out by the producer
before maxPublishRequestDuration elapsed - the message was likely already sent to Kafka, there just hasn't been a response yet.
So by using a large requestTimeout we cause the first slow message to be duplicated (by client-side retry) but:
- we protect other in-flight messages from being duplicated,
- we prevent connections from being frequently dropped and re-established.
*/
public class FailFastLocalKafkaProducerProperties implements KafkaProducerParameters {
private Duration maxBlock = Duration.ofMillis(500);

@@ -15,9 +53,9 @@ public class FailFastLocalKafkaProducerProperties implements KafkaProducerParame

private Duration retryBackoff = Duration.ofMillis(50);

private Duration requestTimeout = Duration.ofMillis(500);
private Duration requestTimeout = Duration.ofSeconds(30);

private Duration deliveryTimeout = Duration.ofMillis(500);
private Duration deliveryTimeout = Duration.ofSeconds(30);

private int batchSize = 16 * 1024;

@@ -4,6 +4,10 @@

import java.time.Duration;

/**
* See {@link pl.allegro.tech.hermes.frontend.config.FailFastLocalKafkaProducerProperties}
* for the explanation of default values used.
*/
public class FailFastRemoteKafkaProducerProperties implements KafkaProducerParameters {
private Duration maxBlock = Duration.ofMillis(250);

@@ -15,9 +19,9 @@ public class FailFastRemoteKafkaProducerProperties implements KafkaProducerParam

private Duration retryBackoff = Duration.ofMillis(50);

private Duration requestTimeout = Duration.ofMillis(250);
private Duration requestTimeout = Duration.ofSeconds(30);

private Duration deliveryTimeout = Duration.ofMillis(250);
private Duration deliveryTimeout = Duration.ofSeconds(30);

private int batchSize = 16 * 1024;

@@ -9,11 +9,11 @@
import pl.allegro.tech.hermes.common.metric.MetricsFacade;
import pl.allegro.tech.hermes.common.metric.executor.InstrumentedExecutorServiceFactory;
import pl.allegro.tech.hermes.frontend.cache.topic.TopicsCache;
import pl.allegro.tech.hermes.frontend.config.FailFastKafkaProducerProperties.ChaosSchedulerProperties;
import pl.allegro.tech.hermes.frontend.config.FailFastKafkaProducerProperties.FallbackSchedulerProperties;
import pl.allegro.tech.hermes.frontend.producer.BrokerLatencyReporter;
import pl.allegro.tech.hermes.frontend.producer.BrokerMessageProducer;
import pl.allegro.tech.hermes.frontend.producer.kafka.FallbackToRemoteDatacenterAwareMessageProducer;
import pl.allegro.tech.hermes.frontend.producer.kafka.KafkaChaosProperties;
import pl.allegro.tech.hermes.frontend.producer.kafka.KafkaHeaderFactory;
import pl.allegro.tech.hermes.frontend.producer.kafka.KafkaMessageSenders;
import pl.allegro.tech.hermes.frontend.producer.kafka.KafkaMessageSendersFactory;
@@ -42,6 +42,7 @@
KafkaHeaderNameProperties.class,
KafkaProducerProperties.class,
FailFastKafkaProducerProperties.class,
KafkaChaosProperties.class,
KafkaClustersProperties.class,
HTTPHeadersProperties.class
})
@@ -69,24 +70,19 @@ public BrokerMessageProducer multiDatacenterBrokerProducer(@Named("failFastKafka
AdminReadinessService adminReadinessService,
InstrumentedExecutorServiceFactory executorServiceFactory) {
FallbackSchedulerProperties fallbackSchedulerProperties = kafkaProducerProperties.getFallbackScheduler();
ScheduledExecutorService fallbackScheduler = executorServiceFactory.getScheduledExecutorService(
ScheduledExecutorService fallbackScheduler = executorServiceFactory.scheduledExecutorBuilder(
"fallback-to-remote",
fallbackSchedulerProperties.getThreadPoolSize(),
fallbackSchedulerProperties.isThreadPoolMonitoringEnabled()
);
ChaosSchedulerProperties chaosSchedulerProperties = kafkaProducerProperties.getChaosScheduler();
ScheduledExecutorService chaosScheduler = executorServiceFactory.getScheduledExecutorService(
"chaos",
chaosSchedulerProperties.getThreadPoolSize(),
chaosSchedulerProperties.isThreadPoolMonitoringEnabled()
);
fallbackSchedulerProperties.getThreadPoolSize()
)
.withMonitoringEnabled(fallbackSchedulerProperties.isThreadPoolMonitoringEnabled())
.withRemoveOnCancel(true)
.create();
return new MultiDatacenterMessageProducer(
kafkaMessageSenders,
adminReadinessService,
messageConverter,
kafkaProducerProperties.getSpeculativeSendDelay(),
fallbackScheduler,
chaosScheduler
fallbackScheduler
);
}

@@ -111,6 +107,17 @@ public KafkaMessageSenders failFastKafkaMessageSenders(FailFastKafkaProducerProp
"failFast");
}

@Bean
public ScheduledExecutorService chaosScheduler(KafkaChaosProperties chaosProperties, InstrumentedExecutorServiceFactory executorServiceFactory) {
KafkaChaosProperties.ChaosSchedulerProperties chaosSchedulerProperties = chaosProperties.getChaosScheduler();
return executorServiceFactory.scheduledExecutorBuilder(
"chaos",
chaosSchedulerProperties.getThreadPoolSize()
)
.withMonitoringEnabled(chaosSchedulerProperties.isThreadPoolMonitoringEnabled())
.create();
}

@Bean(destroyMethod = "close")
public KafkaMessageSendersFactory kafkaMessageSendersFactory(KafkaClustersProperties kafkaClustersProperties,
KafkaProducerProperties kafkaProducerProperties,
@@ -119,9 +126,12 @@ public KafkaMessageSendersFactory kafkaMessageSendersFactory(KafkaClustersProper
LocalMessageStorageProperties localMessageStorageProperties,
DatacenterNameProvider datacenterNameProvider,
BrokerLatencyReporter brokerLatencyReporter,
MetricsFacade metricsFacade) {
MetricsFacade metricsFacade,
@Named("chaosScheduler") ScheduledExecutorService chaosScheduler
) {
KafkaProperties kafkaProperties = kafkaClustersProperties.toKafkaProperties(datacenterNameProvider);
List<KafkaParameters> remoteKafkaProperties = kafkaClustersProperties.toRemoteKafkaProperties(datacenterNameProvider);

return new KafkaMessageSendersFactory(
kafkaProperties,
remoteKafkaProperties,
@@ -133,7 +143,8 @@ public KafkaMessageSendersFactory kafkaMessageSendersFactory(KafkaClustersProper
topicLoadingProperties.getMetadata().getRetryInterval(),
topicLoadingProperties.getMetadata().getThreadPoolSize(),
localMessageStorageProperties.getBufferedSizeBytes(),
kafkaProducerProperties.getMetadataMaxAge()
kafkaProducerProperties.getMetadataMaxAge(),
chaosScheduler
);
}

@@ -0,0 +1,40 @@
package pl.allegro.tech.hermes.frontend.producer.kafka;

import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "frontend.kafka.chaos")
public class KafkaChaosProperties {
private ChaosSchedulerProperties chaosScheduler = new ChaosSchedulerProperties();

public ChaosSchedulerProperties getChaosScheduler() {
return chaosScheduler;
}

public void setChaosScheduler(ChaosSchedulerProperties chaosScheduler) {
this.chaosScheduler = chaosScheduler;
}

public static class ChaosSchedulerProperties {

private int threadPoolSize = 16;

private boolean threadPoolMonitoringEnabled = false;

public int getThreadPoolSize() {
return threadPoolSize;
}

public void setThreadPoolSize(int threadPoolSize) {
this.threadPoolSize = threadPoolSize;
}

public boolean isThreadPoolMonitoringEnabled() {
return threadPoolMonitoringEnabled;
}

public void setThreadPoolMonitoringEnabled(boolean threadPoolMonitoringEnabled) {
this.threadPoolMonitoringEnabled = threadPoolMonitoringEnabled;
}
}

}
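For completeness, a minimal sketch (not part of this commit, class name hypothetical) of how the new frontend.kafka.chaos prefix binds to this class, assuming Spring Boot's relaxed binding; the key names are derived from the prefix and field names above, and the values are purely illustrative:

import java.util.Map;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.boot.context.properties.source.MapConfigurationPropertySource;

class KafkaChaosPropertiesBindingSketch {
    static KafkaChaosProperties bindExample() {
        // Hypothetical values; keys follow relaxed binding of the "frontend.kafka.chaos" prefix.
        MapConfigurationPropertySource source = new MapConfigurationPropertySource(Map.of(
                "frontend.kafka.chaos.chaos-scheduler.thread-pool-size", "8",
                "frontend.kafka.chaos.chaos-scheduler.thread-pool-monitoring-enabled", "true"));
        return new Binder(source)
                .bind("frontend.kafka.chaos", KafkaChaosProperties.class)
                .get();
    }
}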