Skip to content

Commit

Permalink
Restore support for Disruptor 3.x
Browse files Browse the repository at this point in the history
  • Loading branch information
ppkarwasz committed Dec 19, 2023
1 parent ee58635 commit 4a7f37a
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceReportingEventHandler;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
Expand Down Expand Up @@ -97,7 +98,9 @@ private static class Log4jEventWrapperHandler implements EventHandler<Log4jEvent
private Sequence sequenceCallback;
private int counter;

@Override
/*
* Overrides a method from Disruptor 4.x. Do not remove.
*/
public void setSequenceCallback(final Sequence sequenceCallback) {
this.sequenceCallback = sequenceCallback;
}
Expand All @@ -124,6 +127,12 @@ private void notifyIntermediateProgress(final long sequence) {
}
}

/**
* A version of Log4jEventWrapperHandler for LMAX Disruptor 3.x.
*/
private static final class Log4jEventWrapperHandler3 extends Log4jEventWrapperHandler
implements SequenceReportingEventHandler<Log4jEventWrapper> {}

/**
* Factory used to populate the RingBuffer with events. These event objects are then re-used during the life of the
* RingBuffer.
Expand Down Expand Up @@ -155,6 +164,10 @@ private void notifyIntermediateProgress(final long sequence) {
ringBufferElement.loggerConfig = loggerConfig;
};

private Log4jEventWrapperHandler createEventHandler() {
return DisruptorUtil.DISRUPTOR_VERSION == 3 ? new Log4jEventWrapperHandler3() : new Log4jEventWrapperHandler();
}

private int ringBufferSize;
private AsyncQueueFullPolicy asyncQueueFullPolicy;
private Boolean mutable = Boolean.FALSE;
Expand Down Expand Up @@ -220,7 +233,7 @@ public Thread newThread(final Runnable r) {
final ExceptionHandler<Log4jEventWrapper> errorHandler = DisruptorUtil.getAsyncLoggerConfigExceptionHandler();
disruptor.setDefaultExceptionHandler(errorHandler);

final Log4jEventWrapperHandler[] handlers = {new Log4jEventWrapperHandler()};
final Log4jEventWrapperHandler[] handlers = {createEventHandler()};
disruptor.handleEventsWith(handlers);

LOGGER.debug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public Thread newThread(final Runnable r) {
final ExceptionHandler<RingBufferLogEvent> errorHandler = DisruptorUtil.getAsyncLoggerExceptionHandler();
disruptor.setDefaultExceptionHandler(errorHandler);

final RingBufferLogEventHandler[] handlers = {new RingBufferLogEventHandler()};
final RingBufferLogEventHandler[] handlers = {RingBufferLogEventHandler.create()};
disruptor.handleEventsWith(handlers);

LOGGER.debug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,19 @@ static WaitStrategy createDefaultWaitStrategy(final String propertyName) {
LOGGER.trace(
"DefaultAsyncWaitStrategyFactory creating TimeoutBlockingWaitStrategy(timeout={}, unit=MILLIS)",
timeoutMillis);
// Check for the v 4.x version of the strategy
try {
final Class<? extends WaitStrategy> strategyClass =
(Class<? extends WaitStrategy>) Class.forName("com.lmax.disruptor.TimeoutBlockingWaitStrategy");
return strategyClass
.getConstructor(long.class, TimeUnit.class)
.newInstance(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (final ReflectiveOperationException e) {
LOGGER.debug(
"DefaultAsyncWaitStrategyFactory failed to load 'com.lmax.disruptor.TimeoutBlockingWaitStrategy', using '{}' instead.",
TimeoutBlockingWaitStrategy.class.getName());
}
// Use our version
return new TimeoutBlockingWaitStrategy(timeoutMillis, TimeUnit.MILLISECONDS);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ final class DisruptorUtil {
static final boolean ASYNC_CONFIG_SYNCHRONIZE_ENQUEUE_WHEN_QUEUE_FULL = PropertiesUtil.getProperties()
.getBooleanProperty("AsyncLoggerConfig.SynchronizeEnqueueWhenQueueFull", true);

static final int DISRUPTOR_VERSION =
LoaderUtil.isClassAvailable("com.lmax.disruptor.SequenceReportingEventHandler") ? 3 : 4;

private DisruptorUtil() {}

static WaitStrategy createWaitStrategy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
package org.apache.logging.log4j.core.async;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.LifecycleAware;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceReportingEventHandler;

/**
* This event handler gets passed messages from the RingBuffer as they become
Expand All @@ -32,7 +34,24 @@ public class RingBufferLogEventHandler implements EventHandler<RingBufferLogEven
private int counter;
private long threadId = -1;

@Override
/**
* Returns the appropriate {@link EventHandler} for the version of LMAX Disruptor used.
*/
public static RingBufferLogEventHandler create() {
return DisruptorUtil.DISRUPTOR_VERSION == 3
? new RingBufferLogEventHandler3()
: new RingBufferLogEventHandler();
}

/**
* @deprecated Use the {@link #create()} factory method instead.
*/
@Deprecated
public RingBufferLogEventHandler() {}

/*
* Overrides a method from Disruptor 4.x. Do not remove.
*/
public void setSequenceCallback(final Sequence sequenceCallback) {
this.sequenceCallback = sequenceCallback;
}
Expand Down Expand Up @@ -67,17 +86,25 @@ private void notifyCallback(final long sequence) {
/**
* Returns the thread ID of the background consumer thread, or {@code -1} if the background thread has not started
* yet.
*
* @return the thread ID of the background consumer thread, or {@code -1}
*/
public long getThreadId() {
return threadId;
}

@Override
/*
* Overrides a method from Disruptor 4.x. Do not remove.
*/
public void onStart() {
threadId = Thread.currentThread().getId();
}

@Override
/*
* Overrides a method from Disruptor 4.x. Do not remove.
*/
public void onShutdown() {}

private static class RingBufferLogEventHandler3 extends RingBufferLogEventHandler
implements SequenceReportingEventHandler<RingBufferLogEvent>, LifecycleAware {}
}
2 changes: 1 addition & 1 deletion log4j-parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
<commons-logging.version>1.3.0</commons-logging.version>
<!-- `com.conversantmedia:disruptor` version 1.2.16 requires Java 9: -->
<conversant.disruptor.version>1.2.15</conversant.disruptor.version>
<disruptor.version>4.0.0</disruptor.version>
<disruptor.version>3.4.4</disruptor.version>
<elasticsearch-java.version>8.11.2</elasticsearch-java.version>
<embedded-ldap.version>0.9.0</embedded-ldap.version>
<felix.version>7.0.5</felix.version>
Expand Down

0 comments on commit 4a7f37a

Please sign in to comment.