From ef6ad3ed1f32754ffd30d37c60fb193c033c4335 Mon Sep 17 00:00:00 2001 From: JJ Date: Thu, 27 Apr 2017 11:40:59 -0500 Subject: [PATCH] METRON-891: Updated kafka service to create a ConsumerFactory per thread to prevent possible concurrency issues. METRON-891: Updated kafka to use spring's ConsumerFactory functionality for Kafka. This will help with potential multi-threading issues. METRON-891: Fixed unit test issues and checkstyle issues. METRON-891: Really updating dependencies... METRON-891: Fixing license after autoformat add p tags around link. METRON-891: fixed license after autoformat. --- dependencies_with_url.csv | 1 + metron-interface/metron-rest/pom.xml | 34 +++- .../metron/rest/config/KafkaConfig.java | 52 +++++- .../rest/controller/KafkaController.java | 104 +++++++----- .../metron/rest/service/KafkaService.java | 45 ++++- .../rest/service/impl/KafkaServiceImpl.java | 155 ++++++++++-------- .../apache/metron/rest/config/TestConfig.java | 32 ++-- .../service/impl/KafkaServiceImplTest.java | 15 +- 8 files changed, 297 insertions(+), 141 deletions(-) diff --git a/dependencies_with_url.csv b/dependencies_with_url.csv index 93a19b78f9..375236672a 100644 --- a/dependencies_with_url.csv +++ b/dependencies_with_url.csv @@ -296,3 +296,4 @@ org.htrace:htrace-core:jar:3.0.4:compile,ASLv2,http://htrace.incubator.apache.or net.byteseek:byteseek:jar:2.0.3:compile,BSD,https://github.com/nishihatapalmer/byteseek org.springframework.security.kerberos:spring-security-kerberos-client:jar:1.0.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-security-kerberos org.springframework.security.kerberos:spring-security-kerberos-core:jar:1.0.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-security-kerberos +org.springframework.kafka:spring-kafka:jar:1.1.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-kafka diff --git a/metron-interface/metron-rest/pom.xml b/metron-interface/metron-rest/pom.xml index b11e999d9a..1c3ff928d9 100644 
--- a/metron-interface/metron-rest/pom.xml +++ b/metron-interface/metron-rest/pom.xml @@ -33,8 +33,36 @@ 1.0.1.RELEASE 2.5.0 5.1.40 + 1.1.1.RELEASE + + org.springframework.kafka + spring-kafka + ${spring-kafka.version} + + + org.apache.kafka + kafka-clients + + + com.fasterxml.jackson.core + jackson-core + + + com.fasterxml.jackson.core + jackson-databind + + + org.springframework.retry + spring-retry + + + org.springframework + spring-messaging + + + org.springframework.boot spring-boot-starter-web @@ -75,7 +103,7 @@ com.google.guava guava - + com.googlecode.json-simple @@ -112,8 +140,8 @@ jackson-databind - org.reflections - reflections + org.reflections + reflections diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java index 309a54969c..247264be64 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/KafkaConfig.java @@ -27,33 +27,56 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Profile; import org.springframework.core.env.Environment; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; -import java.util.Properties; +import java.util.HashMap; +import java.util.Map; import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE; +/** + * Configuration used for connecting to Kafka. + */ @Configuration @Profile("!" + TEST_PROFILE) public class KafkaConfig { - + /** + * The Spring environment. + */ private Environment environment; + /** + * Construvtor used to inject {@link Environment}. + * @param environment Spring environment to inject. 
+ */ @Autowired - public KafkaConfig(Environment environment) { + public KafkaConfig(final Environment environment) { this.environment = environment; } + /** + * The client used for ZooKeeper. + */ @Autowired private ZkClient zkClient; + /** + * Bean for ZooKeeper. + */ @Bean public ZkUtils zkUtils() { return ZkUtils.apply(zkClient, false); } - @Bean(destroyMethod = "close") - public KafkaConsumer kafkaConsumer() { - Properties props = new Properties(); + /** + * Create properties that will be used by {@link #createConsumerFactory()} + * + * @return Configurations used by {@link #createConsumerFactory()}. + */ + @Bean + public Map consumerProperties() { + final Map props = new HashMap<>(); props.put("bootstrap.servers", environment.getProperty(MetronRestConstants.KAFKA_BROKER_URL_SPRING_PROPERTY)); props.put("group.id", "metron-rest"); props.put("enable.auto.commit", "false"); @@ -64,9 +87,24 @@ public KafkaConsumer kafkaConsumer() { if (environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false)) { props.put("security.protocol", "SASL_PLAINTEXT"); } - return new KafkaConsumer<>(props); + return props; + } + + /** + * Create a {@link ConsumerFactory} which will be used for certain Kafka interactions within config API. + * + * @return a {@link ConsumerFactory} used to create {@link KafkaConsumer} for interactions with Kafka. + */ + @Bean + public ConsumerFactory createConsumerFactory() { + return new DefaultKafkaConsumerFactory<>(consumerProperties()); } + /** + * Create a bean for {@link AdminUtils$}. This is primarily done to make testing a bit easier. + * + * @return {@link AdminUtils$} is written in Scala. We return a reference to this class.
+ */ @Bean public AdminUtils$ adminUtils() { return AdminUtils$.MODULE$; diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/KafkaController.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/KafkaController.java index 0cd4d54539..278750467b 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/KafkaController.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/KafkaController.java @@ -35,62 +35,78 @@ import java.util.Set; +/** + * The API resource that is use to interact with Kafka. + */ @RestController @RequestMapping("/api/v1/kafka") public class KafkaController { - @Autowired - private KafkaService kafkaService; + /** + * Service used to interact with Kafka. + */ + @Autowired + private KafkaService kafkaService; - @ApiOperation(value = "Creates a new Kafka topic") + @ApiOperation(value = "Creates a new Kafka topic") + @ApiResponses({ @ApiResponse(message = "Returns saved Kafka topic", code = 200) - @RequestMapping(value = "/topic", method = RequestMethod.POST) - ResponseEntity save(@ApiParam(name="topic", value="Kafka topic", required=true)@RequestBody KafkaTopic topic) throws RestException { - return new ResponseEntity<>(kafkaService.createTopic(topic), HttpStatus.CREATED); - } + }) + @RequestMapping(value = "/topic", method = RequestMethod.POST) + ResponseEntity save(final @ApiParam(name = "topic", value = "Kafka topic", required = true) @RequestBody KafkaTopic topic) throws RestException { + return new ResponseEntity<>(kafkaService.createTopic(topic), HttpStatus.CREATED); + } - @ApiOperation(value = "Retrieves a Kafka topic") - @ApiResponses(value = { @ApiResponse(message = "Returns Kafka topic", code = 200), - @ApiResponse(message = "Kafka topic is missing", code = 404) }) - @RequestMapping(value = "/topic/{name}", method = RequestMethod.GET) - ResponseEntity get(@ApiParam(name="name", value="Kafka topic name", 
required=true)@PathVariable String name) throws RestException { - KafkaTopic kafkaTopic = kafkaService.getTopic(name); - if (kafkaTopic != null) { - return new ResponseEntity<>(kafkaTopic, HttpStatus.OK); - } else { - return new ResponseEntity<>(HttpStatus.NOT_FOUND); - } + @ApiOperation(value = "Retrieves a Kafka topic") + @ApiResponses(value = { + @ApiResponse(message = "Returns Kafka topic", code = 200), + @ApiResponse(message = "Kafka topic is missing", code = 404) + }) + @RequestMapping(value = "/topic/{name}", method = RequestMethod.GET) + ResponseEntity get(final @ApiParam(name = "name", value = "Kafka topic name", required = true) @PathVariable String name) throws RestException { + KafkaTopic kafkaTopic = kafkaService.getTopic(name); + if (kafkaTopic != null) { + return new ResponseEntity<>(kafkaTopic, HttpStatus.OK); + } else { + return new ResponseEntity<>(HttpStatus.NOT_FOUND); } + } - @ApiOperation(value = "Retrieves all Kafka topics") + @ApiOperation(value = "Retrieves all Kafka topics") + @ApiResponses({ @ApiResponse(message = "Returns a list of all Kafka topics", code = 200) - @RequestMapping(value = "/topic", method = RequestMethod.GET) - ResponseEntity> list() throws Exception { - return new ResponseEntity<>(kafkaService.listTopics(), HttpStatus.OK); - } + }) + @RequestMapping(value = "/topic", method = RequestMethod.GET) + ResponseEntity> list() throws Exception { + return new ResponseEntity<>(kafkaService.listTopics(), HttpStatus.OK); + } - @ApiOperation(value = "Deletes a Kafka topic") - @ApiResponses(value = { @ApiResponse(message = "Kafka topic was deleted", code = 200), - @ApiResponse(message = "Kafka topic is missing", code = 404) }) - @RequestMapping(value = "/topic/{name}", method = RequestMethod.DELETE) - ResponseEntity delete(@ApiParam(name="name", value="Kafka topic name", required=true)@PathVariable String name) throws RestException { - if (kafkaService.deleteTopic(name)) { - return new ResponseEntity<>(HttpStatus.OK); - } else { - 
return new ResponseEntity<>(HttpStatus.NOT_FOUND); - } + @ApiOperation(value = "Deletes a Kafka topic") + @ApiResponses(value = { + @ApiResponse(message = "Kafka topic was deleted", code = 200), + @ApiResponse(message = "Kafka topic is missing", code = 404) + }) + @RequestMapping(value = "/topic/{name}", method = RequestMethod.DELETE) + ResponseEntity delete(final @ApiParam(name = "name", value = "Kafka topic name", required = true) @PathVariable String name) throws RestException { + if (kafkaService.deleteTopic(name)) { + return new ResponseEntity<>(HttpStatus.OK); + } else { + return new ResponseEntity<>(HttpStatus.NOT_FOUND); } + } - @ApiOperation(value = "Retrieves a sample message from a Kafka topic using the most recent offset") - @ApiResponses(value = { @ApiResponse(message = "Returns sample message", code = 200), - @ApiResponse(message = "Either Kafka topic is missing or contains no messages", code = 404) }) - @RequestMapping(value = "/topic/{name}/sample", method = RequestMethod.GET) - ResponseEntity getSample(@ApiParam(name="name", value="Kafka topic name", required=true)@PathVariable String name) throws RestException { - String sampleMessage = kafkaService.getSampleMessage(name); - if (sampleMessage != null) { - return new ResponseEntity<>(sampleMessage, HttpStatus.OK); - } else { - return new ResponseEntity<>(HttpStatus.NOT_FOUND); - } + @ApiOperation(value = "Retrieves a sample message from a Kafka topic using the most recent offset") + @ApiResponses(value = { + @ApiResponse(message = "Returns sample message", code = 200), + @ApiResponse(message = "Either Kafka topic is missing or contains no messages", code = 404) + }) + @RequestMapping(value = "/topic/{name}/sample", method = RequestMethod.GET) + ResponseEntity getSample(final @ApiParam(name = "name", value = "Kafka topic name", required = true) @PathVariable String name) throws RestException { + String sampleMessage = kafkaService.getSampleMessage(name); + if (sampleMessage != null) { + return new 
ResponseEntity<>(sampleMessage, HttpStatus.OK); + } else { + return new ResponseEntity<>(HttpStatus.NOT_FOUND); } + } } diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java index f3cd9019ea..bee00f2735 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/KafkaService.java @@ -22,16 +22,49 @@ import java.util.Set; +/** + * This is a set of operations created to interact with Kafka. + */ public interface KafkaService { - String CONSUMER_OFFSETS_TOPIC = "__consumer_offsets"; + /** + * Please see the following for documentation. + * + * @see Kafka offset tracking documentation. + */ + String CONSUMER_OFFSETS_TOPIC = "__consumer_offsets"; - KafkaTopic createTopic(KafkaTopic topic) throws RestException; + /** + * Create a topic in Kafka for given information. + * @param topic The information used to create a Kafka topic. + * @return The Kafka topic created. + * @throws RestException If exceptions occur when creating a topic they should be wrapped in a {@link RestException}. + */ + KafkaTopic createTopic(KafkaTopic topic) throws RestException; - boolean deleteTopic(String name); + /** + * Delete a topic for a given name. + * @param name The name of the topic to delete. + * @return If topic was deleted true; otherwise false. + */ + boolean deleteTopic(String name); - KafkaTopic getTopic(String name); + /** + * Retrieves the Kafka topic for a given name. + * @param name The name of the Kafka topic to retrieve. + * @return A {@link KafkaTopic} with the name of {@code name}. Null if topic with name, {@code name}, doesn't exist. + */ + KafkaTopic getTopic(String name); - Set listTopics(); + /** + * Returns a set of all topics. + * @return A set of all topics in Kafka. 
+ */ + Set listTopics(); - String getSampleMessage(String topic); + /** + * Return a single sample message from a given topic. + * @param topic The name of the topic to retrieve a sample message from. + * @return A string representation of the sample message retrieved. If topic doesn't exist null will be returned. + */ + String getSampleMessage(String topic); } diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java index 33cb2e313b..61e2618d2d 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/KafkaServiceImpl.java @@ -20,8 +20,8 @@ import kafka.admin.AdminOperationException; import kafka.admin.AdminUtils$; import kafka.admin.RackAwareMode; -import kafka.admin.RackAwareMode$; import kafka.utils.ZkUtils; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; @@ -30,6 +30,7 @@ import org.apache.metron.rest.model.KafkaTopic; import org.apache.metron.rest.service.KafkaService; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.core.ConsumerFactory; import org.springframework.stereotype.Service; import java.util.HashSet; @@ -38,88 +39,106 @@ import java.util.Set; import java.util.stream.Collectors; +/** + * The default service layer implementation of {@link KafkaService}. + * + * @see KafkaService + */ @Service public class KafkaServiceImpl implements KafkaService { - private ZkUtils zkUtils; - private KafkaConsumer kafkaConsumer; - private AdminUtils$ adminUtils; + /** + * The timeout used when polling Kafka. 
+ */ + private static final int KAFKA_CONSUMER_TIMEOUT = 100; - @Autowired - public KafkaServiceImpl(ZkUtils zkUtils, KafkaConsumer kafkaConsumer, AdminUtils$ adminUtils) { - this.zkUtils = zkUtils; - this.kafkaConsumer = kafkaConsumer; - this.adminUtils = adminUtils; - } + private final ZkUtils zkUtils; + private final ConsumerFactory kafkaConsumerFactory; + private final AdminUtils$ adminUtils; - @Override - public KafkaTopic createTopic(KafkaTopic topic) throws RestException { - if (!listTopics().contains(topic.getName())) { - try { - adminUtils.createTopic(zkUtils, topic.getName(), topic.getNumPartitions(), topic.getReplicationFactor(), topic.getProperties(),RackAwareMode.Disabled$.MODULE$ ); - } catch (AdminOperationException e) { - throw new RestException(e); - } - } - return topic; + /** + * @param zkUtils A utility class used to interact with ZooKeeper. + * @param kafkaConsumerFactory A class used to create {@link KafkaConsumer} in order to interact with Kafka. + * @param adminUtils A utility class used to do administration operations on Kafka. 
+ */ + @Autowired + public KafkaServiceImpl(final ZkUtils zkUtils, + final ConsumerFactory kafkaConsumerFactory, + final AdminUtils$ adminUtils) { + this.zkUtils = zkUtils; + this.kafkaConsumerFactory = kafkaConsumerFactory; + this.adminUtils = adminUtils; + } + + @Override + public KafkaTopic createTopic(final KafkaTopic topic) throws RestException { + if (!listTopics().contains(topic.getName())) { + try { + adminUtils.createTopic(zkUtils, topic.getName(), topic.getNumPartitions(), topic.getReplicationFactor(), topic.getProperties(), RackAwareMode.Disabled$.MODULE$); + } catch (AdminOperationException e) { + throw new RestException(e); + } } + return topic; + } - @Override - public boolean deleteTopic(String name) { - Set topics = listTopics(); - if (topics != null && topics.contains(name)) { - adminUtils.deleteTopic(zkUtils, name); - return true; - } else { - return false; - } + @Override + public boolean deleteTopic(final String name) { + final Set topics = listTopics(); + if (topics != null && topics.contains(name)) { + adminUtils.deleteTopic(zkUtils, name); + return true; + } else { + return false; } + } - @Override - public KafkaTopic getTopic(String name) { - KafkaTopic kafkaTopic = null; - if (listTopics().contains(name)) { - List partitionInfos = kafkaConsumer.partitionsFor(name); - if (partitionInfos.size() > 0) { - PartitionInfo partitionInfo = partitionInfos.get(0); - kafkaTopic = new KafkaTopic(); - kafkaTopic.setName(name); - kafkaTopic.setNumPartitions(partitionInfos.size()); - kafkaTopic.setReplicationFactor(partitionInfo.replicas().length); - } + @Override + public KafkaTopic getTopic(final String name) { + KafkaTopic kafkaTopic = null; + if (listTopics().contains(name)) { + try (Consumer consumer = kafkaConsumerFactory.createConsumer()) { + final List partitionInfos = consumer.partitionsFor(name); + if (partitionInfos.size() > 0) { + final PartitionInfo partitionInfo = partitionInfos.get(0); + kafkaTopic = new KafkaTopic(); + 
kafkaTopic.setName(name); + kafkaTopic.setNumPartitions(partitionInfos.size()); + kafkaTopic.setReplicationFactor(partitionInfo.replicas().length); } - return kafkaTopic; + } } + return kafkaTopic; + } - @Override - public Set listTopics() { - Set topics; - synchronized (this) { - Map> topicsInfo = kafkaConsumer.listTopics(); - topics = topicsInfo == null ? new HashSet<>() : topicsInfo.keySet(); - topics.remove(CONSUMER_OFFSETS_TOPIC); - } - return topics; + @Override + public Set listTopics() { + try (Consumer consumer = kafkaConsumerFactory.createConsumer()) { + final Map> topicsInfo = consumer.listTopics(); + final Set topics = topicsInfo == null ? new HashSet<>() : topicsInfo.keySet(); + topics.remove(CONSUMER_OFFSETS_TOPIC); + return topics; } + } - @Override - public String getSampleMessage(String topic) { - String message = null; - if (listTopics().contains(topic)) { - synchronized (this) { - kafkaConsumer.assign(kafkaConsumer.partitionsFor(topic).stream() - .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition())) - .collect(Collectors.toList())); + @Override + public String getSampleMessage(final String topic) { + String message = null; + if (listTopics().contains(topic)) { + try (Consumer kafkaConsumer = kafkaConsumerFactory.createConsumer()) { + kafkaConsumer.assign(kafkaConsumer.partitionsFor(topic).stream() + .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition())) + .collect(Collectors.toList())); - kafkaConsumer.assignment().stream() - .filter(p -> (kafkaConsumer.position(p) -1) >= 0) - .forEach(p -> kafkaConsumer.seek(p, kafkaConsumer.position(p) - 1)); + kafkaConsumer.assignment().stream() + .filter(p -> (kafkaConsumer.position(p) - 1) >= 0) + .forEach(p -> kafkaConsumer.seek(p, kafkaConsumer.position(p) - 1)); - ConsumerRecords records = kafkaConsumer.poll(100); - message = records.isEmpty() ? 
null : records.iterator().next().value(); - kafkaConsumer.unsubscribe(); - } - } - return message; + final ConsumerRecords records = kafkaConsumer.poll(KAFKA_CONSUMER_TIMEOUT); + message = records.isEmpty() ? null : records.iterator().next().value(); + kafkaConsumer.unsubscribe(); + } } + return message; + } } diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java index edfd5427e4..adfe056faa 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java @@ -25,7 +25,6 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; -import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.metron.integration.ComponentRunner; import org.apache.metron.integration.UnableToStartException; import org.apache.metron.integration.components.KafkaComponent; @@ -36,8 +35,12 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Profile; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.web.client.RestTemplate; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE; @@ -54,7 +57,7 @@ public Properties zkProperties() { @Bean public ZKServerComponent zkServerComponent(Properties zkProperties) { return new ZKServerComponent() - .withPostStartCallback((zkComponent) -> zkProperties.setProperty(ZKServerComponent.ZOOKEEPER_PROPERTY, zkComponent.getConnectionString())); + 
.withPostStartCallback((zkComponent) -> zkProperties.setProperty(ZKServerComponent.ZOOKEEPER_PROPERTY, zkComponent.getConnectionString())); } @Bean @@ -66,10 +69,10 @@ public KafkaComponent kafkaWithZKComponent(Properties zkProperties) { @Bean public ComponentRunner componentRunner(ZKServerComponent zkServerComponent, KafkaComponent kafkaWithZKComponent) { ComponentRunner runner = new ComponentRunner.Builder() - .withComponent("zk", zkServerComponent) - .withComponent("kafka", kafkaWithZKComponent) - .withCustomShutdownOrder(new String[] {"kafka","zk"}) - .build(); + .withComponent("zk", zkServerComponent) + .withComponent("kafka", kafkaWithZKComponent) + .withCustomShutdownOrder(new String[]{"kafka", "zk"}) + .build(); try { runner.start(); } catch (UnableToStartException e) { @@ -78,14 +81,14 @@ public ComponentRunner componentRunner(ZKServerComponent zkServerComponent, Kafk return runner; } - @Bean(initMethod = "start", destroyMethod="close") + @Bean(initMethod = "start", destroyMethod = "close") public CuratorFramework client(ComponentRunner componentRunner) { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); ZKServerComponent zkServerComponent = componentRunner.getComponent("zk", ZKServerComponent.class); return CuratorFrameworkFactory.newClient(zkServerComponent.getConnectionString(), retryPolicy); } - @Bean(destroyMethod="close") + @Bean(destroyMethod = "close") public ZkClient zkClient(ComponentRunner componentRunner) { ZKServerComponent zkServerComponent = componentRunner.getComponent("zk", ZKServerComponent.class); return new ZkClient(zkServerComponent.getConnectionString(), 10000, 10000, ZKStringSerializer$.MODULE$); @@ -96,9 +99,9 @@ public ZkUtils zkUtils(ZkClient zkClient) { return ZkUtils.apply(zkClient, false); } - @Bean(destroyMethod="close") - public KafkaConsumer kafkaConsumer(KafkaComponent kafkaWithZKComponent) { - Properties props = new Properties(); + @Bean + public Map kafkaConsumer(KafkaComponent kafkaWithZKComponent) { + Map 
props = new HashMap<>(); props.put("bootstrap.servers", kafkaWithZKComponent.getBrokerList()); props.put("group.id", "metron-config"); props.put("enable.auto.commit", "false"); @@ -106,7 +109,12 @@ public KafkaConsumer kafkaConsumer(KafkaComponent kafkaWithZKCom props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); - return new KafkaConsumer<>(props); + return props; + } + + @Bean + public ConsumerFactory createConsumerFactory() { + return new DefaultKafkaConsumerFactory<>(kafkaConsumer(kafkaWithZKComponent(zkProperties()))); } @Bean diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java index c7d42b3164..c92feab951 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/KafkaServiceImplTest.java @@ -41,6 +41,7 @@ import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; +import org.springframework.kafka.core.ConsumerFactory; import java.util.ArrayList; import java.util.HashMap; @@ -71,6 +72,7 @@ public class KafkaServiceImplTest { private ZkUtils zkUtils; private KafkaConsumer kafkaConsumer; + private ConsumerFactory kafkaConsumerFactory; private AdminUtils$ adminUtils; private KafkaService kafkaService; @@ -86,10 +88,13 @@ public class KafkaServiceImplTest { @Before public void setUp() throws Exception { zkUtils = mock(ZkUtils.class); + kafkaConsumerFactory = mock(ConsumerFactory.class); kafkaConsumer = mock(KafkaConsumer.class); adminUtils = 
mock(AdminUtils$.class); - kafkaService = new KafkaServiceImpl(zkUtils, kafkaConsumer, adminUtils); + when(kafkaConsumerFactory.createConsumer()).thenReturn(kafkaConsumer); + + kafkaService = new KafkaServiceImpl(zkUtils, kafkaConsumerFactory, adminUtils); } @Test @@ -104,6 +109,7 @@ public void listTopicsHappyPathWithListTopicsReturningNull() throws Exception { verifyZeroInteractions(zkUtils); verify(kafkaConsumer).listTopics(); + verify(kafkaConsumer).close(); verifyNoMoreInteractions(kafkaConsumer, zkUtils, adminUtils); } @@ -119,6 +125,7 @@ public void listTopicsHappyPathWithListTopicsReturningEmptyMap() throws Exceptio verifyZeroInteractions(zkUtils); verify(kafkaConsumer).listTopics(); + verify(kafkaConsumer).close(); verifyNoMoreInteractions(kafkaConsumer, zkUtils); } @@ -137,6 +144,7 @@ public void listTopicsHappyPath() throws Exception { verifyZeroInteractions(zkUtils); verify(kafkaConsumer).listTopics(); + verify(kafkaConsumer).close(); verifyNoMoreInteractions(kafkaConsumer, zkUtils); } @@ -156,6 +164,7 @@ public void listTopicsShouldProperlyRemoveOffsetTopic() throws Exception { verifyZeroInteractions(zkUtils); verify(kafkaConsumer).listTopics(); + verify(kafkaConsumer).close(); verifyNoMoreInteractions(kafkaConsumer, zkUtils); } @@ -167,6 +176,7 @@ public void deletingTopicThatDoesNotExistShouldReturnFalse() throws Exception { verifyZeroInteractions(zkUtils); verify(kafkaConsumer).listTopics(); + verify(kafkaConsumer).close(); verifyNoMoreInteractions(kafkaConsumer, zkUtils); } @@ -180,6 +190,7 @@ public void deletingTopicThatExistShouldReturnTrue() throws Exception { assertTrue(kafkaService.deleteTopic("non_existent_topic")); verify(kafkaConsumer).listTopics(); + verify(kafkaConsumer).close(); verify(adminUtils).deleteTopic(zkUtils, "non_existent_topic"); verifyNoMoreInteractions(kafkaConsumer); } @@ -193,6 +204,7 @@ public void makeSureDeletingTopicReturnsFalseWhenNoTopicsExist() throws Exceptio 
assertFalse(kafkaService.deleteTopic("non_existent_topic")); verify(kafkaConsumer).listTopics(); + verify(kafkaConsumer).close(); verifyNoMoreInteractions(kafkaConsumer); } @@ -230,6 +242,7 @@ public void getTopicShouldProperlyHandleTopicsThatDontExist() throws Exception { verify(kafkaConsumer).listTopics(); verify(kafkaConsumer, times(0)).partitionsFor("t"); + verify(kafkaConsumer).close(); verifyZeroInteractions(zkUtils); verifyNoMoreInteractions(kafkaConsumer); }