
Commit

Remove the use of KafkaProducer from the dispatcher and use `ReactiveKafkaProducer` instead (knative-extensions#3206)

* Move Reactive interfaces into core module

* Change ReactiveProducerFactory for use in the dispatcher

* Remove vertx KafkaProducer from dispatcher (see the entry-point sketch after this list)

* Make test module use vertx impl

* Spotless formatting and codegen

* Use ReceiverMock instead of creating a new one in dispatcher

* Remove unnecessary THIRD-PARTY file change

* Remove unused constructor
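
The net effect on a dispatcher flavor's entry point, as a sketch that mirrors the dispatcher-vertx Main further down in this diff: the shared dispatcher Main is now started with both a consumer factory and a producer factory.

package dev.knative.eventing.kafka.broker.dispatchervertx;

import dev.knative.eventing.kafka.broker.receiververtx.VertxProducerFactory;
import java.io.IOException;

public class Main {
    public static void main(String[] args) throws IOException {
        // The vertx flavor supplies its own consumer and producer factory implementations;
        // the dispatcher core no longer creates a vertx KafkaProducer itself.
        dev.knative.eventing.kafka.broker.dispatcher.main.Main.start(
                args, new VertxConsumerFactory<>(), new VertxProducerFactory<>());
    }
}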
Debasish Biswas authored and Rahul-Kumar-prog committed Jul 31, 2023
1 parent 56641b8 commit 5ea6713
Showing 44 changed files with 166 additions and 178 deletions.
3 changes: 2 additions & 1 deletion data-plane/THIRD-PARTY.txt
@@ -1,5 +1,5 @@

Lists of 229 third-party dependencies.
Lists of 230 third-party dependencies.
(Eclipse Public License - v 1.0) (GNU Lesser General Public License) Logback Classic Module (ch.qos.logback:logback-classic:1.2.11 - http://logback.qos.ch/logback-classic)
(Eclipse Public License - v 1.0) (GNU Lesser General Public License) Logback Core Module (ch.qos.logback:logback-core:1.2.11 - http://logback.qos.ch/logback-core)
(Apache License 2.0) brotli4j (com.aayushatharva.brotli4j:brotli4j:1.11.0 - https://github.com/hyperxpro/Brotli4j/brotli4j)
@@ -36,6 +36,7 @@ Lists of 229 third-party dependencies.
(Unknown license) dispatcher (dev.knative.eventing.kafka.broker:dispatcher:1.0-SNAPSHOT - no url defined)
(Unknown license) dispatcher-vertx (dev.knative.eventing.kafka.broker:dispatcher-vertx:1.0-SNAPSHOT - no url defined)
(Unknown license) receiver (dev.knative.eventing.kafka.broker:receiver:1.0-SNAPSHOT - no url defined)
(Unknown license) receiver-vertx (dev.knative.eventing.kafka.broker:receiver-vertx:1.0-SNAPSHOT - no url defined)
(The Apache Software License, Version 2.0) CloudEvents - API (io.cloudevents:cloudevents-api:2.5.0 - https://cloudevents.github.io/sdk-java/cloudevents-api/)
(The Apache Software License, Version 2.0) CloudEvents - Core (io.cloudevents:cloudevents-core:2.5.0 - https://cloudevents.github.io/sdk-java/cloudevents-core/)
(The Apache Software License, Version 2.0) CloudEvents - Vert.x Http Binding (io.cloudevents:cloudevents-http-vertx:2.5.0 - https://cloudevents.github.io/sdk-java/cloudevents-http-vertx/)
@@ -15,7 +15,7 @@
*/
package dev.knative.eventing.kafka.broker.dispatcher.impl.consumer;

import dev.knative.eventing.kafka.broker.dispatcher.ReactiveKafkaConsumer;
import dev.knative.eventing.kafka.broker.core.ReactiveKafkaConsumer;
import io.cloudevents.CloudEvent;
import io.vertx.core.Future;
import io.vertx.core.Handler;
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.knative.eventing.kafka.broker.dispatcher;
package dev.knative.eventing.kafka.broker.core;

import io.vertx.core.Vertx;
import java.util.Map;
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.knative.eventing.kafka.broker.dispatcher;
package dev.knative.eventing.kafka.broker.core;

import io.vertx.core.Future;
import io.vertx.core.Handler;
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.knative.eventing.kafka.broker.receiver;
package dev.knative.eventing.kafka.broker.core;

import io.vertx.core.Future;
import org.apache.kafka.clients.producer.Producer;
@@ -13,25 +13,26 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.knative.eventing.kafka.broker.receiver;
package dev.knative.eventing.kafka.broker.core;

import io.vertx.core.Vertx;
import org.apache.kafka.clients.producer.Producer;
import java.util.Properties;

/**
* Factory for creating ReactiveKafkaProducer
*
* @param <K> the key type
* @param <V> the value type
*/
@FunctionalInterface
public interface ReactiveProducerFactory<K, V> {

/**
* Create a new ReactiveKafkaProducer
*
* @param v the Vertx instance used when creating the vertx KafkaProducer
* @param producer the Kafka producer
* @param v the Vertx instance used when creating the vertx KafkaProducer
* @param config the KafkaProducer configuration
* @return a new ReactiveKafkaProducer
*/
ReactiveKafkaProducer<K, V> create(Vertx v, Producer<K, V> producer);
ReactiveKafkaProducer<K, V> create(Vertx v, Properties config);
}
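
For orientation, a minimal caller-side sketch of the new `create(Vertx, Properties)` contract; this is not taken from the commit, and the names `factory`, `configMap`, `topic` and `event` are illustrative assumptions. The dispatcher's real wiring lives in ConsumerVerticle#createResponseHandler further down in this diff.

import dev.knative.eventing.kafka.broker.core.ReactiveKafkaProducer;
import dev.knative.eventing.kafka.broker.core.ReactiveProducerFactory;
import io.cloudevents.CloudEvent;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerFactoryUsageSketch {

    // Illustrative only: `factory`, `configMap`, `topic` and `event` are assumed inputs.
    public static Future<Void> sendOne(
            final Vertx vertx,
            final ReactiveProducerFactory<String, CloudEvent> factory,
            final Map<String, Object> configMap,
            final String topic,
            final CloudEvent event) {

        // The factory now receives Properties instead of a pre-built Kafka Producer.
        final Properties producerConfigs = new Properties();
        producerConfigs.putAll(configMap);

        final ReactiveKafkaProducer<String, CloudEvent> producer = factory.create(vertx, producerConfigs);

        // ReactiveKafkaProducer accepts Apache Kafka's ProducerRecord directly,
        // so no vertx KafkaProducerRecord wrapper is needed.
        return producer.send(new ProducerRecord<>(topic, event)).mapEmpty();
    }
}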
5 changes: 5 additions & 0 deletions data-plane/dispatcher-vertx/pom.xml
@@ -34,6 +34,11 @@
<artifactId>dispatcher</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>dev.knative.eventing.kafka.broker</groupId>
<artifactId>receiver-vertx</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

<build>
@@ -15,10 +15,12 @@
*/
package dev.knative.eventing.kafka.broker.dispatchervertx;

import dev.knative.eventing.kafka.broker.receiververtx.VertxProducerFactory;
import java.io.IOException;

public class Main {
public static void main(String[] args) throws IOException {
dev.knative.eventing.kafka.broker.dispatcher.main.Main.start(args, new VertxConsumerFactory<>());
dev.knative.eventing.kafka.broker.dispatcher.main.Main.start(
args, new VertxConsumerFactory<>(), new VertxProducerFactory<>());
}
}
@@ -15,8 +15,8 @@
*/
package dev.knative.eventing.kafka.broker.dispatchervertx;

import dev.knative.eventing.kafka.broker.dispatcher.ReactiveConsumerFactory;
import dev.knative.eventing.kafka.broker.dispatcher.ReactiveKafkaConsumer;
import dev.knative.eventing.kafka.broker.core.ReactiveConsumerFactory;
import dev.knative.eventing.kafka.broker.core.ReactiveKafkaConsumer;
import io.vertx.core.Vertx;
import io.vertx.core.tracing.TracingPolicy;
import io.vertx.kafka.client.common.KafkaClientOptions;
@@ -15,7 +15,7 @@
*/
package dev.knative.eventing.kafka.broker.dispatchervertx;

import dev.knative.eventing.kafka.broker.dispatcher.ReactiveKafkaConsumer;
import dev.knative.eventing.kafka.broker.core.ReactiveKafkaConsumer;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
8 changes: 8 additions & 0 deletions data-plane/dispatcher/pom.xml
@@ -53,6 +53,14 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>dev.knative.eventing.kafka.broker</groupId>
<artifactId>receiver</artifactId>
<classifier>tests</classifier>
<type>test-jar</type>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-junit5</artifactId>
@@ -40,7 +40,6 @@
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.kafka.client.common.tracing.ConsumerTracer;
// import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
@@ -24,7 +24,7 @@
/**
* This class implements a {@link ResponseHandler} that will convert the sink response into a {@link CloudEvent} and post it to a URL.
*/
public class ResponseToHttpEndpointHandler extends BaseResponseHandler implements ResponseHandler {
public class ResponseToHttpEndpointHandler extends BaseResponseHandler {

private final CloudEventSender cloudEventSender;

@@ -18,24 +18,24 @@
import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue;

import dev.knative.eventing.kafka.broker.core.AsyncCloseable;
import dev.knative.eventing.kafka.broker.core.ReactiveKafkaProducer;
import dev.knative.eventing.kafka.broker.core.metrics.Metrics;
import dev.knative.eventing.kafka.broker.dispatcher.ResponseHandler;
import io.cloudevents.CloudEvent;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import java.util.Objects;
import java.util.function.Function;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.LoggerFactory;

/**
* This class implements a {@link ResponseHandler} that will convert the sink response into a {@link CloudEvent} and push it to a Kafka topic.
*/
public final class ResponseToKafkaTopicHandler extends BaseResponseHandler implements ResponseHandler {
public final class ResponseToKafkaTopicHandler extends BaseResponseHandler {

private final String topic;
private final KafkaProducer<String, CloudEvent> producer;
private final ReactiveKafkaProducer<String, CloudEvent> producer;
private final AsyncCloseable producerMeterBinder;

private int inFlightEvents = 0;
@@ -45,10 +45,10 @@ public final class ResponseToKafkaTopicHandler extends BaseResponseHandler imple
/**
* All args constructor.
*
* @param producer Kafka producer.
* @param producer Reactive Kafka producer.
* @param topic topic to produce records.
*/
public ResponseToKafkaTopicHandler(final KafkaProducer<String, CloudEvent> producer, final String topic) {
public ResponseToKafkaTopicHandler(final ReactiveKafkaProducer<String, CloudEvent> producer, final String topic) {
super(LoggerFactory.getLogger(ResponseToKafkaTopicHandler.class));

Objects.requireNonNull(topic, "provide topic");
@@ -67,8 +67,7 @@ protected Future<Void> doHandleEvent(CloudEvent event) {

eventReceived();

final Future<Void> f =
producer.send(KafkaProducerRecord.create(topic, event)).mapEmpty();
final Future<Void> f = producer.send(new ProducerRecord<>(topic, event)).mapEmpty();

f.onComplete(v -> eventProduced());

@@ -16,7 +16,7 @@
package dev.knative.eventing.kafka.broker.dispatcher.impl.consumer;

import dev.knative.eventing.kafka.broker.core.AsyncCloseable;
import dev.knative.eventing.kafka.broker.dispatcher.ReactiveKafkaConsumer;
import dev.knative.eventing.kafka.broker.core.ReactiveKafkaConsumer;
import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcher;
import dev.knative.eventing.kafka.broker.dispatcher.main.ConsumerVerticleContext;
import io.cloudevents.CloudEvent;
@@ -17,7 +17,7 @@

import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue;

import dev.knative.eventing.kafka.broker.dispatcher.ReactiveKafkaConsumer;
import dev.knative.eventing.kafka.broker.core.ReactiveKafkaConsumer;
import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcherListener;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
@@ -18,13 +18,14 @@
import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue;

import dev.knative.eventing.kafka.broker.contract.DataPlaneContract;
import dev.knative.eventing.kafka.broker.core.ReactiveKafkaConsumer;
import dev.knative.eventing.kafka.broker.core.ReactiveKafkaProducer;
import dev.knative.eventing.kafka.broker.core.metrics.Metrics;
import dev.knative.eventing.kafka.broker.core.security.Credentials;
import dev.knative.eventing.kafka.broker.core.security.KafkaClientsAuth;
import dev.knative.eventing.kafka.broker.dispatcher.CloudEventSender;
import dev.knative.eventing.kafka.broker.dispatcher.DeliveryOrder;
import dev.knative.eventing.kafka.broker.dispatcher.Filter;
import dev.knative.eventing.kafka.broker.dispatcher.ReactiveKafkaConsumer;
import dev.knative.eventing.kafka.broker.dispatcher.ResponseHandler;
import dev.knative.eventing.kafka.broker.dispatcher.impl.NoopResponseHandler;
import dev.knative.eventing.kafka.broker.dispatcher.impl.RecordDispatcherImpl;
@@ -56,10 +57,10 @@
import io.vertx.ext.web.client.WebClientOptions;
import io.vertx.kafka.client.common.KafkaClientOptions;
import io.vertx.kafka.client.common.tracing.ConsumerTracer;
import io.vertx.kafka.client.producer.KafkaProducer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -240,9 +241,11 @@ private ResponseHandler createResponseHandler(final Vertx vertx) {
return new NoopResponseHandler();
}

final KafkaProducer<String, CloudEvent> producer = this.consumerVerticleContext
.getProducerFactory()
.create(vertx, consumerVerticleContext.getProducerConfigs());
final Properties producerConfigs = new Properties();
producerConfigs.putAll(consumerVerticleContext.getProducerConfigs());

final ReactiveKafkaProducer<String, CloudEvent> producer =
this.consumerVerticleContext.getProducerFactory().create(vertx, producerConfigs);
return new ResponseToKafkaTopicHandler(
producer, consumerVerticleContext.getResource().getTopics(0));
}
@@ -18,9 +18,10 @@
import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue;

import dev.knative.eventing.kafka.broker.contract.DataPlaneContract;
import dev.knative.eventing.kafka.broker.core.ReactiveConsumerFactory;
import dev.knative.eventing.kafka.broker.core.ReactiveProducerFactory;
import dev.knative.eventing.kafka.broker.core.metrics.Metrics;
import dev.knative.eventing.kafka.broker.core.security.AuthProvider;
import dev.knative.eventing.kafka.broker.dispatcher.ReactiveConsumerFactory;
import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.InvalidCloudEventInterceptor;
import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.KeyDeserializer;
import dev.knative.eventing.kafka.broker.dispatcher.impl.http.WebClientCloudEventSender;
@@ -55,7 +56,7 @@ public class ConsumerVerticleContext {
private WebClientOptions webClientOptions;

private ReactiveConsumerFactory<Object, CloudEvent> consumerFactory;
private ProducerFactory<String, CloudEvent> producerFactory;
private ReactiveProducerFactory<String, CloudEvent> producerFactory;

private Integer maxPollRecords;
private static final int DEFAULT_MAX_POLL_RECORDS = 50;
@@ -152,7 +153,8 @@ public ConsumerVerticleContext withConsumerFactory(
return this;
}

public ConsumerVerticleContext withProducerFactory(final ProducerFactory<String, CloudEvent> producerFactory) {
public ConsumerVerticleContext withProducerFactory(
final ReactiveProducerFactory<String, CloudEvent> producerFactory) {
this.producerFactory = producerFactory;
return this;
}
@@ -226,7 +228,7 @@ public ReactiveConsumerFactory<Object, CloudEvent> getConsumerFactory() {
return this.consumerFactory;
}

public ProducerFactory<String, CloudEvent> getProducerFactory() {
public ReactiveProducerFactory<String, CloudEvent> getProducerFactory() {
return this.producerFactory;
}

@@ -16,9 +16,10 @@
package dev.knative.eventing.kafka.broker.dispatcher.main;

import dev.knative.eventing.kafka.broker.contract.DataPlaneContract;
import dev.knative.eventing.kafka.broker.core.ReactiveConsumerFactory;
import dev.knative.eventing.kafka.broker.core.ReactiveProducerFactory;
import dev.knative.eventing.kafka.broker.core.security.AuthProvider;
import dev.knative.eventing.kafka.broker.dispatcher.ConsumerVerticleFactory;
import dev.knative.eventing.kafka.broker.dispatcher.ReactiveConsumerFactory;
import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.ConsumerVerticle;
import io.micrometer.core.instrument.MeterRegistry;
import io.vertx.ext.web.client.WebClientOptions;
@@ -37,6 +38,7 @@ public class ConsumerVerticleFactoryImpl implements ConsumerVerticleFactory {
private final AuthProvider authProvider;
private final MeterRegistry metricsRegistry;
private final ReactiveConsumerFactory reactiveConsumerFactory;
private final ReactiveProducerFactory reactiveProducerFactory;

/**
* All args constructor.
@@ -53,7 +55,8 @@ public ConsumerVerticleFactoryImpl(
final Properties producerConfigs,
final AuthProvider authProvider,
final MeterRegistry metricsRegistry,
final ReactiveConsumerFactory reactiveConsumerFactory) {
final ReactiveConsumerFactory reactiveConsumerFactory,
final ReactiveProducerFactory reactiveProducerFactory) {

Objects.requireNonNull(consumerConfigs, "provide consumerConfigs");
Objects.requireNonNull(webClientOptions, "provide webClientOptions");
@@ -71,6 +74,7 @@ public ConsumerVerticleFactoryImpl(
this.authProvider = authProvider;
this.metricsRegistry = metricsRegistry;
this.reactiveConsumerFactory = reactiveConsumerFactory;
this.reactiveProducerFactory = reactiveProducerFactory;
}

/**
@@ -89,7 +93,7 @@ public ConsumerVerticle get(final DataPlaneContract.Resource resource, final Dat
.withMeterRegistry(metricsRegistry)
.withResource(resource, egress)
.withConsumerFactory(reactiveConsumerFactory)
.withProducerFactory(ProducerFactory.defaultFactory()))
.withProducerFactory(reactiveProducerFactory))
.build();
}
}
@@ -17,6 +17,8 @@

import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue;

import dev.knative.eventing.kafka.broker.core.ReactiveConsumerFactory;
import dev.knative.eventing.kafka.broker.core.ReactiveProducerFactory;
import dev.knative.eventing.kafka.broker.core.eventbus.ContractMessageCodec;
import dev.knative.eventing.kafka.broker.core.eventbus.ContractPublisher;
import dev.knative.eventing.kafka.broker.core.file.FileWatcher;
@@ -26,7 +28,6 @@
import dev.knative.eventing.kafka.broker.core.tracing.TracingConfig;
import dev.knative.eventing.kafka.broker.core.utils.Configurations;
import dev.knative.eventing.kafka.broker.core.utils.Shutdown;
import dev.knative.eventing.kafka.broker.dispatcher.ReactiveConsumerFactory;
import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.CloudEventDeserializer;
import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.InvalidCloudEventInterceptor;
import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.KeyDeserializer;
@@ -62,7 +63,10 @@ public class Main {
*
* @param args command line arguments.
*/
public static void start(final String[] args, final ReactiveConsumerFactory reactiveConsumerFactory)
public static void start(
final String[] args,
final ReactiveConsumerFactory reactiveConsumerFactory,
final ReactiveProducerFactory reactiveProducerFactory)
throws IOException {
DispatcherEnv env = new DispatcherEnv(System::getenv);

@@ -108,7 +112,8 @@ public static void start(final String[] args, final ReactiveConsumerFactory reac
producerConfig,
AuthProvider.kubernetes(),
Metrics.getRegistry(),
reactiveConsumerFactory),
reactiveConsumerFactory,
reactiveProducerFactory),
env.getEgressesInitialCapacity());

// Deploy the consumer deployer
