forked from quarkusio/quarkus
-
Notifications
You must be signed in to change notification settings - Fork 1
/
SslKafkaConsumerTest.java
60 lines (51 loc) · 2.46 KB
/
SslKafkaConsumerTest.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package io.quarkus.it.kafka;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.RestAssured;
@QuarkusTest
@QuarkusTestResource(KafkaSSLTestResource.class)
public class SslKafkaConsumerTest {
private static void addSsl(Properties props) {
try {
File sslDir = KafkaSSLTestResource.sslDir(null, false);
File tsFile = new File(sslDir, "kafka-truststore.p12");
props.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, tsFile.getPath());
props.setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "Z_pkTh9xgZovK4t34cGB2o6afT4zZg0L");
props.setProperty(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "PKCS12");
props.setProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
public static Producer<Integer, String> createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19099");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "test-ssl-producer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
addSsl(props);
return new KafkaProducer<>(props);
}
@Test
public void testReception() {
Producer<Integer, String> consumer = createProducer();
consumer.send(new ProducerRecord<>("test-ssl-consumer", 1, "hi world"));
String string = RestAssured.when().get("/ssl").andReturn().asString();
Assertions.assertEquals("hi world", string);
}
}