Skip to content
Permalink
Browse files

Add Kafka support to reactive apps

  • Loading branch information
cbornet committed Dec 9, 2019
1 parent c1c0dae commit b21d6298f212c155acdc8ec685b2017b30bc9e19
@@ -371,11 +371,11 @@ function askForOptionalItems(meta) {
value: 'websocket:spring-websocket'
});
}
choices.push({
name: 'Asynchronous messages using Apache Kafka',
value: 'messageBroker:kafka'
});
}
choices.push({
name: 'Asynchronous messages using Apache Kafka',
value: 'messageBroker:kafka'
});
choices.push({
name: 'API first development using OpenAPI-generator',
value: 'enableSwaggerCodegen:true'
@@ -371,7 +371,11 @@ dependencies {
implementation "org.springframework.boot:spring-boot-starter-data-<%=databaseType%><% if (reactive) { %>-reactive<% } %>"
<%_ } _%>
<%_ if (messageBroker === 'kafka') { _%>
<%_ if (!reactive) { _%>
implementation "org.apache.kafka:kafka-clients"
<%_ } else { _%>
implementation "io.projectreactor.kafka:reactor-kafka"
<%_ } _%>
<%_ } _%>
implementation "org.springframework.boot:spring-boot-starter-security"
implementation ("org.springframework.boot:spring-boot-starter-web<% if (reactive) { %>flux<% } %>") {
@@ -617,10 +617,17 @@
<scope>test</scope>
</dependency>
<%_ if (messageBroker === 'kafka') { _%>
<%_ if (!reactive) { _%>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
<%_ } else { _%>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
</dependency>
<%_ } _%>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
@@ -20,23 +20,41 @@ package <%= packageName %>.web.rest;

import <%= packageName %>.config.KafkaProperties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
<%_ if (!reactive) { _%>
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
<%_ } _%>
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.*;
<%_ if (reactive) { _%>
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;
<%_ } _%>
<%_ if (!reactive) { _%>
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
<%_ } _%>

<%_ if (!reactive) { _%>
import java.time.Duration;
<%_ } _%>
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
<%_ if (!reactive) { _%>
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
<%_ } _%>

@RestController
@RequestMapping("/api/<%= dasherizedBaseName %>-kafka")
@@ -45,28 +63,45 @@ public class <%= upperFirstCamelCase(baseName) %>KafkaResource {
private final Logger log = LoggerFactory.getLogger(<%= upperFirstCamelCase(baseName) %>KafkaResource.class);

private final KafkaProperties kafkaProperties;
<%_ if (!reactive) { _%>
private KafkaProducer<String, String> producer;
private ExecutorService sseExecutorService = Executors.newCachedThreadPool();
<%_ } else { _%>
private KafkaSender<String, String> sender;
<%_ } _%>

public <%= upperFirstCamelCase(baseName) %>KafkaResource(KafkaProperties kafkaProperties) {
this.kafkaProperties = kafkaProperties;
<%_ if (!reactive) { _%>
this.producer = new KafkaProducer<>(kafkaProperties.getProducerProps());
<%_ } else { _%>
this.sender = KafkaSender.create(SenderOptions.create(kafkaProperties.getProducerProps()));
<%_ } _%>
}

@PostMapping("/publish/{topic}")
public PublishResult publish(@PathVariable String topic, @RequestParam String message, @RequestParam(required = false) String key) throws ExecutionException, InterruptedException {
public <% if (!reactive) { %>PublishResult<% } else { %>Mono<PublishResult><% } %> publish(@PathVariable String topic, @RequestParam String message, @RequestParam(required = false) String key)<% if (!reactive) { %> throws ExecutionException, InterruptedException<% } %> {
log.debug("REST request to send to Kafka topic {} with key {} the message : {}", topic, key, message);
<%_ if (!reactive) { _%>
RecordMetadata metadata = producer.send(new ProducerRecord<>(topic, key, message)).get();
return new PublishResult(metadata.topic(), metadata.partition(), metadata.offset(), Instant.ofEpochMilli(metadata.timestamp()));
<%_ } else { _%>
return Mono.just(SenderRecord.create(topic, null, null, key, message, null))
.as(sender::send)
.next()
.map(SenderResult::recordMetadata)
.map(metadata -> new PublishResult(metadata.topic(), metadata.partition(), metadata.offset(), Instant.ofEpochMilli(metadata.timestamp())));
<%_ } _%>
}

@GetMapping("/consume")
public SseEmitter consume(@RequestParam("topic") List<String> topics, @RequestParam Map<String, String> consumerParams) {
public <% if (!reactive) { %>SseEmitter<% } else { %>Flux<String><% } %> consume(@RequestParam("topic") List<String> topics, @RequestParam Map<String, String> consumerParams) {
log.debug("REST request to consume records from Kafka topics {}", topics);
Map<String, Object> consumerProps = kafkaProperties.getConsumerProps();
consumerProps.putAll(consumerParams);
consumerProps.remove("topic");

<%_ if (!reactive) { _%>
SseEmitter emitter = new SseEmitter(Long.MAX_VALUE);
sseExecutorService.execute(() -> {
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
@@ -90,6 +125,13 @@ public class <%= upperFirstCamelCase(baseName) %>KafkaResource {
emitter.complete();
});
return emitter;
<%_ } else { _%>
ReceiverOptions<String, String> receiverOptions = ReceiverOptions.<String, String>create(consumerProps)
.subscription(topics);
return KafkaReceiver.create(receiverOptions)
.receive()
.map(ConsumerRecord::value);
<%_ } _%>
}

private static class PublishResult {
@@ -28,9 +28,13 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.http.MediaType;
<%_ if (!reactive) { _%>
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.MvcResult;
import org.springframework.test.web.servlet.setup.MockMvcBuilders;
<%_ } else { _%>
import org.springframework.test.web.reactive.server.WebTestClient;
<%_ } _%>
import org.testcontainers.containers.KafkaContainer;

import java.time.Duration;
@@ -39,17 +43,23 @@ import java.util.HashMap;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;
<%_ if (!reactive) { _%>
import static org.junit.jupiter.api.Assertions.fail;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.*;
<%_ } _%>

class <%= upperFirstCamelCase(baseName) %>KafkaResourceIT {

private static boolean started = false;
private static KafkaContainer kafkaContainer;

<%_ if (!reactive) { _%>
private MockMvc restMockMvc;
<%_ } else { _%>
private WebTestClient client;
<%_ } _%>

@BeforeAll
static void startServer() {
@@ -76,15 +86,25 @@ class <%= upperFirstCamelCase(baseName) %>KafkaResourceIT {

<%= upperFirstCamelCase(baseName) %>KafkaResource kafkaResource = new <%= upperFirstCamelCase(baseName) %>KafkaResource(kafkaProperties);

restMockMvc = MockMvcBuilders.standaloneSetup(kafkaResource)
.build();
<%_ if (!reactive) { _%>
restMockMvc = MockMvcBuilders.standaloneSetup(kafkaResource).build();
<%_ } else { _%>
client = WebTestClient.bindToController(kafkaResource).build();
<%_ } _%>
}

@Test
void producesMessages() throws Exception {
void producesMessages()<% if (!reactive) { %> throws Exception<% } %> {
<%_ if (!reactive) { _%>
restMockMvc.perform(post("/api/<%= dasherizedBaseName %>-kafka/publish/topic-produce?message=value-produce"))
.andExpect(status().isOk())
.andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8_VALUE));
.andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8));
<%_ } else { _%>
client.post().uri("/api/<%= dasherizedBaseName %>-kafka/publish/topic-produce?message=value-produce")
.exchange()
.expectStatus().isOk()
.expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8);
<%_ } _%>

Map<String, Object> consumerProps = new HashMap<>(getConsumerProps("group-produce"));
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
@@ -97,15 +117,16 @@ class <%= upperFirstCamelCase(baseName) %>KafkaResourceIT {
}

@Test
void consumesMessages() throws Exception {
void consumesMessages()<% if (!reactive) { %> throws Exception<% } %> {
Map<String, Object> producerProps = new HashMap<>(getProducerProps());
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);

producer.send(new ProducerRecord<>("topic-consume", "value-consume"));

<%_ if (!reactive) { _%>
MvcResult mvcResult = restMockMvc.perform(get("/api/<%= dasherizedBaseName %>-kafka/consume?topic=topic-consume"))
.andExpect(status().isOk())
.andExpect(content().contentTypeCompatibleWith(MediaType.TEXT_EVENT_STREAM_VALUE))
.andExpect(content().contentTypeCompatibleWith(MediaType.TEXT_EVENT_STREAM))
.andExpect(request().asyncStarted())
.andReturn();

@@ -117,6 +138,17 @@ class <%= upperFirstCamelCase(baseName) %>KafkaResourceIT {
}
}
fail("Expected content data:value-consume not received");
<%_ } else { _%>
String value = client.get().uri("/api/<%= dasherizedBaseName %>-kafka/consume?topic=topic-consume")
.accept(MediaType.TEXT_EVENT_STREAM)
.exchange()
.expectStatus().isOk()
.expectHeader().contentTypeCompatibleWith(MediaType.TEXT_EVENT_STREAM)
.returnResult(String.class)
.getResponseBody().blockFirst(Duration.ofSeconds(10));

assertThat(value).isEqualTo("value-consume");
<%_ } _%>
}

private Map<String, String> getProducerProps() {

0 comments on commit b21d629

Please sign in to comment.
You can’t perform that action at this time.