Skip to content

Commit

Permalink
Merge pull request #99 from airbnb/luca_luo/add_kafka_encryption_option
Browse files Browse the repository at this point in the history
[kafka] Add encryption option for message sent through kafka
  • Loading branch information
lucaluo committed Aug 14, 2015
2 parents 908fa71 + fdebe31 commit 8171bf5
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 7 deletions.
56 changes: 50 additions & 6 deletions plog-kafka/src/main/java/com/airbnb/plog/kafka/KafkaHandler.java
@@ -1,5 +1,6 @@
package com.airbnb.plog.kafka;

import com.airbnb.plog.kafka.KafkaProvider.EncryptionConfig;
import com.airbnb.plog.Message;
import com.airbnb.plog.handlers.Handler;
import com.eclipsesource.json.JsonObject;
Expand All @@ -12,8 +13,12 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.io.ByteArrayOutputStream;
import java.util.concurrent.atomic.AtomicLong;

import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;

@RequiredArgsConstructor
@Slf4j
public final class KafkaHandler extends SimpleChannelInboundHandler<Message> implements Handler {
Expand All @@ -23,15 +28,35 @@ public final class KafkaHandler extends SimpleChannelInboundHandler<Message> imp
private final AtomicLong failedToSendMessageExceptions = new AtomicLong(), seenMessages = new AtomicLong();
private final ProducerStats producerStats;
private final ProducerTopicMetrics producerAllTopicsStats;
private final EncryptionConfig encryptionConfig;
private SecretKeySpec keySpec = null;

protected KafkaHandler(
final String clientId,
final boolean propagate,
final String defaultTopic,
final Producer<byte[], byte[]> producer,
final EncryptionConfig encryptionConfig) {

protected KafkaHandler(final String clientId, final boolean propagate, final String defaultTopic,
final Producer<byte[], byte[]> producer) {
super();
this.propagate = propagate;
this.producerStats = ProducerStatsRegistry.getProducerStats(clientId);
this.producerAllTopicsStats = ProducerTopicStatsRegistry.getProducerTopicStats(clientId).getProducerAllTopicsStats();
this.producerAllTopicsStats =
ProducerTopicStatsRegistry.getProducerTopicStats(clientId).getProducerAllTopicsStats();
this.defaultTopic = defaultTopic;
this.producer = producer;
this.encryptionConfig = encryptionConfig;

if (encryptionConfig != null) {
final byte[] keyBytes = encryptionConfig.encryptionKey.getBytes();
keySpec = new SecretKeySpec(keyBytes, encryptionConfig.encryptionAlgorithm);
log.info("KafkaHandler start with encryption algorithm '"
+ encryptionConfig.encryptionAlgorithm + "' transformation '"
+ encryptionConfig.encryptionTransformation + "' provider '"
+ encryptionConfig.encryptionProvider + "'.");
} else {
log.info("KafkaHandler start without encryption.");
}
}

private static JsonObject meterToJsonObject(Meter meter) {
Expand All @@ -46,7 +71,14 @@ private static JsonObject meterToJsonObject(Meter meter) {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
seenMessages.incrementAndGet();
final byte[] payload = msg.asBytes();
byte[] payload = msg.asBytes();
if (encryptionConfig != null) {
try {
payload = encrypt(payload);
} catch (Exception e) {
log.error("Fail to encrypt message: ", e.getMessage());
}
}

boolean sawKtTag = false;

Expand All @@ -67,7 +99,7 @@ protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Excep
}
}

private boolean sendOrReportFailure(String topic, byte[] msg) {
private boolean sendOrReportFailure(String topic, final byte[] msg) {
final boolean nonNullTopic = !("null".equals(topic));
if (nonNullTopic) {
try {
Expand All @@ -80,7 +112,19 @@ private boolean sendOrReportFailure(String topic, byte[] msg) {
return nonNullTopic;
}

@Override
private byte[] encrypt(final byte[] plaintext) throws Exception {
Cipher cipher = Cipher.getInstance(
encryptionConfig.encryptionTransformation,encryptionConfig.encryptionProvider);
cipher.init(Cipher.ENCRYPT_MODE, keySpec);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
// IV size is the same as a block size and cipher dependent.
// This can be derived from consumer side by calling `cipher.getBlockSize()`.
outputStream.write(cipher.getIV());
outputStream.write(cipher.doFinal(plaintext));
return outputStream.toByteArray();
}

@Override
public JsonObject getStats() {
return new JsonObject()
.add("default_topic", defaultTopic)
Expand Down
Expand Up @@ -18,6 +18,13 @@
public final class KafkaProvider implements HandlerProvider {
private final static AtomicInteger clientId = new AtomicInteger();

static class EncryptionConfig {
public String encryptionKey;
public String encryptionAlgorithm;
public String encryptionTransformation;
public String encryptionProvider;
}

@Override
public Handler getHandler(Config config) throws Exception {
final String defaultTopic = config.getString("default_topic");
Expand Down Expand Up @@ -46,6 +53,17 @@ public Handler getHandler(Config config) throws Exception {
final ProducerConfig producerConfig = new ProducerConfig(properties);
final Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(producerConfig);

return new KafkaHandler(clientId, propagate, defaultTopic, producer);
EncryptionConfig encryptionConfig = new EncryptionConfig();
try {
Config encryption = config.getConfig("encryption");
encryptionConfig.encryptionKey = encryption.getString("key");
encryptionConfig.encryptionAlgorithm = encryption.getString("algorithm");
encryptionConfig.encryptionTransformation = encryption.getString("transformation");
encryptionConfig.encryptionProvider = encryption.getString("provider");
} catch (ConfigException.Missing ignored) {
encryptionConfig = null;
}

return new KafkaHandler(clientId, propagate, defaultTopic, producer, encryptionConfig);
}
}

0 comments on commit 8171bf5

Please sign in to comment.