diff --git a/CHANGELOG.md b/CHANGELOG.md index 78f3e48..46fc76d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ All notable changes to this project will be documented in this file. ## [Unreleased] +###Changed +- Class `com.github.danielwegener.logback.kafka.KafkaAppenderBase` has been inlined into the `KafkaAppender`. +- Class `com.github.danielwegener.logback.kafka.encoding.PatternLayoutKafkaMessageEncoder` has been renamed to `com.github.danielwegener.logback.kafka.encoding.LayoutKafkaMessageEncoder` (#9). To ease the migration, there is still a deprecated class alias `PatternLayoutKafkaMessageEncoder`. You should change your logback.xml to `LayoutKafkaMessageEncoder` as soon as possible! +- `KafkaAppender`, `KeyingStrategy` and `LayoutKafkaMessageEncoder` are now generic and can now be used with alternative logback implementations like logback-access (#16)! ## [0.0.5] - 2015-12-23 ###Changed @@ -37,4 +41,4 @@ All notable changes to this project will be documented in this file. [0.0.4]: https://github.com/danielwegener/logback-kafka-appender/compare/logback-kafka-appender-0.0.3...logback-kafka-appender-0.0.4 [0.0.3]: https://github.com/danielwegener/logback-kafka-appender/compare/logback-kafka-appender-0.0.2...logback-kafka-appender-0.0.3 [0.0.2]: https://github.com/danielwegener/logback-kafka-appender/compare/logback-kafka-appender-0.0.1...logback-kafka-appender-0.0.2 -[0.0.1]: https://github.com/danielwegener/logback-kafka-appender/compare/465947...logback-kafka-appender-0.0.1 \ No newline at end of file +[0.0.1]: https://github.com/danielwegener/logback-kafka-appender/compare/465947...logback-kafka-appender-0.0.1 diff --git a/README.md b/README.md index 8f7c63b..067e8f7 100644 --- a/README.md +++ b/README.md @@ -39,7 +39,7 @@ This is an example `logback.xml` that uses a common `PatternLayout` to encode a - + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n @@ -145,11 +145,13 @@ Just roll your own `KafkaMessageEncoder`. The interface is quite simple: ```java package com.github.danielwegener.logback.kafka.encoding; public interface KafkaMessageEncoder { - byte[] doEncode(E loggingEvent); + byte[] doEncode(E event); } ``` -Your encoder should be type-parameterized for any subtype of ILoggingEvent like in + +Your encoder should be type-parameterized for any subtype of the type of event you want to support (typically `ILoggingEvent`) like in + ```java public class MyEncoder extends KafkaMessageEncoderBase { //... ``` @@ -174,9 +176,9 @@ The kafka producer client uses a messages key as partitioner. Thus `logback-kafk |---|---| | `RoundRobinPartitioningStrategy` (default) | Evenly distributes all written log messages over all available kafka partitions. This strategy may lead to unexpected read orders on clients. | | `HostNamePartitioningStrategy` | This strategy uses the HOSTNAME to partition the log messages to kafka. This is useful because it ensures that all log messages issued by this host will remain in the correct order for any consumer. But this strategy can lead to uneven log distribution for a small number of hosts (compared to the number of partitions). | -| `ContextNamePartitioningStrategy` | This strategy uses logbacks CONTEXT_NAME to partition the log messages to kafka. This is ensures that all log messages logged by the same logging context will remain in the correct order for any consumer. But this strategy can lead to uneven log distribution for a small number of hosts (compared to the number of partitions). | -| `ThreadNamePartitioningStrategy` | This strategy uses the calling threads name as partitioning key. This ensures that all messages logged by the same thread will remain in the correct order for any consumer. But this strategy can lead to uneven log distribution for a small number of thread(-names) (compared to the number of partitions). | -| `LoggerNamePartitioningStrategy` | * This strategy uses the logger name as partitioning key. This ensures that all messages logged by the same logger will remain in the correct order for any consumer. But this strategy can lead to uneven log distribution for a small number of distinct loggers (compared to the number of partitions). | +| `ContextNamePartitioningStrategy` | This strategy uses logbacks CONTEXT_NAME to partition the log messages to kafka. This is ensures that all log messages logged by the same logging context will remain in the correct order for any consumer. But this strategy can lead to uneven log distribution for a small number of hosts (compared to the number of partitions). This strategy only works for `ILoggingEvents`. | +| `ThreadNamePartitioningStrategy` | This strategy uses the calling threads name as partitioning key. This ensures that all messages logged by the same thread will remain in the correct order for any consumer. But this strategy can lead to uneven log distribution for a small number of thread(-names) (compared to the number of partitions). This strategy only works for `ILoggingEvents`. | +| `LoggerNamePartitioningStrategy` | * This strategy uses the logger name as partitioning key. This ensures that all messages logged by the same logger will remain in the correct order for any consumer. But this strategy can lead to uneven log distribution for a small number of distinct loggers (compared to the number of partitions). This strategy only works for `ILoggingEvents`. | @@ -188,7 +190,7 @@ If none of the above partitioners satisfies your requirements, you can easily im package foo; com.github.danielwegener.logback.kafka.keying.KeyingStrategy; -public class LevelKeyingStrategy implements KeyingStrategy { +public class LevelKeyingStrategy implements KeyingStrategy { @Override public byte[] createKey(ILoggingEvent e) { return ByteBuffer.allocate(4).putInt(e.getLevel()).array(); diff --git a/pom.xml b/pom.xml index 649ae24..13782b4 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ 4.0.0 com.github.danielwegener logback-kafka-appender - 0.0.6-SNAPSHOT + 0.1.0-SNAPSHOT org.sonatype.oss oss-parent diff --git a/src/example/resources/logback.xml b/src/example/resources/logback.xml index 7826ae4..c8c6e49 100644 --- a/src/example/resources/logback.xml +++ b/src/example/resources/logback.xml @@ -16,7 +16,7 @@ - + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n @@ -47,7 +47,7 @@ - + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n @@ -84,4 +84,4 @@ - \ No newline at end of file + diff --git a/src/main/java/com/github/danielwegener/logback/kafka/KafkaAppender.java b/src/main/java/com/github/danielwegener/logback/kafka/KafkaAppender.java index 84d00e4..104af50 100644 --- a/src/main/java/com/github/danielwegener/logback/kafka/KafkaAppender.java +++ b/src/main/java/com/github/danielwegener/logback/kafka/KafkaAppender.java @@ -1,10 +1,174 @@ package com.github.danielwegener.logback.kafka; import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.Appender; +import ch.qos.logback.core.spi.AppenderAttachableImpl; +import com.github.danielwegener.logback.kafka.delivery.FailedDeliveryCallback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.serialization.ByteArraySerializer; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.concurrent.ConcurrentLinkedQueue; /** - * See logback-kafka-appender at github + * @since 0.0.1 */ -public class KafkaAppender extends KafkaAppenderBase { +public class KafkaAppender extends KafkaAppenderConfig { + + /** + * Kafka clients uses this prefix for its slf4j logging. + * This appender defers appends of any Kafka logs since it could cause harmful infinite recursion/self feeding effects. + */ + private static final String KAFKA_LOGGER_PREFIX = "org.apache.kafka.clients"; + + private LazyProducer lazyProducer = null; + private final AppenderAttachableImpl aai = new AppenderAttachableImpl(); + private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue(); + private final FailedDeliveryCallback failedDeliveryCallback = new FailedDeliveryCallback() { + @Override + public void onFailedDelivery(E evt, Throwable throwable) { + aai.appendLoopOnAppenders(evt); + } + }; + + public KafkaAppender() { + // setting these as config values sidesteps an unnecessary warning (minor bug in KafkaProducer) + addProducerConfigValue(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + addProducerConfigValue(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + } + + @Override + public void doAppend(E e) { + ensureDeferredAppends(); + if (e instanceof ILoggingEvent && ((ILoggingEvent)e).getLoggerName().startsWith(KAFKA_LOGGER_PREFIX)) { + deferAppend(e); + } else { + super.doAppend(e); + } + } + + @Override + public void start() { + // only error free appenders should be activated + if (!checkPrerequisites()) return; + + lazyProducer = new LazyProducer(); + + super.start(); + } + + @Override + public void stop() { + super.stop(); + if (lazyProducer != null && lazyProducer.isInitialized()) { + try { + lazyProducer.get().close(); + } catch (KafkaException e) { + this.addWarn("Failed to shut down kafka producer: " + e.getMessage(), e); + } + lazyProducer = null; + } + } + + @Override + public void addAppender(Appender newAppender) { + aai.addAppender(newAppender); + } + + @Override + public Iterator> iteratorForAppenders() { + return aai.iteratorForAppenders(); + } + + @Override + public Appender getAppender(String name) { + return aai.getAppender(name); + } + + @Override + public boolean isAttached(Appender appender) { + return aai.isAttached(appender); + } + + @Override + public void detachAndStopAllAppenders() { + aai.detachAndStopAllAppenders(); + } + + @Override + public boolean detachAppender(Appender appender) { + return aai.detachAppender(appender); + } + + @Override + public boolean detachAppender(String name) { + return aai.detachAppender(name); + } + + @Override + protected void append(E e) { + final byte[] payload = encoder.doEncode(e); + final byte[] key = keyingStrategy.createKey(e); + final ProducerRecord record = new ProducerRecord(topic, key, payload); + deliveryStrategy.send(lazyProducer.get(), record, e, failedDeliveryCallback); + } + + protected Producer createProducer() { + return new KafkaProducer(new HashMap(producerConfig)); + } + + private void deferAppend(E event) { + queue.add(event); + } + + // drains queue events to super + private void ensureDeferredAppends() { + E event; + + while ((event = queue.poll()) != null) { + super.doAppend(event); + } + } + + /** + * Lazy initializer for producer, patterned after commons-lang. + * + * @see LazyInitializer + */ + private class LazyProducer { + + private volatile Producer producer; + + public Producer get() { + Producer result = this.producer; + if (result == null) { + synchronized(this) { + result = this.producer; + if(result == null) { + this.producer = result = this.initialize(); + } + } + } + + return result; + } + + protected Producer initialize() { + Producer producer = null; + try { + producer = createProducer(); + } catch (Exception e) { + addError("error creating producer", e); + } + return producer; + } + + public boolean isInitialized() { return producer != null; } + } } diff --git a/src/main/java/com/github/danielwegener/logback/kafka/KafkaAppenderBase.java b/src/main/java/com/github/danielwegener/logback/kafka/KafkaAppenderBase.java deleted file mode 100644 index 3810d4d..0000000 --- a/src/main/java/com/github/danielwegener/logback/kafka/KafkaAppenderBase.java +++ /dev/null @@ -1,171 +0,0 @@ -package com.github.danielwegener.logback.kafka; - -import ch.qos.logback.classic.spi.ILoggingEvent; -import ch.qos.logback.core.Appender; -import ch.qos.logback.core.spi.AppenderAttachableImpl; -import com.github.danielwegener.logback.kafka.delivery.FailedDeliveryCallback; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.serialization.ByteArraySerializer; - -import java.util.HashMap; -import java.util.Iterator; -import java.util.concurrent.ConcurrentLinkedQueue; - -public class KafkaAppenderBase extends KafkaAppenderConfig { - - /** - * Kafka clients uses this prefix for its slf4j logging. - * This appender defers appends of any Kafka logs since it could cause harmful infinite recursion/self feeding effects. - */ - private static final String KAFKA_LOGGER_PREFIX = "org.apache.kafka.clients"; - - private LazyProducer lazyProducer = null; - private final AppenderAttachableImpl aai = new AppenderAttachableImpl(); - private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue(); - private final FailedDeliveryCallback failedDeliveryCallback = new FailedDeliveryCallback() { - @Override - public void onFailedDelivery(E evt, Throwable throwable) { - aai.appendLoopOnAppenders(evt); - } - }; - - public KafkaAppenderBase() { - // setting these as config values sidesteps an unnecessary warning (minor bug in KafkaProducer) - addProducerConfigValue(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); - addProducerConfigValue(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); - } - - @Override - public void doAppend(E e) { - ensureDeferredAppends(); - if (e.getLoggerName().startsWith(KAFKA_LOGGER_PREFIX)) { - deferAppend(e); - } else { - super.doAppend(e); - } - } - - @Override - public void start() { - // only error free appenders should be activated - if (!checkPrerequisites()) return; - - lazyProducer = new LazyProducer(); - - super.start(); - } - - @Override - public void stop() { - super.stop(); - if (lazyProducer != null && lazyProducer.isInitialized()) { - try { - lazyProducer.get().close(); - } catch (KafkaException e) { - this.addWarn("Failed to shut down kafka producer: " + e.getMessage(), e); - } - lazyProducer = null; - } - } - - @Override - public void addAppender(Appender newAppender) { - aai.addAppender(newAppender); - } - - @Override - public Iterator> iteratorForAppenders() { - return aai.iteratorForAppenders(); - } - - @Override - public Appender getAppender(String name) { - return aai.getAppender(name); - } - - @Override - public boolean isAttached(Appender appender) { - return aai.isAttached(appender); - } - - @Override - public void detachAndStopAllAppenders() { - aai.detachAndStopAllAppenders(); - } - - @Override - public boolean detachAppender(Appender appender) { - return aai.detachAppender(appender); - } - - @Override - public boolean detachAppender(String name) { - return aai.detachAppender(name); - } - - @Override - protected void append(E e) { - final byte[] payload = encoder.doEncode(e); - final byte[] key = keyingStrategy.createKey(e); - final ProducerRecord record = new ProducerRecord(topic, key, payload); - deliveryStrategy.send(lazyProducer.get(), record, e, failedDeliveryCallback); - } - - protected Producer createProducer() { - return new KafkaProducer(new HashMap(producerConfig)); - } - - private void deferAppend(E event) { - queue.add(event); - } - - // drains queue events to super - private void ensureDeferredAppends() { - E event; - - while ((event = queue.poll()) != null) { - super.doAppend(event); - } - } - - /** - * Lazy initializer for producer, patterned after commons-lang. - * - * @see LazyInitializer - */ - private class LazyProducer { - - private volatile Producer producer; - - public Producer get() { - Producer result = this.producer; - if (result == null) { - synchronized(this) { - result = this.producer; - if(result == null) { - this.producer = result = this.initialize(); - } - } - } - - return result; - } - - protected Producer initialize() { - Producer producer = null; - try { - producer = createProducer(); - } catch (Exception e) { - addError("error creating producer", e); - } - return producer; - } - - public boolean isInitialized() { return producer != null; } - } - -} diff --git a/src/main/java/com/github/danielwegener/logback/kafka/KafkaAppenderConfig.java b/src/main/java/com/github/danielwegener/logback/kafka/KafkaAppenderConfig.java index 4cb5fc6..1ed9315 100644 --- a/src/main/java/com/github/danielwegener/logback/kafka/KafkaAppenderConfig.java +++ b/src/main/java/com/github/danielwegener/logback/kafka/KafkaAppenderConfig.java @@ -14,12 +14,15 @@ import java.util.Map; import java.util.Set; +/** + * @since 0.0.1 + */ public abstract class KafkaAppenderConfig extends UnsynchronizedAppenderBase implements AppenderAttachable { protected String topic = null; protected KafkaMessageEncoder encoder = null; - protected KeyingStrategy keyingStrategy = null; + protected KeyingStrategy keyingStrategy = null; protected DeliveryStrategy deliveryStrategy; public static final Set KNOWN_PRODUCER_CONFIG_KEYS = new HashSet(); @@ -103,7 +106,7 @@ public void setTopic(String topic) { this.topic = topic; } - public void setKeyingStrategy(KeyingStrategy keyingStrategy) { + public void setKeyingStrategy(KeyingStrategy keyingStrategy) { this.keyingStrategy = keyingStrategy; } diff --git a/src/main/java/com/github/danielwegener/logback/kafka/delivery/AsynchronousDeliveryStrategy.java b/src/main/java/com/github/danielwegener/logback/kafka/delivery/AsynchronousDeliveryStrategy.java index 4f19aba..9eae002 100644 --- a/src/main/java/com/github/danielwegener/logback/kafka/delivery/AsynchronousDeliveryStrategy.java +++ b/src/main/java/com/github/danielwegener/logback/kafka/delivery/AsynchronousDeliveryStrategy.java @@ -6,6 +6,9 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +/** + * @since 0.0.1 + */ public class AsynchronousDeliveryStrategy implements DeliveryStrategy { @Override diff --git a/src/main/java/com/github/danielwegener/logback/kafka/delivery/BlockingDeliveryStrategy.java b/src/main/java/com/github/danielwegener/logback/kafka/delivery/BlockingDeliveryStrategy.java index 8efe95c..79ae5b7 100644 --- a/src/main/java/com/github/danielwegener/logback/kafka/delivery/BlockingDeliveryStrategy.java +++ b/src/main/java/com/github/danielwegener/logback/kafka/delivery/BlockingDeliveryStrategy.java @@ -15,7 +15,7 @@ /** * DeliveryStrategy that waits on the producer if the output buffer is full. * The wait timeout is configurable with {@link BlockingDeliveryStrategy#setTimeout(long)} - * + * @since 0.0.1 */ public class BlockingDeliveryStrategy extends ContextAwareBase implements DeliveryStrategy { diff --git a/src/main/java/com/github/danielwegener/logback/kafka/delivery/DeliveryStrategy.java b/src/main/java/com/github/danielwegener/logback/kafka/delivery/DeliveryStrategy.java index 697e1d0..ab67b78 100644 --- a/src/main/java/com/github/danielwegener/logback/kafka/delivery/DeliveryStrategy.java +++ b/src/main/java/com/github/danielwegener/logback/kafka/delivery/DeliveryStrategy.java @@ -5,6 +5,7 @@ /** * Interface for DeliveryStrategies. + * @since 0.0.1 */ public interface DeliveryStrategy { diff --git a/src/main/java/com/github/danielwegener/logback/kafka/delivery/FailedDeliveryCallback.java b/src/main/java/com/github/danielwegener/logback/kafka/delivery/FailedDeliveryCallback.java index 433d043..b9ca2c7 100644 --- a/src/main/java/com/github/danielwegener/logback/kafka/delivery/FailedDeliveryCallback.java +++ b/src/main/java/com/github/danielwegener/logback/kafka/delivery/FailedDeliveryCallback.java @@ -1,5 +1,8 @@ package com.github.danielwegener.logback.kafka.delivery; +/** + * @since 0.0.1 + */ public interface FailedDeliveryCallback { void onFailedDelivery(E evt, Throwable throwable); } diff --git a/src/main/java/com/github/danielwegener/logback/kafka/encoding/KafkaMessageEncoder.java b/src/main/java/com/github/danielwegener/logback/kafka/encoding/KafkaMessageEncoder.java index eb8d652..dc4e375 100644 --- a/src/main/java/com/github/danielwegener/logback/kafka/encoding/KafkaMessageEncoder.java +++ b/src/main/java/com/github/danielwegener/logback/kafka/encoding/KafkaMessageEncoder.java @@ -3,14 +3,15 @@ /** * An Encoder that is able to take an {@code E} and return a {byte[]}. * This Encoder should naturally be referential transparent. - * @param the type of the logging event. + * @since 0.0.1 + * @param the type of the event. */ public interface KafkaMessageEncoder { /** * Encodes a loggingEvent into a byte array. - * @param loggingEvent the loggingEvent to be encoded. + * @param event the loggingEvent to be encoded. */ - byte[] doEncode(E loggingEvent); + byte[] doEncode(E event); } diff --git a/src/main/java/com/github/danielwegener/logback/kafka/encoding/KafkaMessageEncoderBase.java b/src/main/java/com/github/danielwegener/logback/kafka/encoding/KafkaMessageEncoderBase.java index 2093b95..d10bad3 100644 --- a/src/main/java/com/github/danielwegener/logback/kafka/encoding/KafkaMessageEncoderBase.java +++ b/src/main/java/com/github/danielwegener/logback/kafka/encoding/KafkaMessageEncoderBase.java @@ -5,6 +5,7 @@ /** * A base class for {@link KafkaMessageEncoder}'s that are {@link ContextAwareBase} and have a {@link LifeCycle} + * @since 0.0.1 */ public abstract class KafkaMessageEncoderBase extends ContextAwareBase implements KafkaMessageEncoder, LifeCycle { diff --git a/src/main/java/com/github/danielwegener/logback/kafka/encoding/LayoutKafkaMessageEncoder.java b/src/main/java/com/github/danielwegener/logback/kafka/encoding/LayoutKafkaMessageEncoder.java new file mode 100644 index 0000000..cb95dd4 --- /dev/null +++ b/src/main/java/com/github/danielwegener/logback/kafka/encoding/LayoutKafkaMessageEncoder.java @@ -0,0 +1,56 @@ +package com.github.danielwegener.logback.kafka.encoding; + +import ch.qos.logback.core.Layout; + +import java.nio.charset.Charset; + +/** + * A KafkaMessageEncoder that can be configured with a {@link Layout} and a {@link Charset} and creates + * a serialized string for each event using the given layout. + * @since 0.1.0 + */ +public class LayoutKafkaMessageEncoder extends KafkaMessageEncoderBase { + + public LayoutKafkaMessageEncoder() { + } + + public LayoutKafkaMessageEncoder(Layout layout, Charset charset) { + this.layout = layout; + this.charset = charset; + } + + private Layout layout; + private Charset charset; + private static final Charset UTF8 = Charset.forName("UTF-8"); + + @Override + public void start() { + if (charset == null) { + addInfo("No charset specified for PatternLayoutKafkaEncoder. Using default UTF8 encoding."); + charset = UTF8; + } + super.start(); + } + + @Override + public byte[] doEncode(E event) { + final String message = layout.doLayout(event); + return message.getBytes(charset); + } + + public void setLayout(Layout layout) { + this.layout = layout; + } + + public void setCharset(Charset charset) { + this.charset = charset; + } + + public Layout getLayout() { + return layout; + } + + public Charset getCharset() { + return charset; + } +} diff --git a/src/main/java/com/github/danielwegener/logback/kafka/encoding/PatternLayoutKafkaMessageEncoder.java b/src/main/java/com/github/danielwegener/logback/kafka/encoding/PatternLayoutKafkaMessageEncoder.java index ae57f9c..ceb2599 100644 --- a/src/main/java/com/github/danielwegener/logback/kafka/encoding/PatternLayoutKafkaMessageEncoder.java +++ b/src/main/java/com/github/danielwegener/logback/kafka/encoding/PatternLayoutKafkaMessageEncoder.java @@ -1,56 +1,19 @@ package com.github.danielwegener.logback.kafka.encoding; -import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.Layout; import java.nio.charset.Charset; /** - * A KafkaMessageEncoder that can be configured with a {@link Layout} and a {@link Charset} and creates - * a serialized string for each logging event using the given layout. + * @deprecated Use LayoutKafkaMessageEncoder instead! + * @since 0.0.1 */ -public class PatternLayoutKafkaMessageEncoder extends KafkaMessageEncoderBase { +@Deprecated +public class PatternLayoutKafkaMessageEncoder extends LayoutKafkaMessageEncoder { - public PatternLayoutKafkaMessageEncoder() { - } - - public PatternLayoutKafkaMessageEncoder(Layout layout, Charset charset) { - this.layout = layout; - this.charset = charset; - } - - private Layout layout; - private Charset charset; - private static final Charset UTF8 = Charset.forName("UTF-8"); - - @Override - public void start() { - if (charset == null) { - addInfo("No charset specified for PatternLayoutKafkaEncoder. Using default UTF8 encoding."); - charset = UTF8; - } - super.start(); - } - - @Override - public byte[] doEncode(ILoggingEvent loggingEvent) { - final String message = layout.doLayout(loggingEvent); - return message.getBytes(charset); - } - - public void setLayout(Layout layout) { - this.layout = layout; - } - - public void setCharset(Charset charset) { - this.charset = charset; - } - - public Layout getLayout() { - return layout; - } + public PatternLayoutKafkaMessageEncoder() {} - public Charset getCharset() { - return charset; + public PatternLayoutKafkaMessageEncoder(Layout layout, Charset charset) { + super(layout, charset); } -} \ No newline at end of file +} diff --git a/src/main/java/com/github/danielwegener/logback/kafka/keying/ContextNameKeyingStrategy.java b/src/main/java/com/github/danielwegener/logback/kafka/keying/ContextNameKeyingStrategy.java index 1f6f5e3..7152d57 100644 --- a/src/main/java/com/github/danielwegener/logback/kafka/keying/ContextNameKeyingStrategy.java +++ b/src/main/java/com/github/danielwegener/logback/kafka/keying/ContextNameKeyingStrategy.java @@ -11,8 +11,9 @@ * This strategy uses logbacks CONTEXT_NAME to partition the log messages to kafka. * This is ensures that all log messages logged by the same logging context will remain in the correct order for any consumer. * But this strategy can lead to uneven log distribution for a small number of hosts (compared to the number of partitions). + * @since 0.0.1 */ -public class ContextNameKeyingStrategy extends ContextAwareBase implements KeyingStrategy { +public class ContextNameKeyingStrategy extends ContextAwareBase implements KeyingStrategy { private byte[] contextNameHash = null; diff --git a/src/main/java/com/github/danielwegener/logback/kafka/keying/HostNameKeyingStrategy.java b/src/main/java/com/github/danielwegener/logback/kafka/keying/HostNameKeyingStrategy.java index 4b6e320..4d3d91f 100644 --- a/src/main/java/com/github/danielwegener/logback/kafka/keying/HostNameKeyingStrategy.java +++ b/src/main/java/com/github/danielwegener/logback/kafka/keying/HostNameKeyingStrategy.java @@ -1,6 +1,5 @@ package com.github.danielwegener.logback.kafka.keying; -import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.Context; import ch.qos.logback.core.CoreConstants; import ch.qos.logback.core.spi.ContextAwareBase; @@ -11,8 +10,9 @@ * This strategy uses the HOSTNAME to partition the log messages to kafka. * This is useful because it ensures that all log messages issued by this host will remain in the correct order for any consumer. * But this strategy can lead to uneven log distribution for a small number of hosts (compared to the number of partitions). + * @since 0.0.1 */ -public class HostNameKeyingStrategy extends ContextAwareBase implements KeyingStrategy { +public class HostNameKeyingStrategy extends ContextAwareBase implements KeyingStrategy { private byte[] hostnameHash = null; @@ -28,7 +28,7 @@ public void setContext(Context context) { } @Override - public byte[] createKey(ILoggingEvent e) { + public byte[] createKey(Object e) { return hostnameHash; } } diff --git a/src/main/java/com/github/danielwegener/logback/kafka/keying/KeyingStrategy.java b/src/main/java/com/github/danielwegener/logback/kafka/keying/KeyingStrategy.java index 54318ae..953ef07 100644 --- a/src/main/java/com/github/danielwegener/logback/kafka/keying/KeyingStrategy.java +++ b/src/main/java/com/github/danielwegener/logback/kafka/keying/KeyingStrategy.java @@ -4,14 +4,15 @@ import ch.qos.logback.classic.spi.ILoggingEvent; /** - * A strategy that can create byte array key for a given {@link ILoggingEvent} + * A strategy that can create byte array key for a given {@link ILoggingEvent}. + * @since 0.0.1 */ -public interface KeyingStrategy { +public interface KeyingStrategy { /** * creates a byte array key for the given {@link ch.qos.logback.classic.spi.ILoggingEvent} * @param e the logging event */ - byte[] createKey(ILoggingEvent e); + byte[] createKey(E e); } diff --git a/src/main/java/com/github/danielwegener/logback/kafka/keying/LoggerNameKeyingStrategy.java b/src/main/java/com/github/danielwegener/logback/kafka/keying/LoggerNameKeyingStrategy.java index a8bb790..fc4a468 100644 --- a/src/main/java/com/github/danielwegener/logback/kafka/keying/LoggerNameKeyingStrategy.java +++ b/src/main/java/com/github/danielwegener/logback/kafka/keying/LoggerNameKeyingStrategy.java @@ -8,8 +8,9 @@ * This strategy uses the logger name as partitioning key. This ensures that all messages logged by the * same logger will remain in the correct order for any consumer. * But this strategy can lead to uneven log distribution for a small number of distinct loggers (compared to the number of partitions). + * @since 0.0.1 */ -public class LoggerNameKeyingStrategy implements KeyingStrategy { +public class LoggerNameKeyingStrategy implements KeyingStrategy { @Override public byte[] createKey(ILoggingEvent e) { diff --git a/src/main/java/com/github/danielwegener/logback/kafka/keying/RoundRobinKeyingStrategy.java b/src/main/java/com/github/danielwegener/logback/kafka/keying/RoundRobinKeyingStrategy.java index 962706a..ead8bf9 100644 --- a/src/main/java/com/github/danielwegener/logback/kafka/keying/RoundRobinKeyingStrategy.java +++ b/src/main/java/com/github/danielwegener/logback/kafka/keying/RoundRobinKeyingStrategy.java @@ -1,16 +1,14 @@ package com.github.danielwegener.logback.kafka.keying; - -import ch.qos.logback.classic.spi.ILoggingEvent; - /** * Evenly distributes all written log messages over all available kafka partitions. * This strategy can lead to unexpected read orders on clients. + * @since 0.0.1 */ -public class RoundRobinKeyingStrategy implements KeyingStrategy { +public class RoundRobinKeyingStrategy implements KeyingStrategy { @Override - public byte[] createKey(ILoggingEvent e) { + public byte[] createKey(Object e) { return null; } } diff --git a/src/main/java/com/github/danielwegener/logback/kafka/keying/ThreadNameKeyingStrategy.java b/src/main/java/com/github/danielwegener/logback/kafka/keying/ThreadNameKeyingStrategy.java index 5473c1d..6b70c1d 100644 --- a/src/main/java/com/github/danielwegener/logback/kafka/keying/ThreadNameKeyingStrategy.java +++ b/src/main/java/com/github/danielwegener/logback/kafka/keying/ThreadNameKeyingStrategy.java @@ -8,8 +8,9 @@ * This strategy uses the calling threads name as partitioning key. This ensures that all messages logged by the * same thread will remain in the correct order for any consumer. * But this strategy can lead to uneven log distribution for a small number of thread(-names) (compared to the number of partitions). + * @since 0.0.1 */ -public class ThreadNameKeyingStrategy implements KeyingStrategy { +public class ThreadNameKeyingStrategy implements KeyingStrategy { @Override public byte[] createKey(ILoggingEvent e) { diff --git a/src/test/java/com/github/danielwegener/logback/kafka/KafkaAppenderIT.java b/src/test/java/com/github/danielwegener/logback/kafka/KafkaAppenderIT.java index 623652d..d98c48e 100644 --- a/src/test/java/com/github/danielwegener/logback/kafka/KafkaAppenderIT.java +++ b/src/test/java/com/github/danielwegener/logback/kafka/KafkaAppenderIT.java @@ -8,7 +8,7 @@ import ch.qos.logback.classic.spi.LoggingEvent; import ch.qos.logback.core.status.Status; import ch.qos.logback.core.status.StatusListener; -import com.github.danielwegener.logback.kafka.encoding.PatternLayoutKafkaMessageEncoder; +import com.github.danielwegener.logback.kafka.encoding.LayoutKafkaMessageEncoder; import com.github.danielwegener.logback.kafka.keying.RoundRobinKeyingStrategy; import com.github.danielwegener.logback.kafka.util.TestKafka; import kafka.consumer.Consumer; @@ -26,7 +26,6 @@ import java.io.IOException; import java.nio.charset.Charset; -import java.util.Collections; import java.util.Properties; import java.util.Random; @@ -40,7 +39,7 @@ public class KafkaAppenderIT { public ErrorCollector collector= new ErrorCollector(); private TestKafka kafka; - private KafkaAppenderBase unit; + private KafkaAppender unit; private LoggerContext loggerContext; @@ -68,12 +67,12 @@ public void addStatusEvent(Status status) { }); loggerContext.putProperty("HOSTNAME","localhost"); - unit = new KafkaAppenderBase(); + unit = new KafkaAppender(); final PatternLayout patternLayout = new PatternLayout(); patternLayout.setPattern("%msg"); patternLayout.setContext(loggerContext); patternLayout.start(); - unit.setEncoder(new PatternLayoutKafkaMessageEncoder(patternLayout, Charset.forName("UTF-8"))); + unit.setEncoder(new LayoutKafkaMessageEncoder(patternLayout, Charset.forName("UTF-8"))); unit.setTopic("logs"); unit.setName("TestKafkaAppender"); unit.setContext(loggerContext); diff --git a/src/test/java/com/github/danielwegener/logback/kafka/KafkaAppenderBaseTest.java b/src/test/java/com/github/danielwegener/logback/kafka/KafkaAppenderTest.java similarity index 95% rename from src/test/java/com/github/danielwegener/logback/kafka/KafkaAppenderBaseTest.java rename to src/test/java/com/github/danielwegener/logback/kafka/KafkaAppenderTest.java index 999f59a..06dc3dd 100644 --- a/src/test/java/com/github/danielwegener/logback/kafka/KafkaAppenderBaseTest.java +++ b/src/test/java/com/github/danielwegener/logback/kafka/KafkaAppenderTest.java @@ -25,13 +25,13 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.*; -public class KafkaAppenderBaseTest { +public class KafkaAppenderTest { - private final KafkaAppenderBase unit = new KafkaAppenderBase(); + private final KafkaAppender unit = new KafkaAppender(); private final LoggerContext ctx = new LoggerContext(); @SuppressWarnings("unchecked") private final KafkaMessageEncoder encoder = mock(KafkaMessageEncoder.class); - private final KeyingStrategy keyingStrategy = mock(KeyingStrategy.class); + private final KeyingStrategy keyingStrategy = mock(KeyingStrategy.class); @SuppressWarnings("unchecked") private final DeliveryStrategy deliveryStrategy = mock(DeliveryStrategy.class); diff --git a/src/test/java/com/github/danielwegener/logback/kafka/encoding/PatternLayoutKafkaMessageEncoderTest.java b/src/test/java/com/github/danielwegener/logback/kafka/encoding/LayoutKafkaMessageEncoderTest.java similarity index 86% rename from src/test/java/com/github/danielwegener/logback/kafka/encoding/PatternLayoutKafkaMessageEncoderTest.java rename to src/test/java/com/github/danielwegener/logback/kafka/encoding/LayoutKafkaMessageEncoderTest.java index 8a73bb7..aca7327 100644 --- a/src/test/java/com/github/danielwegener/logback/kafka/encoding/PatternLayoutKafkaMessageEncoderTest.java +++ b/src/test/java/com/github/danielwegener/logback/kafka/encoding/LayoutKafkaMessageEncoderTest.java @@ -4,6 +4,7 @@ import ch.qos.logback.classic.Logger; import ch.qos.logback.classic.LoggerContext; import ch.qos.logback.classic.PatternLayout; +import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.classic.spi.LoggingEvent; import ch.qos.logback.core.Layout; import org.hamcrest.Matchers; @@ -15,11 +16,11 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; -public class PatternLayoutKafkaMessageEncoderTest { +public class LayoutKafkaMessageEncoderTest { private static final Charset UTF8 = Charset.forName("UTF-8"); private final PatternLayout layout = new PatternLayout(); - private final PatternLayoutKafkaMessageEncoder unit = new PatternLayoutKafkaMessageEncoder(layout, UTF8); + private final LayoutKafkaMessageEncoder unit = new LayoutKafkaMessageEncoder(layout, UTF8); @Test public void testStart() { @@ -50,4 +51,4 @@ public void testGetCharset() { } -} \ No newline at end of file +} diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml index 63af537..565c634 100644 --- a/src/test/resources/logback-test.xml +++ b/src/test/resources/logback-test.xml @@ -1,7 +1,7 @@ - + %msg