ARIES-1883 - kafka based implementation
tmaret committed Jan 11, 2019
1 parent 4c15ecf commit 1ad22a3b9684e916b387db5e15e31fbc1d2bbbe8
Showing 11 changed files with 852 additions and 1 deletion.
@@ -0,0 +1,47 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.aries.events</groupId>
<artifactId>org.apache.aries.events</artifactId>
<version>0.1.0-SNAPSHOT</version>
</parent>
<artifactId>org.apache.aries.events.kafka</artifactId>

<dependencies>
<dependency>
<groupId>org.apache.aries.events</groupId>
<artifactId>org.apache.aries.events.api</artifactId>
<version>${project.version}</version>
</dependency>
<!-- Apache Kafka -->
<dependency>
<groupId>org.apache.servicemix.bundles</groupId>
<artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
<version>2.1.0_1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>2.1.0</version>
</dependency>
</dependencies>


</project>
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aries.events.kafka;

import org.osgi.service.metatype.annotations.AttributeDefinition;
import org.osgi.service.metatype.annotations.ObjectClassDefinition;

@ObjectClassDefinition(name = "Apache Aries Events - Apache Kafka endpoint",
description = "Apache Kafka endpoint")
public @interface KafkaEndpoint {

@AttributeDefinition(name = "Kafka Bootstrap Servers",
description = "A comma separated list of host/port pairs to use for establishing the initial connection to the Kafka cluster.")
String kafkaBootstrapServers() default "localhost:9092";


}
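
A note on configuration: the KafkaMessaging component declared in the next file uses ConfigurationPolicy.REQUIRE, so it only activates once a configuration carrying the kafkaBootstrapServers property exists. The sketch below shows one way such a configuration might be supplied through OSGi Configuration Admin; it is illustrative only, and the PID is an assumption (the default DS configuration PID, i.e. the fully qualified name of the implementation class).

import java.io.IOException;
import java.util.Dictionary;
import java.util.Hashtable;

import org.osgi.service.cm.Configuration;
import org.osgi.service.cm.ConfigurationAdmin;

public class KafkaEndpointConfigSketch {

    // Hypothetical helper that pushes a configuration for the Kafka endpoint.
    // The PID below is an assumption (default DS configuration PID of KafkaMessaging).
    public static void configure(ConfigurationAdmin configAdmin) throws IOException {
        Configuration config = configAdmin.getConfiguration(
                "org.apache.aries.events.kafka.KafkaMessaging", "?");
        Dictionary<String, Object> props = new Hashtable<>();
        props.put("kafkaBootstrapServers", "broker-1:9092,broker-2:9092");
        config.update(props);
    }
}
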
@@ -0,0 +1,226 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aries.events.kafka;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import org.apache.aries.events.api.Message;
import org.apache.aries.events.api.Messaging;
import org.apache.aries.events.api.Position;
import org.apache.aries.events.api.Seek;
import org.apache.aries.events.api.SubscribeRequestBuilder;
import org.apache.aries.events.api.SubscribeRequestBuilder.SubscribeRequest;
import org.apache.aries.events.api.Subscription;
import org.apache.aries.events.api.Type;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.ConfigurationPolicy;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.metatype.annotations.Designate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.lang.Integer.parseInt;
import static java.lang.Long.parseLong;
import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Collections.singleton;
import static java.util.Collections.unmodifiableMap;
import static java.util.stream.StreamSupport.stream;
import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;

@Type("kafka")
@Component(service = Messaging.class, configurationPolicy = ConfigurationPolicy.REQUIRE)
@Designate(ocd = KafkaEndpoint.class)
public class KafkaMessaging implements Messaging {

private static final Logger LOG = LoggerFactory.getLogger(KafkaMessaging.class);

/**
* The partition used to send and receive records.
*/
private static final int PARTITION = 0;

/**
* Shared Kafka producer instance ({@code KafkaProducer}s are thread-safe).
*/
private KafkaProducer<String, byte[]> producer;

private Map<String, Object> producerConfig;

private KafkaEndpoint endPoint;

@Activate
public void activate(KafkaEndpoint endPoint) {
this.endPoint = endPoint;
producerConfig = new HashMap<>();
producerConfig.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerConfig.put(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
producerConfig.put(BOOTSTRAP_SERVERS_CONFIG, endPoint.kafkaBootstrapServers());
// We favour durability over throughput
// and thus require acknowledgment from
// the partition leader and all in-sync replicas.
producerConfig.put(ACKS_CONFIG, "all");
producerConfig = unmodifiableMap(producerConfig);
}

@Deactivate
public void deactivate() {
closeQuietly(producer);
}

@Override
public void send(String topic, Message message) {
ProducerRecord<String, byte[]> record = new ProducerRecord<String, byte[]>(topic, PARTITION, null, message.getPayload(), toHeaders(message.getProperties()));
try {
RecordMetadata metadata = kafkaProducer().send(record).get();
LOG.info("Sent to {}", metadata);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(format("Failed to send mesage on topic %s", topic), e);
}
}

@Override
public Subscription subscribe(SubscribeRequestBuilder requestBuilder) {
SubscribeRequest request = requestBuilder.build();
KafkaConsumer<String, byte[]> consumer = buildKafkaConsumer(request.getSeek());

TopicPartition topicPartition = new TopicPartition(request.getTopic(), PARTITION);

Collection<TopicPartition> topicPartitions = singleton(topicPartition);
consumer.assign(topicPartitions);

if (request.getPosition() != null) {
consumer.seek(topicPartition, asKafkaPosition(request.getPosition()).getOffset());
} else if (request.getSeek() == Seek.earliest) {
consumer.seekToBeginning(topicPartitions);
} else {
consumer.seekToEnd(topicPartitions);
}

KafkaSubscription subscription = new KafkaSubscription(consumer, request.getCallback());
// TODO pool the threads
Thread thread = new Thread(subscription);
thread.setDaemon(true);
thread.start();
return subscription;
}

@Override
public Position positionFromString(String position) {
String[] chunks = position.split(":");
if (chunks.length != 2) {
throw new IllegalArgumentException(format("Illegal position format %s", position));
}
return new KafkaPosition(parseInt(chunks[0]), parseLong(chunks[1]));
}

static String positionToString(Position position) {
KafkaPosition kafkaPosition = asKafkaPosition(position);
return format("%s:%s", kafkaPosition.getPartition(), kafkaPosition.getOffset());
}

static Iterable<Header> toHeaders(Map<String, String> properties) {
return properties.entrySet().stream()
.map(KafkaMessaging::toHeader)
.collect(Collectors.toList());
}

static Map<String, String> toProperties(Headers headers) {
return stream(headers.spliterator(), true)
.collect(Collectors.toMap(Header::key, header -> new String(header.value(), UTF_8)));
}

static RecordHeader toHeader(Map.Entry<String, String> property) {
return new RecordHeader(property.getKey(), property.getValue().getBytes(UTF_8));
}

static Message toMessage(ConsumerRecord<String, byte[]> record) {
return new Message(record.value(), toProperties(record.headers()));
}


private synchronized KafkaProducer<String, byte[]> kafkaProducer() {
if (producer == null) {
producer = new KafkaProducer<>(producerConfig);
}
return producer;
}

private KafkaConsumer<String, byte[]> buildKafkaConsumer(Seek seek) {

String groupId = UUID.randomUUID().toString();

Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put(BOOTSTRAP_SERVERS_CONFIG, endPoint.kafkaBootstrapServers());
consumerConfig.put(GROUP_ID_CONFIG, groupId);
consumerConfig.put(ENABLE_AUTO_COMMIT_CONFIG, false);
consumerConfig.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerConfig.put(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
consumerConfig.put(AUTO_OFFSET_RESET_CONFIG, seek.name());

return new KafkaConsumer<>(unmodifiableMap(consumerConfig));
}


private void closeQuietly(Closeable closeable) {
if (closeable != null) {
try {
closeable.close();
} catch (IOException ignore) {
// ignore
}
}
}

private static KafkaPosition asKafkaPosition(Position position) {
if (!(position instanceof KafkaPosition)) {
throw new IllegalArgumentException(format("Position %s must be an instance of %s", position, KafkaPosition.class.getCanonicalName()));
}
return (KafkaPosition) position;
}


}
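
For orientation, a minimal usage sketch of the service above, assuming an activated KafkaMessaging instance wired in as the Messaging service and a reachable broker; the topic name, payload, and position string are illustrative only.

import static java.nio.charset.StandardCharsets.UTF_8;

import java.util.HashMap;
import java.util.Map;

import org.apache.aries.events.api.Message;
import org.apache.aries.events.api.Messaging;
import org.apache.aries.events.api.Position;

public class KafkaMessagingUsageSketch {

    public static void run(Messaging messaging) {
        // Send a payload with one custom property; KafkaMessaging maps the
        // properties to Kafka record headers and always writes to partition 0.
        Map<String, String> properties = new HashMap<>();
        properties.put("content-type", "text/plain");
        messaging.send("orders", new Message("hello".getBytes(UTF_8), properties));

        // Positions serialize as "<partition>:<offset>"; a previously stored
        // string can be turned back into a Position to resume consumption.
        Position resumeFrom = messaging.positionFromString("0:42");
        System.out.println(resumeFrom.positionToString());
    }
}
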
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.aries.events.kafka;

import javax.annotation.Nonnull;

import org.apache.aries.events.api.Position;

public final class KafkaPosition implements Position {

private final int partition;

private final long offset;

public KafkaPosition(int partition, long offset) {
this.partition = partition;
this.offset = offset;
}

public int getPartition() {
return partition;
}

public long getOffset() {
return offset;
}

@Override
public String toString() {
return positionToString();
}

@Override
public String positionToString() {
return KafkaMessaging.positionToString(this);
}

@Override
public int compareTo(@Nonnull Position p) {
return Long.compare(offset, ((KafkaPosition)p).offset);
}
}
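
A brief illustration of the position semantics above (values illustrative): positions on the same partition order by offset, and the string produced by positionToString() is the same "<partition>:<offset>" form that KafkaMessaging.positionFromString() parses.

import org.apache.aries.events.kafka.KafkaPosition;

public class KafkaPositionSketch {

    public static void main(String[] args) {
        KafkaPosition earlier = new KafkaPosition(0, 10L);
        KafkaPosition later = new KafkaPosition(0, 42L);

        // Ordering compares offsets within the (single) partition.
        assert earlier.compareTo(later) < 0;

        // Prints "0:42".
        System.out.println(later.positionToString());
    }
}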
