Skip to content

Commit

Permalink
Remove ConsumerRebalanceListener from ConsumerVerticleContext (kn…
Browse files Browse the repository at this point in the history
…ative-extensions#3207)

* Implemented

* Test fixed acording

* Change the Tests to set RebalanceListener
  • Loading branch information
Debasish Biswas authored and Rahul-Kumar-prog committed Jul 31, 2023
1 parent 5ea6713 commit 57a2123
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.vertx.core.Vertx;
import java.util.Objects;
import java.util.function.BiFunction;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -40,6 +41,7 @@ public interface Initializer extends BiFunction<Vertx, ConsumerVerticle, Future<
private final ConsumerVerticleContext consumerVerticleContext;

ReactiveKafkaConsumer<Object, CloudEvent> consumer;
ConsumerRebalanceListener consumerRebalanceListener;
RecordDispatcher recordDispatcher;
private AsyncCloseable closeable;

Expand Down Expand Up @@ -85,6 +87,10 @@ public void setCloser(AsyncCloseable closeable) {
this.closeable = closeable;
}

public void setRebalanceListener(ConsumerRebalanceListener consumerRebalanceListener) {
this.consumerRebalanceListener = consumerRebalanceListener;
}

void exceptionHandler(Throwable cause) {
logger.error("Consumer exception {}", consumerVerticleContext.getLoggingKeyValue(), cause);

Expand All @@ -98,5 +104,9 @@ protected ConsumerVerticleContext getConsumerVerticleContext() {
return consumerVerticleContext;
}

protected ConsumerRebalanceListener getConsumerRebalanceListener() {
return consumerRebalanceListener;
}

public abstract PartitionRevokedHandler getPartitionRevokedHandler();
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,12 @@ public OrderedConsumerVerticle(final ConsumerVerticleContext context, final Init

@Override
void startConsumer(Promise<Void> startPromise) {
Objects.requireNonNull(getConsumerVerticleContext().getConsumerRebalanceListener());
Objects.requireNonNull(getConsumerRebalanceListener());
// We need to sub first, then we can start the polling loop
this.consumer
.subscribe(
Set.copyOf(getConsumerVerticleContext().getResource().getTopicsList()),
getConsumerVerticleContext().getConsumerRebalanceListener())
getConsumerRebalanceListener())
.onFailure(startPromise::fail)
.onSuccess(v -> {
if (this.pollTimer.compareAndSet(-1, 0)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ public UnorderedConsumerVerticle(final ConsumerVerticleContext context, final In

@Override
void startConsumer(final Promise<Void> startPromise) {
Objects.requireNonNull(getConsumerVerticleContext().getConsumerRebalanceListener());
Objects.requireNonNull(getConsumerRebalanceListener());

this.consumer
.subscribe(
Set.copyOf(getConsumerVerticleContext().getResource().getTopicsList()),
getConsumerVerticleContext().getConsumerRebalanceListener())
getConsumerRebalanceListener())
.onFailure(startPromise::tryFail)
.onSuccess(startPromise::tryComplete)
.onSuccess(v -> poll());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ private void build(final Vertx vertx, final ConsumerVerticle consumerVerticle, f

final var partitionRevokedHandlers =
List.of(consumerVerticle.getPartitionRevokedHandler(), offsetManager.getPartitionRevokedHandler());
this.consumerVerticleContext.withConsumerRebalanceListener(createRebalanceListener(partitionRevokedHandlers));
consumerVerticle.setRebalanceListener(createRebalanceListener(partitionRevokedHandlers));
}

private ConsumerVerticle createConsumerVerticle(final ConsumerVerticle.Initializer initializer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -65,8 +64,6 @@ public class ConsumerVerticleContext {

private Tags tags;

private ConsumerRebalanceListener consumerRebalanceListener;

public ConsumerVerticleContext withConsumerConfigs(final Map<String, Object> consumerConfigs) {
this.consumerConfigs = new HashMap<>(consumerConfigs);
return this;
Expand Down Expand Up @@ -159,16 +156,6 @@ public ConsumerVerticleContext withProducerFactory(
return this;
}

public ConsumerVerticleContext withConsumerRebalanceListener(
final ConsumerRebalanceListener consumerRebalanceListener) {
this.consumerRebalanceListener = consumerRebalanceListener;
return this;
}

public ConsumerRebalanceListener getConsumerRebalanceListener() {
return consumerRebalanceListener;
}

public DataPlaneContract.Resource getResource() {
return resource;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import io.vertx.micrometer.backends.BackendRegistries;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -89,7 +90,13 @@ public void subscribedToTopic(final Vertx vertx, final VertxTestContext context)
consumerVerticle.setConsumer(new MockReactiveKafkaConsumer<>(consumer));
consumerVerticle.setRecordDispatcher(recordDispatcher);
consumerVerticle.setCloser(Future::succeededFuture);
consumerVerticle.setRebalanceListener(new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {}

@Override
public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {}
});
return Future.succeededFuture();
});

Expand Down Expand Up @@ -130,6 +137,13 @@ public void stop(final Vertx vertx, final VertxTestContext context) {
consumerVerticle.setConsumer(new MockReactiveKafkaConsumer<>(consumer));
consumerVerticle.setRecordDispatcher(recordDispatcher);
consumerVerticle.setCloser(Future::succeededFuture);
consumerVerticle.setRebalanceListener(new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {}

@Override
public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {}
});

return Future.succeededFuture();
});
Expand Down Expand Up @@ -207,7 +221,13 @@ public void shouldCloseEverything(final Vertx vertx, final VertxTestContext cont
consumerVerticle.setConsumer(consumer);
consumerVerticle.setRecordDispatcher(recordDispatcher);
consumerVerticle.setCloser(Future::succeededFuture);
consumerVerticle.setRebalanceListener(new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {}

@Override
public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {}
});
return Future.succeededFuture();
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.vertx.core.Vertx;
import io.vertx.junit5.VertxExtension;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -43,6 +44,7 @@
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
Expand Down Expand Up @@ -123,7 +125,13 @@ public void consumeOneByOne(
consumerVerticle.setConsumer(new MockReactiveKafkaConsumer<>(consumer));
consumerVerticle.setRecordDispatcher(recordDispatcher);
consumerVerticle.setCloser(Future::succeededFuture);
consumerVerticle.setRebalanceListener(new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {}

@Override
public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {}
});
return Future.succeededFuture();
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import dev.knative.eventing.kafka.broker.core.testing.CoreObjects;
import io.vertx.ext.web.client.WebClientOptions;
import java.util.HashMap;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;

public class FakeConsumerVerticleContext {

Expand All @@ -30,16 +29,7 @@ public static ConsumerVerticleContext get() {
.withProducerConfigs(new HashMap<>())
.withConsumerConfigs(new HashMap<>())
.withMeterRegistry(Metrics.getRegistry())
.withResource(CoreObjects.resource1(), CoreObjects.egress1())
.withConsumerRebalanceListener(new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(
final java.util.Collection<org.apache.kafka.common.TopicPartition> partitions) {}

@Override
public void onPartitionsAssigned(
final java.util.Collection<org.apache.kafka.common.TopicPartition> partitions) {}
});
.withResource(CoreObjects.resource1(), CoreObjects.egress1());
}

public static ConsumerVerticleContext get(
Expand All @@ -50,15 +40,6 @@ public static ConsumerVerticleContext get(
.withAuthProvider(AuthProvider.noAuth())
.withWebClientOptions(new WebClientOptions())
.withMeterRegistry(Metrics.getRegistry())
.withResource(resource, egress)
.withConsumerRebalanceListener(new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(
final java.util.Collection<org.apache.kafka.common.TopicPartition> partitions) {}

@Override
public void onPartitionsAssigned(
final java.util.Collection<org.apache.kafka.common.TopicPartition> partitions) {}
});
.withResource(resource, egress);
}
}

0 comments on commit 57a2123

Please sign in to comment.