Skip to content

Commit

Permalink
force users to explicitly set an ExceptionHandler on the Disruptor in…
Browse files Browse the repository at this point in the history
…stance
  • Loading branch information
pricem committed Feb 6, 2015
1 parent e3ce22b commit 6313bda
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 20 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ apply plugin: 'checkstyle'
defaultTasks 'checkstyleTest', 'checkstyleMain', 'build'

group = 'com.lmax'
version = new Version(major: 2, minor: 0, revision: 0)
version = new Version(major: 2, minor: 0, revision: 1)

ext {
fullName = 'Disruptor-Proxy'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public interface RingBufferProxyGenerator
* @param implementation the implementation object to be invoked by the Disruptor event handler
* @return an implementation of type T
*/
<T> T createRingBufferProxy(final Class<T> definition, final Disruptor<ProxyMethodInvocation> disruptor, final OverflowStrategy overflowStrategy, final T implementation);
<T> T createRingBufferProxy(final Class<T> definition, final Disruptor<ProxyMethodInvocation> disruptor,
final OverflowStrategy overflowStrategy, final T implementation);

/**
* Create a disruptor proxy with multiple implementation instances
Expand Down
48 changes: 48 additions & 0 deletions src/main/java/com/lmax/tool/disruptor/Validation.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2015 LMAX Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.lmax.tool.disruptor;

import com.lmax.disruptor.dsl.Disruptor;

import java.lang.reflect.Field;

public enum Validation
{
VALIDATION;

public void ensureDisruptorInstanceHasAnExceptionHandler(final Disruptor<?> disruptor)
{
try
{
final Field field = Disruptor.class.getDeclaredField("exceptionHandler");
field.setAccessible(true);
if(field.get(disruptor) == null)
{
throw new IllegalStateException("Please supply an ExceptionHandler to the Disruptor instance. " +
"The default Disruptor behaviour is to stop processing when an exception occurs.");
}
}
catch (NoSuchFieldException e)
{
throw new RuntimeException("Unable to inspect Disruptor instance", e);
}
catch (IllegalAccessException e)
{
throw new RuntimeException("Unable to inspect Disruptor instance", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@

package com.lmax.tool.disruptor.bytecode;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.tool.disruptor.Invoker;
import com.lmax.tool.disruptor.InvokerEventHandler;
import com.lmax.tool.disruptor.OverflowStrategy;
import com.lmax.tool.disruptor.ProxyMethodInvocation;
import com.lmax.tool.disruptor.ResetHandler;
import com.lmax.tool.disruptor.Resetable;
import com.lmax.tool.disruptor.RingBufferProxyGenerator;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import javassist.CannotCompileException;
import javassist.ClassPool;
import javassist.CtClass;
Expand All @@ -40,6 +40,7 @@
import java.util.HashMap;
import java.util.Map;

import static com.lmax.tool.disruptor.Validation.VALIDATION;
import static com.lmax.tool.disruptor.bytecode.ByteCodeHelper.addInterface;
import static com.lmax.tool.disruptor.bytecode.ByteCodeHelper.createField;
import static com.lmax.tool.disruptor.bytecode.ByteCodeHelper.createMethod;
Expand All @@ -66,6 +67,8 @@ public GeneratedRingBufferProxyGenerator()
@Override
public <T> T createRingBufferProxy(final Class<T> definition, final Disruptor<ProxyMethodInvocation> disruptor, final OverflowStrategy overflowStrategy, final T implementation)
{
VALIDATION.ensureDisruptorInstanceHasAnExceptionHandler(disruptor);

disruptor.handleEventsWith(new InvokerEventHandler<T>(implementation));

final ArgumentHolderGenerator argumentHolderGenerator = new ArgumentHolderGenerator(classPool);
Expand All @@ -86,6 +89,8 @@ public <T> T createRingBufferProxy(final Class<T> definition, final Disruptor<Pr
public <T> T createRingBufferProxy(final Class<T> definition, final Disruptor<ProxyMethodInvocation> disruptor,
final OverflowStrategy overflowStrategy, final T... implementations)
{
VALIDATION.ensureDisruptorInstanceHasAnExceptionHandler(disruptor);

if (implementations.length < 1)
{
throw new IllegalArgumentException("Must have at least one implementation");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static com.lmax.tool.disruptor.Validation.VALIDATION;
import static java.lang.Thread.currentThread;
import static java.lang.reflect.Proxy.newProxyInstance;

Expand All @@ -44,6 +45,8 @@ public final class ReflectiveRingBufferProxyGenerator implements RingBufferProxy
@Override
public <T> T createRingBufferProxy(final Class<T> definition, final Disruptor<ProxyMethodInvocation> disruptor, final OverflowStrategy overflowStrategy, final T implementation)
{
VALIDATION.ensureDisruptorInstanceHasAnExceptionHandler(disruptor);

final RingBufferInvocationHandler invocationHandler = createInvocationHandler(definition, disruptor, overflowStrategy);
preallocateArgumentHolders(disruptor.getRingBuffer());

Expand All @@ -60,6 +63,8 @@ public <T> T createRingBufferProxy(final Class<T> definition, final Disruptor<Pr
public <T> T createRingBufferProxy(final Class<T> definition, final Disruptor<ProxyMethodInvocation> disruptor,
final OverflowStrategy overflowStrategy, final T... implementations)
{
VALIDATION.ensureDisruptorInstanceHasAnExceptionHandler(disruptor);

if (implementations.length < 1)
{
throw new IllegalArgumentException("Must have at least one implementation");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.lmax.tool.disruptor;

import com.lmax.disruptor.FatalExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import org.junit.Test;
Expand All @@ -40,12 +41,21 @@ protected AbstractRingBufferProxyGeneratorTest(final GeneratorType generatorType
this.generatorType = generatorType;
}

@Test(expected = IllegalStateException.class)
public void shouldThrowExceptionIfDisruptorInstanceDoesNotHaveAnExceptionHandler() throws Exception
{
final Disruptor<ProxyMethodInvocation> disruptor =
new Disruptor<ProxyMethodInvocation>(new RingBufferProxyEventFactory(), 1024, Executors.newSingleThreadExecutor());
final RingBufferProxyGeneratorFactory generatorFactory = new RingBufferProxyGeneratorFactory();
final RingBufferProxyGenerator ringBufferProxyGenerator = generatorFactory.create(generatorType);
final ListenerImpl implementation = new ListenerImpl();
ringBufferProxyGenerator.createRingBufferProxy(Listener.class, disruptor, OverflowStrategy.DROP, implementation);
}

@Test
public void shouldProxy()
{
final ExecutorService executor = Executors.newSingleThreadExecutor();
final Disruptor<ProxyMethodInvocation> disruptor =
new Disruptor<ProxyMethodInvocation>(new RingBufferProxyEventFactory(), 1024, executor);
final Disruptor<ProxyMethodInvocation> disruptor = createDisruptor(Executors.newSingleThreadExecutor(), 1024);
final RingBufferProxyGeneratorFactory generatorFactory = new RingBufferProxyGeneratorFactory();
final RingBufferProxyGenerator ringBufferProxyGenerator = generatorFactory.create(generatorType);

Expand All @@ -69,7 +79,7 @@ public void shouldProxy()
}

disruptor.shutdown();
executor.shutdown();
Executors.newSingleThreadExecutor().shutdown();

assertThat(implementation.getLastStringValue(), is("single string 2"));
assertThat(implementation.getLastFloatValue(), is((float) 2));
Expand All @@ -79,12 +89,17 @@ public void shouldProxy()
assertThat(implementation.getLastDoubleArray(), is(equalTo(new Double[] {(double) 2})));
}

private Disruptor<ProxyMethodInvocation> createDisruptor(final ExecutorService executor, final int ringBufferSize)
{
final Disruptor<ProxyMethodInvocation> disruptor = new Disruptor<ProxyMethodInvocation>(new RingBufferProxyEventFactory(), ringBufferSize, executor);
disruptor.handleExceptionsWith(new FatalExceptionHandler());
return disruptor;
}

@Test
public void shouldProxyMultipleImplementations()
{
final ExecutorService executor = Executors.newCachedThreadPool();
final Disruptor<ProxyMethodInvocation> disruptor =
new Disruptor<ProxyMethodInvocation>(new RingBufferProxyEventFactory(), 1024, executor);
final Disruptor<ProxyMethodInvocation> disruptor = createDisruptor(Executors.newCachedThreadPool(), 1024);
final RingBufferProxyGeneratorFactory generatorFactory = new RingBufferProxyGeneratorFactory();
final RingBufferProxyGenerator ringBufferProxyGenerator = generatorFactory.create(generatorType);

Expand Down Expand Up @@ -112,7 +127,7 @@ public void shouldProxyMultipleImplementations()
}

disruptor.shutdown();
executor.shutdown();
Executors.newCachedThreadPool().shutdown();

for (ListenerImpl implementation : implementations)
{
Expand All @@ -128,9 +143,7 @@ public void shouldProxyMultipleImplementations()
@Test
public void shouldDropMessagesIfRingBufferIsFull() throws Exception
{
final ExecutorService executor = Executors.newSingleThreadExecutor();
final Disruptor<ProxyMethodInvocation> disruptor =
new Disruptor<ProxyMethodInvocation>(new RingBufferProxyEventFactory(), 4, executor);
final Disruptor<ProxyMethodInvocation> disruptor = createDisruptor(Executors.newSingleThreadExecutor(), 4);
final RingBufferProxyGeneratorFactory generatorFactory = new RingBufferProxyGeneratorFactory();
final RingBufferProxyGenerator ringBufferProxyGenerator = generatorFactory.create(generatorType);

Expand All @@ -149,17 +162,15 @@ public void shouldDropMessagesIfRingBufferIsFull() throws Exception
Thread.sleep(250L);

disruptor.shutdown();
executor.shutdown();
Executors.newSingleThreadExecutor().shutdown();

assertThat(implementation.getInvocationCount(), is(4));
}

@Test
public void shouldNotifyBatchListenerImplementationOfEndOfBatch() throws Exception
{
final ExecutorService executor = Executors.newSingleThreadExecutor();
final Disruptor<ProxyMethodInvocation> disruptor =
new Disruptor<ProxyMethodInvocation>(new RingBufferProxyEventFactory(), 4, executor);
final Disruptor<ProxyMethodInvocation> disruptor = createDisruptor(Executors.newSingleThreadExecutor(), 4);
final RingBufferProxyGeneratorFactory generatorFactory = new RingBufferProxyGeneratorFactory();
final RingBufferProxyGenerator ringBufferProxyGenerator = generatorFactory.create(generatorType);

Expand Down Expand Up @@ -195,7 +206,7 @@ public void shouldNotifyBatchListenerImplementationOfEndOfBatch() throws Excepti
}

disruptor.shutdown();
executor.shutdown();
Executors.newSingleThreadExecutor().shutdown();

assertThat(implementation.getBatchCount() > firstBatchCount, is(true));
}
Expand Down

0 comments on commit 6313bda

Please sign in to comment.