Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
METRON-891: Updated kafka service to create a ConsumerFactory per thr…
Browse files Browse the repository at this point in the history
…ead to prevent possible concurrency issues.

METRON-891: Updated kafka to use spring's ConsumerFactory functionality for Kafka. This will help with potential multi-threading issues.

METRON-891: Fixed unit test issues and checkstyle issues.

METRON-891: Really updating dependencies...

METRON-891: Fixing license after autoformat add p tags around link.
  • Loading branch information
jjmeyer0 committed May 11, 2017
1 parent 716bda3 commit d24185a
Show file tree
Hide file tree
Showing 8 changed files with 300 additions and 144 deletions.
1 change: 1 addition & 0 deletions dependencies_with_url.csv
Original file line number Diff line number Diff line change
Expand Up @@ -296,3 +296,4 @@ org.htrace:htrace-core:jar:3.0.4:compile,ASLv2,http://htrace.incubator.apache.or
net.byteseek:byteseek:jar:2.0.3:compile,BSD,https://github.com/nishihatapalmer/byteseek
org.springframework.security.kerberos:spring-security-kerberos-client:jar:1.0.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-security-kerberos
org.springframework.security.kerberos:spring-security-kerberos-core:jar:1.0.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-security-kerberos
org.springframework.kafka:spring-kafka:jar:1.1.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-kafka
34 changes: 31 additions & 3 deletions metron-interface/metron-rest/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,36 @@
<spring.kerberos.version>1.0.1.RELEASE</spring.kerberos.version>
<swagger.version>2.5.0</swagger.version>
<mysql.client.version>5.1.40</mysql.client.version>
<spring-kafka.version>1.1.1.RELEASE</spring-kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${spring-kafka.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
Expand Down Expand Up @@ -75,7 +103,7 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</exclusions>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
Expand Down Expand Up @@ -112,8 +140,8 @@
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
</exclusion>
</exclusions>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,33 +27,56 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.core.env.Environment;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.Properties;
import java.util.HashMap;
import java.util.Map;

import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;

/**
* Configuration used for connecting to Kafka.
*/
@Configuration
@Profile("!" + TEST_PROFILE)
public class KafkaConfig {

/**
* The Spring environment.
*/
private Environment environment;

/**
* Construvtor used to inject {@link Environment}.
* @param environment Spring environment to inject.
*/
@Autowired
public KafkaConfig(Environment environment) {
public KafkaConfig(final Environment environment) {
this.environment = environment;
}

/**
* The client used for ZooKeeper.
*/
@Autowired
private ZkClient zkClient;

/**
* Bean for ZooKeeper
*/
@Bean
public ZkUtils zkUtils() {
return ZkUtils.apply(zkClient, false);
}

@Bean(destroyMethod = "close")
public KafkaConsumer<String, String> kafkaConsumer() {
Properties props = new Properties();
/**
* Create properties that will be used by {@link this#createConsumerFactory()}
*
* @return Configurations used by {@link this#createConsumerFactory()}.
*/
@Bean
public Map<String, Object> consumerProperties() {
final Map<String, Object> props = new HashMap<>();
props.put("bootstrap.servers", environment.getProperty(MetronRestConstants.KAFKA_BROKER_URL_SPRING_PROPERTY));
props.put("group.id", "metron-rest");
props.put("enable.auto.commit", "false");
Expand All @@ -64,9 +87,24 @@ public KafkaConsumer<String, String> kafkaConsumer() {
if (environment.getProperty(MetronRestConstants.KERBEROS_ENABLED_SPRING_PROPERTY, Boolean.class, false)) {
props.put("security.protocol", "SASL_PLAINTEXT");
}
return new KafkaConsumer<>(props);
return props;
}

/**
* Create a {@link ConsumerFactory} which will be used for certain Kafka interactions within config API.
*
* @return a {@link ConsumerFactory} used to create {@link KafkaConsumer} for interactions with Kafka.
*/
@Bean
public ConsumerFactory<String, String> createConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerProperties());
}

/**
* Create a bean for {@link AdminUtils$}. This is primarily done to make testing a bit easier.
*
* @return {@link AdminUtils$} is written in scala. We return a reference to this class.
*/
@Bean
public AdminUtils$ adminUtils() {
return AdminUtils$.MODULE$;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,62 +35,78 @@

import java.util.Set;

/**
* The API resource that is use to interact with Kafka.
*/
@RestController
@RequestMapping("/api/v1/kafka")
public class KafkaController {

@Autowired
private KafkaService kafkaService;
/**
* Service used to interact with Kafka.
*/
@Autowired
private KafkaService kafkaService;

@ApiOperation(value = "Creates a new Kafka topic")
@ApiOperation(value = "Creates a new Kafka topic")
@ApiResponses({
@ApiResponse(message = "Returns saved Kafka topic", code = 200)
@RequestMapping(value = "/topic", method = RequestMethod.POST)
ResponseEntity<KafkaTopic> save(@ApiParam(name="topic", value="Kafka topic", required=true)@RequestBody KafkaTopic topic) throws RestException {
return new ResponseEntity<>(kafkaService.createTopic(topic), HttpStatus.CREATED);
}
})
@RequestMapping(value = "/topic", method = RequestMethod.POST)
ResponseEntity<KafkaTopic> save(final @ApiParam(name = "topic", value = "Kafka topic", required = true) @RequestBody KafkaTopic topic) throws RestException {
return new ResponseEntity<>(kafkaService.createTopic(topic), HttpStatus.CREATED);
}

@ApiOperation(value = "Retrieves a Kafka topic")
@ApiResponses(value = { @ApiResponse(message = "Returns Kafka topic", code = 200),
@ApiResponse(message = "Kafka topic is missing", code = 404) })
@RequestMapping(value = "/topic/{name}", method = RequestMethod.GET)
ResponseEntity<KafkaTopic> get(@ApiParam(name="name", value="Kafka topic name", required=true)@PathVariable String name) throws RestException {
KafkaTopic kafkaTopic = kafkaService.getTopic(name);
if (kafkaTopic != null) {
return new ResponseEntity<>(kafkaTopic, HttpStatus.OK);
} else {
return new ResponseEntity<>(HttpStatus.NOT_FOUND);
}
@ApiOperation(value = "Retrieves a Kafka topic")
@ApiResponses(value = {
@ApiResponse(message = "Returns Kafka topic", code = 200),
@ApiResponse(message = "Kafka topic is missing", code = 404)
})
@RequestMapping(value = "/topic/{name}", method = RequestMethod.GET)
ResponseEntity<KafkaTopic> get(final @ApiParam(name = "name", value = "Kafka topic name", required = true) @PathVariable String name) throws RestException {
KafkaTopic kafkaTopic = kafkaService.getTopic(name);
if (kafkaTopic != null) {
return new ResponseEntity<>(kafkaTopic, HttpStatus.OK);
} else {
return new ResponseEntity<>(HttpStatus.NOT_FOUND);
}
}

@ApiOperation(value = "Retrieves all Kafka topics")
@ApiOperation(value = "Retrieves all Kafka topics")
@ApiResponses({
@ApiResponse(message = "Returns a list of all Kafka topics", code = 200)
@RequestMapping(value = "/topic", method = RequestMethod.GET)
ResponseEntity<Set<String>> list() throws Exception {
return new ResponseEntity<>(kafkaService.listTopics(), HttpStatus.OK);
}
})
@RequestMapping(value = "/topic", method = RequestMethod.GET)
ResponseEntity<Set<String>> list() throws Exception {
return new ResponseEntity<>(kafkaService.listTopics(), HttpStatus.OK);
}

@ApiOperation(value = "Deletes a Kafka topic")
@ApiResponses(value = { @ApiResponse(message = "Kafka topic was deleted", code = 200),
@ApiResponse(message = "Kafka topic is missing", code = 404) })
@RequestMapping(value = "/topic/{name}", method = RequestMethod.DELETE)
ResponseEntity<Void> delete(@ApiParam(name="name", value="Kafka topic name", required=true)@PathVariable String name) throws RestException {
if (kafkaService.deleteTopic(name)) {
return new ResponseEntity<>(HttpStatus.OK);
} else {
return new ResponseEntity<>(HttpStatus.NOT_FOUND);
}
@ApiOperation(value = "Deletes a Kafka topic")
@ApiResponses(value = {
@ApiResponse(message = "Kafka topic was deleted", code = 200),
@ApiResponse(message = "Kafka topic is missing", code = 404)
})
@RequestMapping(value = "/topic/{name}", method = RequestMethod.DELETE)
ResponseEntity<Void> delete(final @ApiParam(name = "name", value = "Kafka topic name", required = true) @PathVariable String name) throws RestException {
if (kafkaService.deleteTopic(name)) {
return new ResponseEntity<>(HttpStatus.OK);
} else {
return new ResponseEntity<>(HttpStatus.NOT_FOUND);
}
}

@ApiOperation(value = "Retrieves a sample message from a Kafka topic using the most recent offset")
@ApiResponses(value = { @ApiResponse(message = "Returns sample message", code = 200),
@ApiResponse(message = "Either Kafka topic is missing or contains no messages", code = 404) })
@RequestMapping(value = "/topic/{name}/sample", method = RequestMethod.GET)
ResponseEntity<String> getSample(@ApiParam(name="name", value="Kafka topic name", required=true)@PathVariable String name) throws RestException {
String sampleMessage = kafkaService.getSampleMessage(name);
if (sampleMessage != null) {
return new ResponseEntity<>(sampleMessage, HttpStatus.OK);
} else {
return new ResponseEntity<>(HttpStatus.NOT_FOUND);
}
@ApiOperation(value = "Retrieves a sample message from a Kafka topic using the most recent offset")
@ApiResponses(value = {
@ApiResponse(message = "Returns sample message", code = 200),
@ApiResponse(message = "Either Kafka topic is missing or contains no messages", code = 404)
})
@RequestMapping(value = "/topic/{name}/sample", method = RequestMethod.GET)
ResponseEntity<String> getSample(final @ApiParam(name = "name", value = "Kafka topic name", required = true) @PathVariable String name) throws RestException {
String sampleMessage = kafkaService.getSampleMessage(name);
if (sampleMessage != null) {
return new ResponseEntity<>(sampleMessage, HttpStatus.OK);
} else {
return new ResponseEntity<>(HttpStatus.NOT_FOUND);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,49 @@

import java.util.Set;

/**
* This is a set of operations created to interact with Kafka.
*/
public interface KafkaService {
String CONSUMER_OFFSETS_TOPIC = "__consumer_offsets";
/**
* Please see the following for documentation.
*
* @see <a href="https://kafka.apache.org/documentation/#impl_offsettracking">Kafka offset tracking documentation</a>.
*/
String CONSUMER_OFFSETS_TOPIC = "__consumer_offsets";

KafkaTopic createTopic(KafkaTopic topic) throws RestException;
/**
* Create a topic in Kafka for given information.
* @param topic The information used to create a Kafka topic.
* @return The Kafka topic created.
* @throws RestException If exceptions occur when creating a topic they should be wrapped in a {@link RestException}.
*/
KafkaTopic createTopic(KafkaTopic topic) throws RestException;

boolean deleteTopic(String name);
/**
* Delete a topic for a given name.
* @param name The name of the topic to delete.
* @return If topic was deleted true; otherwise false.
*/
boolean deleteTopic(String name);

KafkaTopic getTopic(String name);
/**
* Retrieves the Kafka topic for a given name.
* @param name The name of the Kafka topic to retrieve.
* @return A {@link KafkaTopic} with the name of {@code name}. Null if topic with name, {@code name}, doesn't exist.
*/
KafkaTopic getTopic(String name);

Set<String> listTopics();
/**
* Returns a set of all topics.
* @return A set of all topics in Kafka.
*/
Set<String> listTopics();

String getSampleMessage(String topic);
/**
* Return a single sample message from a given topic.
* @param topic The name of the topic to retrieve a sample message from.
* @return A string representation of the sample message retrieved. If topic doesn't exist null will be returned.
*/
String getSampleMessage(String topic);
}

0 comments on commit d24185a

Please sign in to comment.