Skip to content

Commit

Permalink
Try to make kafka smoke tests non-flaky by using spring reply mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
manuel-alvarez-alvarez committed Feb 28, 2024
1 parent 93c75f4 commit 0b9fec4
Show file tree
Hide file tree
Showing 6 changed files with 662 additions and 231 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

Expand All @@ -40,6 +42,14 @@ public class IastConfiguration {

public static final String JSON_TOPIC = "iast_json";

public static final String REPLY_STRING_TOPIC = "iast_string_reply";

public static final String REPLY_BYTE_ARRAY_TOPIC = "iast_byteArray_reply";

public static final String REPLY_BYTE_BUFFER_TOPIC = "iast_byteBuffer_reply";

public static final String REPLY_JSON_TOPIC = "iast_json_reply";

@Value("${spring.kafka.bootstrap-servers}")
private String boostrapServers;

Expand All @@ -49,83 +59,278 @@ public KafkaAdmin.NewTopics iastTopics() {
newTopic(STRING_TOPIC),
newTopic(BYTE_ARRAY_TOPIC),
newTopic(BYTE_BUFFER_TOPIC),
newTopic(JSON_TOPIC));
newTopic(JSON_TOPIC),
newTopic(REPLY_STRING_TOPIC),
newTopic(REPLY_BYTE_ARRAY_TOPIC),
newTopic(REPLY_BYTE_BUFFER_TOPIC),
newTopic(REPLY_JSON_TOPIC));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> iastStringListenerFactory() {
return listenerFor(StringDeserializer.class, config -> {});
public DefaultKafkaConsumerFactory<String, String> iastStringConsumer() {
return consumerFor(StringDeserializer.class, StringDeserializer.class, config -> {});
}

@Bean
public ConcurrentKafkaListenerContainerFactory<byte[], byte[]> iastByteArrayListenerFactory() {
return listenerFor(ByteArrayDeserializer.class, config -> {});
public DefaultKafkaConsumerFactory<byte[], byte[]> iastByteArrayConsumer() {
return consumerFor(ByteArrayDeserializer.class, ByteArrayDeserializer.class, config -> {});
}

@Bean
public ConcurrentKafkaListenerContainerFactory<ByteBuffer, ByteBuffer>
iastByteBufferListenerFactory() {
return listenerFor(ByteBufferDeserializer.class, config -> {});
public DefaultKafkaConsumerFactory<ByteBuffer, ByteBuffer> iastByteBufferConsumer() {
return consumerFor(ByteBufferDeserializer.class, ByteBufferDeserializer.class, config -> {});
}

@Bean
public ConcurrentKafkaListenerContainerFactory<IastMessage, IastMessage>
iastJsonListenerFactory() {
return listenerFor(
JsonDeserializer.class,
public DefaultKafkaConsumerFactory<IastMessage, IastMessage> iastJsonConsumer() {
final Class<? extends Deserializer<IastMessage>> deserializer = jsonDeserializer();
return consumerFor(
deserializer,
deserializer,
config -> config.put(JsonDeserializer.TRUSTED_PACKAGES, "datadog.*"));
}

@Bean
public KafkaTemplate<String, String> iastStringKafkaTemplate() {
return templateFor(StringSerializer.class);
public DefaultKafkaConsumerFactory<String, String> iastReplyStringConsumer() {
return consumerFor(StringDeserializer.class, StringDeserializer.class, config -> {});
}

/** Consumer factory for reply records with byte-array keys and plain string values. */
@Bean
public DefaultKafkaConsumerFactory<byte[], String> iastReplyByteArrayConsumer() {
  final DefaultKafkaConsumerFactory<byte[], String> consumerFactory =
      consumerFor(ByteArrayDeserializer.class, StringDeserializer.class, config -> {});
  return consumerFactory;
}

/** Consumer factory for reply records with {@code ByteBuffer} keys and plain string values. */
@Bean
public DefaultKafkaConsumerFactory<ByteBuffer, String> iastReplyByteBufferConsumer() {
  final DefaultKafkaConsumerFactory<ByteBuffer, String> consumerFactory =
      consumerFor(ByteBufferDeserializer.class, StringDeserializer.class, config -> {});
  return consumerFactory;
}

/** Consumer factory for reply records with JSON-deserialized keys and plain string values. */
@Bean
public DefaultKafkaConsumerFactory<IastMessage, String> iastReplyJsonConsumer() {
  final Class<? extends Deserializer<IastMessage>> keyDeserializer = jsonDeserializer();
  return consumerFor(keyDeserializer, StringDeserializer.class, config -> {});
}

/**
 * Listener container factory for string-keyed/string-valued records; replies are sent through the
 * string reply template so request/reply tests can await the result.
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> iastStringListener() {
  final ConcurrentKafkaListenerContainerFactory<String, String> listenerFactory =
      new ConcurrentKafkaListenerContainerFactory<>();
  listenerFactory.setReplyTemplate(iastReplyStringTemplate());
  listenerFactory.setConsumerFactory(iastStringConsumer());
  return listenerFactory;
}

/**
 * Listener container factory for byte-array records; wired with the byte-array reply template so
 * listener results are published back to the reply topic.
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<byte[], byte[]> iastByteArrayListener() {
  final ConcurrentKafkaListenerContainerFactory<byte[], byte[]> listenerFactory =
      new ConcurrentKafkaListenerContainerFactory<>();
  listenerFactory.setReplyTemplate(iastReplyByteArrayTemplate());
  listenerFactory.setConsumerFactory(iastByteArrayConsumer());
  return listenerFactory;
}

/**
 * Listener container factory for {@code ByteBuffer} records; wired with the byte-buffer reply
 * template so listener results are published back to the reply topic.
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<ByteBuffer, ByteBuffer> iastByteBufferListener() {
  final ConcurrentKafkaListenerContainerFactory<ByteBuffer, ByteBuffer> listenerFactory =
      new ConcurrentKafkaListenerContainerFactory<>();
  listenerFactory.setReplyTemplate(iastReplyByteBufferTemplate());
  listenerFactory.setConsumerFactory(iastByteBufferConsumer());
  return listenerFactory;
}

/**
 * Listener container factory for JSON-serialized {@link IastMessage} records; wired with the JSON
 * reply template so listener results are published back to the reply topic.
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<IastMessage, IastMessage> iastJsonListener() {
  final ConcurrentKafkaListenerContainerFactory<IastMessage, IastMessage> listenerFactory =
      new ConcurrentKafkaListenerContainerFactory<>();
  listenerFactory.setReplyTemplate(iastReplyJsonTemplate());
  listenerFactory.setConsumerFactory(iastJsonConsumer());
  return listenerFactory;
}

/** Container factory used to consume string replies (no reply template: terminal consumer). */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> iastReplyStringListener() {
  final ConcurrentKafkaListenerContainerFactory<String, String> replyFactory =
      new ConcurrentKafkaListenerContainerFactory<>();
  replyFactory.setConsumerFactory(iastReplyStringConsumer());
  return replyFactory;
}

/** Container factory used to consume byte-array-keyed replies (terminal consumer). */
@Bean
public ConcurrentKafkaListenerContainerFactory<byte[], String> iastReplyByteArrayListener() {
  final ConcurrentKafkaListenerContainerFactory<byte[], String> replyFactory =
      new ConcurrentKafkaListenerContainerFactory<>();
  replyFactory.setConsumerFactory(iastReplyByteArrayConsumer());
  return replyFactory;
}

/** Container factory used to consume {@code ByteBuffer}-keyed replies (terminal consumer). */
@Bean
public ConcurrentKafkaListenerContainerFactory<ByteBuffer, String> iastReplyByteBufferListener() {
  final ConcurrentKafkaListenerContainerFactory<ByteBuffer, String> replyFactory =
      new ConcurrentKafkaListenerContainerFactory<>();
  replyFactory.setConsumerFactory(iastReplyByteBufferConsumer());
  return replyFactory;
}

/** Container factory used to consume JSON-keyed replies (terminal consumer). */
@Bean
public ConcurrentKafkaListenerContainerFactory<IastMessage, String> iastReplyJsonListener() {
  final ConcurrentKafkaListenerContainerFactory<IastMessage, String> replyFactory =
      new ConcurrentKafkaListenerContainerFactory<>();
  replyFactory.setConsumerFactory(iastReplyJsonConsumer());
  return replyFactory;
}

/**
 * Message listener container over the string reply topic. Auto-startup is disabled: the owning
 * {@code ReplyingKafkaTemplate} manages the container lifecycle.
 */
@Bean
public ConcurrentMessageListenerContainer<String, String> iastReplyStringContainer() {
  final ConcurrentMessageListenerContainer<String, String> container =
      iastReplyStringListener().createContainer(REPLY_STRING_TOPIC);
  container.setAutoStartup(false);
  container.getContainerProperties().setGroupId(GROUP_ID);
  return container;
}

/**
 * Message listener container over the byte-array reply topic. Auto-startup is disabled: the owning
 * {@code ReplyingKafkaTemplate} manages the container lifecycle.
 */
@Bean
public ConcurrentMessageListenerContainer<byte[], String> iastReplyByteArrayContainer() {
  final ConcurrentMessageListenerContainer<byte[], String> container =
      iastReplyByteArrayListener().createContainer(REPLY_BYTE_ARRAY_TOPIC);
  container.setAutoStartup(false);
  container.getContainerProperties().setGroupId(GROUP_ID);
  return container;
}

/**
 * Message listener container over the byte-buffer reply topic. Auto-startup is disabled: the
 * owning {@code ReplyingKafkaTemplate} manages the container lifecycle.
 */
@Bean
public ConcurrentMessageListenerContainer<ByteBuffer, String> iastReplyByteBufferContainer() {
  final ConcurrentMessageListenerContainer<ByteBuffer, String> container =
      iastReplyByteBufferListener().createContainer(REPLY_BYTE_BUFFER_TOPIC);
  container.setAutoStartup(false);
  container.getContainerProperties().setGroupId(GROUP_ID);
  return container;
}

/**
 * Message listener container over the JSON reply topic. Auto-startup is disabled: the owning
 * {@code ReplyingKafkaTemplate} manages the container lifecycle.
 */
@Bean
public ConcurrentMessageListenerContainer<IastMessage, String> iastReplyJsonContainer() {
  final ConcurrentMessageListenerContainer<IastMessage, String> container =
      iastReplyJsonListener().createContainer(REPLY_JSON_TOPIC);
  container.setAutoStartup(false);
  container.getContainerProperties().setGroupId(GROUP_ID);
  return container;
}

/** Producer factory for string-keyed, string-valued request records. */
@Bean
public DefaultKafkaProducerFactory<String, String> iastStringProducer() {
  final DefaultKafkaProducerFactory<String, String> producerFactory =
      producerFor(StringSerializer.class, StringSerializer.class);
  return producerFactory;
}

/** Producer factory for byte-array-keyed, byte-array-valued request records. */
@Bean
public DefaultKafkaProducerFactory<byte[], byte[]> iastByteArrayProducer() {
  final DefaultKafkaProducerFactory<byte[], byte[]> producerFactory =
      producerFor(ByteArraySerializer.class, ByteArraySerializer.class);
  return producerFactory;
}

/** Producer factory for {@code ByteBuffer}-keyed, {@code ByteBuffer}-valued request records. */
@Bean
public DefaultKafkaProducerFactory<ByteBuffer, ByteBuffer> iastByteBufferProducer() {
  final DefaultKafkaProducerFactory<ByteBuffer, ByteBuffer> producerFactory =
      producerFor(ByteBufferSerializer.class, ByteBufferSerializer.class);
  return producerFactory;
}

/** Producer factory for JSON-serialized {@link IastMessage} keys and values. */
@Bean
public DefaultKafkaProducerFactory<IastMessage, IastMessage> iastJsonProducer() {
  // jsonSerializer() just narrows the JsonSerializer class literal, so calling it twice is cheap.
  return producerFor(jsonSerializer(), jsonSerializer());
}

/** Producer factory used to publish string replies. */
@Bean
public DefaultKafkaProducerFactory<String, String> iastReplyStringProducer() {
  final DefaultKafkaProducerFactory<String, String> producerFactory =
      producerFor(StringSerializer.class, StringSerializer.class);
  return producerFactory;
}

/** Producer factory used to publish string replies keyed by byte arrays. */
@Bean
public DefaultKafkaProducerFactory<byte[], String> iastReplyByteArrayProducer() {
  final DefaultKafkaProducerFactory<byte[], String> producerFactory =
      producerFor(ByteArraySerializer.class, StringSerializer.class);
  return producerFactory;
}

/** Producer factory used to publish string replies keyed by {@code ByteBuffer}s. */
@Bean
public DefaultKafkaProducerFactory<ByteBuffer, String> iastReplyByteBufferProducer() {
  final DefaultKafkaProducerFactory<ByteBuffer, String> producerFactory =
      producerFor(ByteBufferSerializer.class, StringSerializer.class);
  return producerFactory;
}

@Bean
public KafkaTemplate<byte[], byte[]> iastByteArrayKafkaTemplate() {
return templateFor(ByteArraySerializer.class);
public DefaultKafkaProducerFactory<IastMessage, String> iastReplyJsonProducer() {
return producerFor(jsonSerializer(), StringSerializer.class);
}

@Bean
public KafkaTemplate<ByteBuffer, ByteBuffer> iastByteBufferKafkaTemplate() {
return templateFor(ByteBufferSerializer.class);
public ReplyingKafkaTemplate<String, String, String> iastStringTemplate() {
return new ReplyingKafkaTemplate<>(iastStringProducer(), iastReplyStringContainer());
}

@Bean
public KafkaTemplate<IastMessage, IastMessage> iastJsonKafkaTemplate() {
return templateFor(JsonSerializer.class);
public ReplyingKafkaTemplate<byte[], byte[], String> iastByteArrayTemplate() {
return new ReplyingKafkaTemplate<>(iastByteArrayProducer(), iastReplyByteArrayContainer());
}

@SuppressWarnings("rawtypes")
private <E> ConcurrentKafkaListenerContainerFactory<E, E> listenerFor(
final Class<? extends Deserializer> deserializer,
@Bean
public ReplyingKafkaTemplate<ByteBuffer, ByteBuffer, String> iastByteBufferTemplate() {
return new ReplyingKafkaTemplate<>(iastByteBufferProducer(), iastReplyByteBufferContainer());
}

/** Request/reply template for JSON requests that awaits a string reply on the reply container. */
@Bean
public ReplyingKafkaTemplate<IastMessage, IastMessage, String> iastJsonTemplate() {
  final DefaultKafkaProducerFactory<IastMessage, IastMessage> producer = iastJsonProducer();
  return new ReplyingKafkaTemplate<>(producer, iastReplyJsonContainer());
}

/** Plain template used by listeners to publish string replies. */
@Bean
public KafkaTemplate<String, String> iastReplyStringTemplate() {
  final DefaultKafkaProducerFactory<String, String> producer = iastReplyStringProducer();
  return new KafkaTemplate<>(producer);
}

/** Plain template used by listeners to publish byte-array-keyed string replies. */
@Bean
public KafkaTemplate<byte[], String> iastReplyByteArrayTemplate() {
  final DefaultKafkaProducerFactory<byte[], String> producer = iastReplyByteArrayProducer();
  return new KafkaTemplate<>(producer);
}

/** Plain template used by listeners to publish {@code ByteBuffer}-keyed string replies. */
@Bean
public KafkaTemplate<ByteBuffer, String> iastReplyByteBufferTemplate() {
  final DefaultKafkaProducerFactory<ByteBuffer, String> producer = iastReplyByteBufferProducer();
  return new KafkaTemplate<>(producer);
}

/** Plain template used by listeners to publish JSON-keyed string replies. */
@Bean
public KafkaTemplate<IastMessage, String> iastReplyJsonTemplate() {
  final DefaultKafkaProducerFactory<IastMessage, String> producer = iastReplyJsonProducer();
  return new KafkaTemplate<>(producer);
}

/**
 * Builds a producer factory pointed at the configured bootstrap servers.
 *
 * @param keySerializer serializer class for record keys
 * @param valueSerializer serializer class for record values
 * @return a factory configured with the given serializers
 */
private <K, V> DefaultKafkaProducerFactory<K, V> producerFor(
    final Class<? extends Serializer<K>> keySerializer,
    final Class<? extends Serializer<V>> valueSerializer) {
  final Map<String, Object> props = new HashMap<>();
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, boostrapServers);
  return new DefaultKafkaProducerFactory<>(props);
}

private <K, V> DefaultKafkaConsumerFactory<K, V> consumerFor(
final Class<? extends Deserializer<K>> keyDeserializer,
final Class<? extends Deserializer<V>> valueDeserializer,
final Consumer<Map<String, Object>> consumer) {
final Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, boostrapServers);
config.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
consumer.accept(config);
final DefaultKafkaConsumerFactory<E, E> consumerFactory =
new DefaultKafkaConsumerFactory<>(config);
ConcurrentKafkaListenerContainerFactory<E, E> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
return factory;
return new DefaultKafkaConsumerFactory<>(config);
}

@SuppressWarnings("rawtypes")
private <E> KafkaTemplate<E, E> templateFor(final Class<? extends Serializer> serializer) {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, boostrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serializer);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializer);
final DefaultKafkaProducerFactory<E, E> factory =
new DefaultKafkaProducerFactory<>(configProps);
return new KafkaTemplate<>(factory);
/** Builds a single-partition, single-replica topic with the given name. */
private NewTopic newTopic(final String name) {
  final TopicBuilder builder = TopicBuilder.name(name).partitions(1).replicas(1);
  return builder.build();
}

private NewTopic newTopic(final String name) {
return TopicBuilder.name(name).partitions(1).replicas(1).compact().build();
/** Narrows the raw {@code JsonDeserializer} class literal to an {@link IastMessage} deserializer. */
@SuppressWarnings({"rawtypes", "unchecked"})
private Class<? extends Deserializer<IastMessage>> jsonDeserializer() {
  // Single unchecked cast, isolated here so callers stay warning-free.
  final Class raw = JsonDeserializer.class;
  return (Class<? extends Deserializer<IastMessage>>) raw;
}

/** Narrows the raw {@code JsonSerializer} class literal to an {@link IastMessage} serializer. */
@SuppressWarnings({"rawtypes", "unchecked"})
private Class<? extends Serializer<IastMessage>> jsonSerializer() {
  // Single unchecked cast, isolated here so callers stay warning-free.
  final Class raw = JsonSerializer.class;
  return (Class<? extends Serializer<IastMessage>>) raw;
}
}

0 comments on commit 0b9fec4

Please sign in to comment.