
Commit

make kafka-logback-appender generic in the event type to support alternative events like logback-access (#24). Rename PatternLayoutKafkaMessageEncoder to LayoutKafkaMessageEncoder (with deprecated class alias) (#9). Add javadoc since hint. Switch to 0.1.0-SNAPSHOT version.
danielwegener committed Jan 29, 2016
1 parent ba6f048 commit c137d62
Showing 25 changed files with 294 additions and 262 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
@@ -2,6 +2,10 @@
All notable changes to this project will be documented in this file.

## [Unreleased]
### Changed
- Class `com.github.danielwegener.logback.kafka.KafkaAppenderBase` has been inlined into the `KafkaAppender`.
- Class `com.github.danielwegener.logback.kafka.encoding.PatternLayoutKafkaMessageEncoder` has been renamed to `com.github.danielwegener.logback.kafka.encoding.LayoutKafkaMessageEncoder` (#9). To ease the migration, there is still a deprecated class alias `PatternLayoutKafkaMessageEncoder`. You should change your logback.xml to use `LayoutKafkaMessageEncoder` as soon as possible!
- `KafkaAppender`, `KeyingStrategy` and `LayoutKafkaMessageEncoder` are now generic and can be used with alternative logback implementations like logback-access (#16)!

## [0.0.5] - 2015-12-23
### Changed
@@ -37,4 +41,4 @@ All notable changes to this project will be documented in this file.
[0.0.4]: https://github.com/danielwegener/logback-kafka-appender/compare/logback-kafka-appender-0.0.3...logback-kafka-appender-0.0.4
[0.0.3]: https://github.com/danielwegener/logback-kafka-appender/compare/logback-kafka-appender-0.0.2...logback-kafka-appender-0.0.3
[0.0.2]: https://github.com/danielwegener/logback-kafka-appender/compare/logback-kafka-appender-0.0.1...logback-kafka-appender-0.0.2
[0.0.1]: https://github.com/danielwegener/logback-kafka-appender/compare/465947...logback-kafka-appender-0.0.1
16 changes: 9 additions & 7 deletions README.md
@@ -39,7 +39,7 @@ This is an example `logback.xml` that uses a common `PatternLayout` to encode a
<!-- This is the kafkaAppender -->
<appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
<!-- This is the default encoder that encodes every log message to an utf8-encoded string -->
<encoder class="com.github.danielwegener.logback.kafka.encoding.PatternLayoutKafkaMessageEncoder">
<encoder class="com.github.danielwegener.logback.kafka.encoding.LayoutKafkaMessageEncoder">
<layout class="ch.qos.logback.classic.PatternLayout">
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</layout>
@@ -145,11 +145,13 @@ Just roll your own `KafkaMessageEncoder`. The interface is quite simple:
```java
package com.github.danielwegener.logback.kafka.encoding;
public interface KafkaMessageEncoder<E> {
byte[] doEncode(E loggingEvent);
byte[] doEncode(E event);
}

```
Your encoder should be type-parameterized for any subtype of ILoggingEvent like in

Your encoder should be type-parameterized for any subtype of the type of event you want to support (typically `ILoggingEvent`) like in

```java
public class MyEncoder extends KafkaMessageEncoderBase<ILoggingEvent> { //...
```
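
As a rough illustration, a complete (if simplistic) encoder could look like the sketch below. The class name and message format are made up for this example, and it assumes `KafkaMessageEncoderBase` requires nothing beyond the `doEncode` method from the interface above:

```java
import ch.qos.logback.classic.spi.ILoggingEvent;
import com.github.danielwegener.logback.kafka.encoding.KafkaMessageEncoderBase;

import java.nio.charset.Charset;

public class MyEncoder extends KafkaMessageEncoderBase<ILoggingEvent> {

    private static final Charset UTF8 = Charset.forName("UTF-8");

    @Override
    public byte[] doEncode(ILoggingEvent event) {
        // hypothetical format: "<LEVEL> <formatted message>" encoded as UTF-8 bytes
        return (event.getLevel() + " " + event.getFormattedMessage() + "\n").getBytes(UTF8);
    }
}
```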
@@ -174,9 +176,9 @@ The kafka producer client uses a message's key for partitioning. Thus `logback-kafk
|---|---|
| `RoundRobinPartitioningStrategy` (default) | Evenly distributes all written log messages over all available kafka partitions. This strategy may lead to unexpected read orders on clients. |
| `HostNamePartitioningStrategy` | This strategy uses the HOSTNAME to partition the log messages to kafka. This is useful because it ensures that all log messages issued by this host will remain in the correct order for any consumer. But this strategy can lead to uneven log distribution for a small number of hosts (compared to the number of partitions). |
| `ContextNamePartitioningStrategy` | This strategy uses logback's CONTEXT_NAME to partition the log messages to kafka. This ensures that all log messages logged by the same logging context will remain in the correct order for any consumer. But this strategy can lead to uneven log distribution for a small number of logging contexts (compared to the number of partitions). |
| `ThreadNamePartitioningStrategy` | This strategy uses the calling thread's name as partitioning key. This ensures that all messages logged by the same thread will remain in the correct order for any consumer. But this strategy can lead to uneven log distribution for a small number of thread(-names) (compared to the number of partitions). |
| `LoggerNamePartitioningStrategy` | This strategy uses the logger name as partitioning key. This ensures that all messages logged by the same logger will remain in the correct order for any consumer. But this strategy can lead to uneven log distribution for a small number of distinct loggers (compared to the number of partitions). |
| `ContextNamePartitioningStrategy` | This strategy uses logback's CONTEXT_NAME to partition the log messages to kafka. This ensures that all log messages logged by the same logging context will remain in the correct order for any consumer. But this strategy can lead to uneven log distribution for a small number of logging contexts (compared to the number of partitions). This strategy only works for `ILoggingEvents`. |
| `ThreadNamePartitioningStrategy` | This strategy uses the calling thread's name as partitioning key. This ensures that all messages logged by the same thread will remain in the correct order for any consumer. But this strategy can lead to uneven log distribution for a small number of thread(-names) (compared to the number of partitions). This strategy only works for `ILoggingEvents`. |
| `LoggerNamePartitioningStrategy` | This strategy uses the logger name as partitioning key. This ensures that all messages logged by the same logger will remain in the correct order for any consumer. But this strategy can lead to uneven log distribution for a small number of distinct loggers (compared to the number of partitions). This strategy only works for `ILoggingEvents`. |
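
The keying contract itself is not shown in this excerpt; judging from the `LevelKeyingStrategy` example below, the now-generic interface presumably looks roughly like this (a sketch, not the actual source):

```java
package com.github.danielwegener.logback.kafka.keying;

// Sketch inferred from the example below, not copied from the repository.
public interface KeyingStrategy<E> {
    /** Returns the Kafka message key used to partition the given event. */
    byte[] createKey(E e);
}
```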



@@ -188,7 +190,7 @@ If none of the above partitioners satisfies your requirements, you can easily im
package foo;
import ch.qos.logback.classic.spi.ILoggingEvent;
import com.github.danielwegener.logback.kafka.keying.KeyingStrategy;
import java.nio.ByteBuffer;

public class LevelKeyingStrategy implements KeyingStrategy {
public class LevelKeyingStrategy implements KeyingStrategy<ILoggingEvent> {
@Override
public byte[] createKey(ILoggingEvent e) {
return ByteBuffer.allocate(4).putInt(e.getLevel().toInt()).array();
2 changes: 1 addition & 1 deletion pom.xml
@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.github.danielwegener</groupId>
<artifactId>logback-kafka-appender</artifactId>
<version>0.0.6-SNAPSHOT</version>
<version>0.1.0-SNAPSHOT</version>
<parent>
<groupId>org.sonatype.oss</groupId>
<artifactId>oss-parent</artifactId>
6 changes: 3 additions & 3 deletions src/example/resources/logback.xml
@@ -16,7 +16,7 @@
<!-- This example configuration is probably the most unreliable under
failure conditions but won't block your application at all -->
<appender name="very-relaxed-and-fast-kafka-appender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
<encoder class="com.github.danielwegener.logback.kafka.encoding.PatternLayoutKafkaMessageEncoder">
<encoder class="com.github.danielwegener.logback.kafka.encoding.LayoutKafkaMessageEncoder">
<layout class="ch.qos.logback.classic.PatternLayout">
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</layout>
@@ -47,7 +47,7 @@
<!-- This example configuration is more restrictive and will try to ensure that every message
is eventually delivered in an ordered fashion (as long as the logging application stays alive) -->
<appender name="very-restrictive-kafka-appender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
<encoder class="com.github.danielwegener.logback.kafka.encoding.PatternLayoutKafkaMessageEncoder">
<encoder class="com.github.danielwegener.logback.kafka.encoding.LayoutKafkaMessageEncoder">
<layout class="ch.qos.logback.classic.PatternLayout">
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</layout>
@@ -84,4 +84,4 @@
<appender-ref ref="very-relaxed-and-fast-kafka-appender" />
<appender-ref ref="very-restrictive-kafka-appender" />
</root>
</configuration>
src/main/java/com/github/danielwegener/logback/kafka/KafkaAppender.java
@@ -1,10 +1,174 @@
package com.github.danielwegener.logback.kafka;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.Appender;
import ch.qos.logback.core.spi.AppenderAttachableImpl;
import com.github.danielwegener.logback.kafka.delivery.FailedDeliveryCallback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
* See <a href="https://github.com/danielwegener/logback-kafka-appender">logback-kafka-appender at github</a>
* @since 0.0.1
*/
public class KafkaAppender extends KafkaAppenderBase<ILoggingEvent> {
public class KafkaAppender<E> extends KafkaAppenderConfig<E> {

/**
* Kafka clients use this prefix for their slf4j logging.
* This appender defers appends of any Kafka-client logs since appending them could cause harmful infinite recursion/self-feeding effects.
*/
private static final String KAFKA_LOGGER_PREFIX = "org.apache.kafka.clients";

private LazyProducer lazyProducer = null;
private final AppenderAttachableImpl<E> aai = new AppenderAttachableImpl<E>();
private final ConcurrentLinkedQueue<E> queue = new ConcurrentLinkedQueue<E>();
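// fallback handling: events that could not be delivered to Kafka are forwarded to any attached appenders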
private final FailedDeliveryCallback<E> failedDeliveryCallback = new FailedDeliveryCallback<E>() {
@Override
public void onFailedDelivery(E evt, Throwable throwable) {
aai.appendLoopOnAppenders(evt);
}
};

public KafkaAppender() {
// setting these as config values sidesteps an unnecessary warning (minor bug in KafkaProducer)
addProducerConfigValue(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
addProducerConfigValue(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
}

@Override
public void doAppend(E e) {
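// drain any previously deferred events first; events logged by the Kafka client itself are deferred to avoid infinite recursion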
ensureDeferredAppends();
if (e instanceof ILoggingEvent && ((ILoggingEvent)e).getLoggerName().startsWith(KAFKA_LOGGER_PREFIX)) {
deferAppend(e);
} else {
super.doAppend(e);
}
}

@Override
public void start() {
// only error free appenders should be activated
if (!checkPrerequisites()) return;

lazyProducer = new LazyProducer();

super.start();
}

@Override
public void stop() {
super.stop();
if (lazyProducer != null && lazyProducer.isInitialized()) {
try {
lazyProducer.get().close();
} catch (KafkaException e) {
this.addWarn("Failed to shut down kafka producer: " + e.getMessage(), e);
}
lazyProducer = null;
}
}

@Override
public void addAppender(Appender<E> newAppender) {
aai.addAppender(newAppender);
}

@Override
public Iterator<Appender<E>> iteratorForAppenders() {
return aai.iteratorForAppenders();
}

@Override
public Appender<E> getAppender(String name) {
return aai.getAppender(name);
}

@Override
public boolean isAttached(Appender<E> appender) {
return aai.isAttached(appender);
}

@Override
public void detachAndStopAllAppenders() {
aai.detachAndStopAllAppenders();
}

@Override
public boolean detachAppender(Appender<E> appender) {
return aai.detachAppender(appender);
}

@Override
public boolean detachAppender(String name) {
return aai.detachAppender(name);
}

@Override
protected void append(E e) {
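// encode the event and derive its partitioning key, then hand both off to the configured delivery strategy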
final byte[] payload = encoder.doEncode(e);
final byte[] key = keyingStrategy.createKey(e);
final ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[],byte[]>(topic, key, payload);
deliveryStrategy.send(lazyProducer.get(), record, e, failedDeliveryCallback);
}

protected Producer<byte[], byte[]> createProducer() {
return new KafkaProducer<byte[], byte[]>(new HashMap<String, Object>(producerConfig));
}

private void deferAppend(E event) {
queue.add(event);
}

// drains queue events to super
private void ensureDeferredAppends() {
E event;

while ((event = queue.poll()) != null) {
super.doAppend(event);
}
}

/**
* Lazy initializer for producer, patterned after commons-lang.
*
* @see <a href="https://commons.apache.org/proper/commons-lang/javadocs/api-3.4/org/apache/commons/lang3/concurrent/LazyInitializer.html">LazyInitializer</a>
*/
private class LazyProducer {

private volatile Producer<byte[], byte[]> producer;

public Producer<byte[], byte[]> get() {
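// double-checked locking on the volatile field so the producer is created lazily and at most once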
Producer<byte[], byte[]> result = this.producer;
if (result == null) {
synchronized(this) {
result = this.producer;
if(result == null) {
this.producer = result = this.initialize();
}
}
}

return result;
}

protected Producer<byte[], byte[]> initialize() {
Producer<byte[], byte[]> producer = null;
try {
producer = createProducer();
} catch (Exception e) {
addError("error creating producer", e);
}
return producer;
}

public boolean isInitialized() { return producer != null; }
}

}
