Migrate to Kafka 2.1
seneque committed Dec 19, 2018
1 parent 8f5230b commit ca00938
Showing 17 changed files with 41 additions and 46 deletions.
@@ -29,7 +29,6 @@
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.ExtendedDeserializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,18 +55,18 @@
  * @see CryptoSerializer
  * @see Decryptor
  */
-public class CryptoDeserializer<T> implements ExtendedDeserializer<T> {
+public class CryptoDeserializer<T> implements Deserializer<T> {
 
     private static final Logger log = LoggerFactory.getLogger(CryptoDeserializer.class);
 
-    private final ExtendedDeserializer<? extends T> rawDeserializer;
+    private final Deserializer<? extends T> rawDeserializer;
     private final Decryptor decryptor;
 
     /**
      * @param rawDeserializer deserializer to deserialize clear data
      * @param decryptor Decryptor used to decrypt the data
      */
-    public CryptoDeserializer(ExtendedDeserializer<? extends T> rawDeserializer, Decryptor decryptor) {
+    public CryptoDeserializer(Deserializer<? extends T> rawDeserializer, Decryptor decryptor) {
 
         this.rawDeserializer = rawDeserializer;
         this.decryptor = decryptor;
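
Background for this change: Kafka 2.1 folds the header-aware `deserialize` variant into `Deserializer` itself as a default method (KIP-336), deprecating `ExtendedDeserializer`. A minimal sketch of the new shape, assuming Kafka 2.1 on the classpath and a hypothetical String payload (not code from this repository):

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;

// Sketch only: a plain Deserializer overriding the headers-aware overload that
// Deserializer gained in Kafka 2.1 (configure/close are default methods there).
public class HeaderAwareStringDeserializer implements Deserializer<String> {

    @Override
    public String deserialize(String topic, byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(String topic, Headers headers, byte[] data) {
        // Headers are reachable without an ExtendedDeserializer wrapper; a crypto
        // deserializer would resolve its decryption key from a header such as
        // this hypothetical "key-ref" before delegating.
        Header keyRef = headers.lastHeader("key-ref");
        return keyRef == null ? deserialize(topic, data) : decryptAndDeserialize(keyRef, data);
    }

    private String decryptAndDeserialize(Header keyRef, byte[] data) {
        // Placeholder for key lookup + decryption.
        return new String(data, StandardCharsets.UTF_8);
    }
}
```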

@@ -20,7 +20,6 @@
 package io.quicksign.kafka.crypto;
 
 import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.ExtendedDeserializer;
 
 public class CryptoDeserializerFactory {
 
@@ -32,6 +31,6 @@ public CryptoDeserializerFactory(Decryptor decryptor) {
     }
 
     public <T> CryptoDeserializer<T> buildFrom(Deserializer<T> rawDeserializer) {
-        return new CryptoDeserializer<>(ExtendedDeserializer.Wrapper.ensureExtended(rawDeserializer), decryptor);
+        return new CryptoDeserializer<>(rawDeserializer, decryptor);
     }
 }
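
With the `ExtendedDeserializer.Wrapper.ensureExtended` call gone, any stock deserializer can be handed to the factory directly. A hypothetical usage sketch (the `decryptor` wiring is assumed to exist elsewhere):

```java
import org.apache.kafka.common.serialization.StringDeserializer;

// 'decryptor' is assumed to be constructed elsewhere in the application.
CryptoDeserializerFactory factory = new CryptoDeserializerFactory(decryptor);
CryptoDeserializer<String> cryptoDeserializer = factory.buildFrom(new StringDeserializer());
```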

@@ -28,7 +28,6 @@
 
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.serialization.ExtendedSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,12 +56,12 @@
  *
  * @param <T>
  */
-public class CryptoSerializer<T> implements ExtendedSerializer<T> {
+public class CryptoSerializer<T> implements Serializer<T> {
 
     private static final Logger log = LoggerFactory.getLogger(CryptoSerializer.class);
 
 
-    private final ExtendedSerializer<? super T> rawSerializer;
+    private final Serializer<? super T> rawSerializer;
     private final Encryptor encryptor;
     private final ThreadLocal<byte[]> keyRefHolder;
 
@@ -71,7 +70,7 @@ public class CryptoSerializer<T> implements ExtendedSerializer<T> {
      * @param encryptor {@link Encryptor} to encrypt data
      * @param keyRefHolder {@link ThreadLocal} used to communicate the key reference when using Kafka Stream (unused for regular Kafka Producer)
      */
-    public CryptoSerializer(ExtendedSerializer<? super T> rawSerializer, Encryptor encryptor, ThreadLocal<byte[]> keyRefHolder) {
+    public CryptoSerializer(Serializer<? super T> rawSerializer, Encryptor encryptor, ThreadLocal<byte[]> keyRefHolder) {
         this.rawSerializer = rawSerializer;
         this.encryptor = encryptor;
         this.keyRefHolder = keyRefHolder;
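
The serializer side mirrors the deserializer change: in Kafka 2.1 `Serializer` ships `serialize(topic, headers, data)` as a default method, so `ExtendedSerializer` is no longer needed to reach the headers. A minimal sketch under the same assumptions (illustrative, not this repository's code):

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;

// Sketch only: overriding the headers-aware overload that Serializer gained
// in Kafka 2.1 to stamp a record header during serialization.
public class HeaderStampingStringSerializer implements Serializer<String> {

    @Override
    public byte[] serialize(String topic, String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serialize(String topic, Headers headers, String data) {
        // A crypto serializer would attach its key-reference header here;
        // "key-ref" is a hypothetical name.
        headers.add("key-ref", new byte[0]);
        return serialize(topic, data);
    }
}
```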

@@ -19,7 +19,6 @@
  */
 package io.quicksign.kafka.crypto;
 
-import org.apache.kafka.common.serialization.ExtendedSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 
 public class CryptoSerializerFactory {
@@ -32,6 +31,6 @@ public CryptoSerializerFactory(Encryptor encryptor) {
     }
 
     public <T> CryptoSerializer<T> buildFrom(Serializer<T> rawSerializer) {
-        return new CryptoSerializer<>(ExtendedSerializer.Wrapper.ensureExtended(rawSerializer), encryptor, null);
+        return new CryptoSerializer<>(rawSerializer, encryptor, null);
     }
 }

@@ -22,7 +22,6 @@
 import java.util.Map;
 
 import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.serialization.ExtendedSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 
 import io.quicksign.kafka.crypto.KafkaCryptoConstants;
@@ -34,9 +33,9 @@
  *
  * @param <T>
  */
-public class CryptoAwareSerializerWrapper<T> implements ExtendedSerializer<T> {
+public class CryptoAwareSerializerWrapper<T> implements Serializer<T> {
 
-    private final ExtendedSerializer<T> rawSerializer;
+    private final Serializer<T> rawSerializer;
     private final KeyReferenceExtractor keyReferenceExtractor;
     private final ThreadLocal<byte[]> keyRefHolder;
 
@@ -46,7 +45,7 @@ public class CryptoAwareSerializerWrapper<T> implements ExtendedSerializer<T> {
      * @param keyRefHolder the ThreadLocal to share the keyref (only used in the context of a Kafka Stream)
      */
    public CryptoAwareSerializerWrapper(Serializer<T> rawSerializer, KeyReferenceExtractor keyReferenceExtractor, ThreadLocal<byte[]> keyRefHolder) {
-        this.rawSerializer = ExtendedSerializer.Wrapper.ensureExtended(rawSerializer);
+        this.rawSerializer = rawSerializer;
         this.keyReferenceExtractor = keyReferenceExtractor;
         this.keyRefHolder = keyRefHolder;
     }

@@ -29,7 +29,7 @@
  * and put in the context the cryptographic key reference, that will be used to encrypt the record value.</p>
  *
  * <p>This based on the fact that {@link org.apache.kafka.clients.producer.KafkaProducer#doSend(org.apache.kafka.clients.producer.ProducerRecord, org.apache.kafka.clients.producer.Callback) KafkaProducer}
- * and {@link org.apache.kafka.streams.processor.internals.RecordCollectorImpl#send(java.lang.String, java.lang.Object, java.lang.Object, java.lang.Integer, java.lang.Long, org.apache.kafka.common.serialization.Serializer, org.apache.kafka.common.serialization.Serializer) RecordCollector}
+ * and {@link org.apache.kafka.streams.processor.internals.RecordCollectorImpl#send(java.lang.String, java.lang.Object, java.lang.Object, org.apache.kafka.common.header.Headers, java.lang.Integer, java.lang.Long, org.apache.kafka.common.serialization.Serializer, org.apache.kafka.common.serialization.Serializer) RecordCollector}
  * (for streams)
  * call key serialization before value serialization.
  * </p>
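
The mechanism this javadoc relies on can be reduced to a small sketch (assumed names, not the project's code): because the key is serialized before the value on the same thread, the key serializer can publish the key reference through a ThreadLocal that the value serializer then reads.

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.serialization.Serializer;

public class KeyRefHandoffSketch {

    static final ThreadLocal<byte[]> KEY_REF = new ThreadLocal<>();

    // Serializer has a single abstract method in Kafka 2.1, so lambdas work.
    static final Serializer<String> KEY_SIDE = (topic, key) -> {
        byte[] keyRef = key.getBytes(StandardCharsets.UTF_8); // hypothetical key-ref extraction
        KEY_REF.set(keyRef); // publish for the value serializer
        return keyRef;
    };

    static final Serializer<String> VALUE_SIDE = (topic, value) -> {
        byte[] keyRef = KEY_REF.get(); // set by the key side just before
        // ... a real implementation would encrypt 'value' under keyRef here ...
        return value.getBytes(StandardCharsets.UTF_8);
    };
}
```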

@@ -19,8 +19,6 @@
  */
 package io.quicksign.kafka.crypto.pairing.serdes;
 
-import org.apache.kafka.common.serialization.ExtendedDeserializer;
-import org.apache.kafka.common.serialization.ExtendedSerializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 
@@ -65,11 +63,8 @@ public <T> Serde<T> buildFrom(Serde<T> rawSerde) {
     }
 
     private <T> Serde<T> buildFrom(Serde<T> rawSerde, ThreadLocal<byte[]> keyRefHolder) {
-        ExtendedDeserializer<T> rawExtendedDeserializer = ExtendedDeserializer.Wrapper.ensureExtended(rawSerde.deserializer());
-        ExtendedSerializer<T> rawExtendedSerializer = ExtendedSerializer.Wrapper.ensureExtended(rawSerde.serializer());
-
-        return Serdes.serdeFrom(new CryptoSerializer<>(rawExtendedSerializer, encryptor, keyRefHolder),
-                new CryptoDeserializer<>(rawExtendedDeserializer, decryptor));
+        return Serdes.serdeFrom(new CryptoSerializer<>(rawSerde.serializer(), encryptor, keyRefHolder),
+                new CryptoDeserializer<>(rawSerde.deserializer(), decryptor));
     }
 
     /**
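
With the intermediate `ensureExtended` wrapping removed, the serde's own serializer and deserializer pass straight through. A hypothetical usage sketch, wrapping a stock String serde (`cryptoSerdeFactory` is an assumed instance of the factory above):

```java
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

// 'cryptoSerdeFactory' is assumed to be configured with an Encryptor/Decryptor.
Serde<String> cryptoSerde = cryptoSerdeFactory.buildFrom(Serdes.String());
```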

@@ -20,7 +20,8 @@
 package io.quicksign.kafka.crypto.pairing.serdes;
 
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.Consumed;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.Serialized;
@@ -60,6 +61,13 @@ public Serialized<K, V> toSerialized() {
         return Serialized.with(keySerde, valueSerde);
     }
 
+    /**
+     * Build a {@link Grouped} using the keySerde and valueSerde of the pair
+     */
+    public Grouped<K, V> toGrouped() {
+        return Grouped.with(keySerde, valueSerde);
+    }
+
     /**
      * Build a {@link Produced} using the keySerde and valueSerde of the pair
      *
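
`toGrouped()` is added because Kafka 2.1 deprecates `Serialized` in favour of `Grouped` (KIP-372, which also allows naming repartition topics). A hypothetical call site (`stream` and `serdesPair` assumed to exist):

```java
import org.apache.kafka.streams.kstream.KGroupedStream;

// New style:
KGroupedStream<String, String> grouped = stream.groupByKey(serdesPair.toGrouped());
// Deprecated style this replaces:
// KGroupedStream<String, String> grouped = stream.groupByKey(serdesPair.toSerialized());
```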

@@ -19,7 +19,6 @@
  */
 package io.quicksign.kafka.crypto.pairing.serializer;
 
-import org.apache.kafka.common.serialization.ExtendedSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 
 import io.quicksign.kafka.crypto.CryptoSerializer;
@@ -51,7 +50,7 @@ public CryptoSerializerPairFactory(Encryptor encryptor, KeyReferenceExtractor ke
     @Override
     public <K, V> SerializerPair<K, V> build(Serializer<K> keySerializer, Serializer<V> valueSerializer) {
         Serializer<K> newKeySerializer = new CryptoAwareSerializerWrapper<K>(keySerializer, keyReferenceExtractor, null);
-        Serializer<V> newvalueSerializer = new CryptoSerializer<>(ExtendedSerializer.Wrapper.ensureExtended(valueSerializer), encryptor, null);
+        Serializer<V> newvalueSerializer = new CryptoSerializer<>(valueSerializer, encryptor, null);
         return new SerializerPair<>(newKeySerializer, newvalueSerializer);
     }
 }

@@ -28,7 +28,7 @@
 
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
-import org.apache.kafka.common.serialization.ExtendedDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.InjectMocks;
@@ -42,7 +42,7 @@ public class CryptoDeserializerTest {
     Decryptor decryptor;
 
     @Mock
-    ExtendedDeserializer<String> rawDeserializer;
+    Deserializer<String> rawDeserializer;
 
     @InjectMocks
     CryptoDeserializer<String> cryptoDeserializer;
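
With a plain `Deserializer` mock, tests can stub the headers-aware overload directly. A hypothetical Mockito 2 sketch (topic name and return value invented for illustration):

```java
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;

// Stub the Kafka 2.1 headers-aware overload on the mocked raw deserializer.
when(rawDeserializer.deserialize(eq("my-topic"), any(Headers.class), any(byte[].class)))
        .thenReturn("clear text");
```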

@@ -29,7 +29,7 @@
 
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
-import org.apache.kafka.common.serialization.ExtendedSerializer;
+import org.apache.kafka.common.serialization.Serializer;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.InjectMocks;
@@ -40,7 +40,7 @@
 public class CryptoSerializerTest {
 
     @Mock
-    ExtendedSerializer<String> rawSerializer;
+    Serializer<String> rawSerializer;
 
     @Mock
     Encryptor encryptor;
4 changes: 2 additions & 2 deletions doc/running-docker-kafka.adoc
@@ -6,13 +6,13 @@ You need to have a running Kafka in order to run the examples, here's how you ca
 [source,indent=0]
 ....
-docker run --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083 -p 9581-9585:9581-9585 -p 9092:9092 -e ADV_HOST=192.168.99.100 landoop/fast-data-dev:cp4.0.0
+docker run --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083 -p 9581-9585:9581-9585 -p 9092:9092 -e ADV_HOST=192.168.99.100 landoop/fast-data-dev:2.0.1
 ....
 *On linux*
 [source,indent=0]
 ....
-docker run --rm --net=host -e ADV_HOST=localhost landoop/fast-data-dev:cp4.0.0
+docker run --rm --net=host -e ADV_HOST=localhost landoop/fast-data-dev:2.0.1
 ....
 ====
2 changes: 1 addition & 1 deletion pom.xml
@@ -53,7 +53,7 @@
         <maven.compiler.target>1.8</maven.compiler.target>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 
-        <kafka.version>1.1.0</kafka.version>
+        <kafka.version>2.1.0</kafka.version>
     </properties>
 
     <distributionManagement>
2 changes: 1 addition & 1 deletion samples/generatedkey-sample/docker-compose.yml
@@ -14,5 +14,5 @@ version: '3'
 
 services:
   kafka:
-    image: landoop/fast-data-dev:cp4.0.0
+    image: landoop/fast-data-dev:2.0.1
     network_mode: "host"
2 changes: 1 addition & 1 deletion samples/kafkastream-with-keyrepo-sample/docker-compose.yml
@@ -14,5 +14,5 @@ version: '3'
 
 services:
   kafka:
-    image: landoop/fast-data-dev:cp4.0.0
+    image: landoop/fast-data-dev:2.0.1
     network_mode: "host"

@@ -19,8 +19,7 @@
  */
 package io.quicksign.kafka.crypto.samples.stream.keyrepo;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.Properties;
 
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KafkaStreams;
@@ -72,12 +71,11 @@ public void run() {
         KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore(name + "_balance");
 
 
-        Map<String, Object> props = new HashMap<>();
-        props.put(StreamsConfig.APPLICATION_ID_CONFIG, name);
-        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        StreamsConfig config = new StreamsConfig(props);
+        Properties props = new Properties();
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, name);
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
 
 
         StreamsBuilder streamsBuilder = new StreamsBuilder();
@@ -90,7 +88,7 @@ public void run() {
                 .reduce((s1, s2) -> "" + (Integer.valueOf(s1) + Integer.valueOf(s2)),
                         serdesPair.applyTo(Materialized.as(storeSupplier)));
 
-        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), config);
+        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
         kafkaStreams.start();
 
         // end::stream[]
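
Two details of this hunk are worth spelling out: `Properties#setProperty` only accepts String values, which is why the serde classes are now passed as `getClass().getName()` rather than the bare `Class` the old `Map<String, Object>` could hold; and `KafkaStreams` accepts the `Properties` directly, its `StreamsConfig`-based constructor being deprecated. A condensed, self-contained sketch of the new style (application id and topic names are hypothetical):

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsBootstrapSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "demo-app");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // setProperty requires Strings, hence the class *name* of the serde:
        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                Serdes.String().getClass().getName());
        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                Serdes.String().getClass().getName());

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("demo-input").to("demo-output");

        // Properties go straight into KafkaStreams; no StreamsConfig needed.
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```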
2 changes: 1 addition & 1 deletion samples/keyrepo-sample/docker-compose.yml
@@ -14,5 +14,5 @@ version: '3'
 
 services:
   kafka:
-    image: landoop/fast-data-dev:cp4.0.0
+    image: landoop/fast-data-dev:2.0.1
     network_mode: "host"
