Skip to content

Commit

Permalink
KAFKA-14247: add handler impl to the prototype (#12775)
Browse files Browse the repository at this point in the history
Minor revision for KAFKA-14247. Added how the handler is called and constructed to the prototype code path.

Reviewers: John Roesler <vvcephei@apache.org>, Kirk True <kirk@mustardgrain.com>
  • Loading branch information
philipnee committed Oct 31, 2022
1 parent c710ecd commit fa10e21
Show file tree
Hide file tree
Showing 4 changed files with 519 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,27 +64,6 @@ public class DefaultBackgroundThread extends KafkaThread {
private final AtomicReference<Optional<RuntimeException>> exception =
new AtomicReference<>(Optional.empty());

public DefaultBackgroundThread(final ConsumerConfig config,
final LogContext logContext,
final BlockingQueue<ApplicationEvent> applicationEventQueue,
final BlockingQueue<BackgroundEvent> backgroundEventQueue,
final SubscriptionState subscriptions,
final ConsumerMetadata metadata,
final ConsumerNetworkClient networkClient,
final Metrics metrics) {
this(
Time.SYSTEM,
config,
logContext,
applicationEventQueue,
backgroundEventQueue,
subscriptions,
metadata,
networkClient,
metrics
);
}

public DefaultBackgroundThread(final Time time,
final ConsumerConfig config,
final LogContext logContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
* An {@code EventHandler} that uses a single background thread to consume {@code ApplicationEvent} and produce
Expand All @@ -46,6 +47,24 @@ public class DefaultEventHandler implements EventHandler {
private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
private final DefaultBackgroundThread backgroundThread;

public DefaultEventHandler(final ConsumerConfig config,
final LogContext logContext,
final SubscriptionState subscriptionState,
final ApiVersions apiVersions,
final Metrics metrics,
final ClusterResourceListeners clusterResourceListeners,
final Sensor fetcherThrottleTimeSensor) {
this(Time.SYSTEM,
config,
logContext,
new LinkedBlockingQueue<>(),
new LinkedBlockingQueue<>(),
subscriptionState,
apiVersions,
metrics,
clusterResourceListeners,
fetcherThrottleTimeSensor);
}

public DefaultEventHandler(final Time time,
final ConsumerConfig config,
Expand Down Expand Up @@ -95,14 +114,13 @@ public DefaultEventHandler(final Time time,
logContext
);
final ConsumerNetworkClient networkClient = new ConsumerNetworkClient(
logContext,
netClient,
metadata,
time,
config.getInt(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG),
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG)
);
logContext,
netClient,
metadata,
time,
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG),
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG));
this.backgroundThread = new DefaultBackgroundThread(
time,
config,
Expand Down
Loading

0 comments on commit fa10e21

Please sign in to comment.