What is the correct way to add/remove handlers dynamically? #30

Closed
lutovich opened this Issue Nov 10, 2012 · 8 comments

Comments

Projects
None yet
3 participants

Hello, I have the situation when I need dynamically add and remove handlers.
My code looks like the this:

addHandler(EventHandler handler) {
    SequenceBarrier barrier = ringBuffer.newBarrier();
    BatchEventProcessor processor = new BatchEventProcessor(ringBuffer, barrier, handler);
    processor.getSequence().set(barrier.getCursor());
    sequenceGroup.add(batchEventProcessor.getSequence());
    processor.getSequence().set(ringBuffer.getCursor());
    processor.run();
}

removeHandler(BatchEventProcessor processor) {
    processor.halt();
    sequenceGroup.remove(processor.getSequence());
} 

Is this a correct setup?

I'm asking this because very rarely I'm getting the following exception on handler removal:
java.lang.ArrayIndexOutOfBoundsException: 0
at com.lmax.disruptor.SequenceGroup.remove(SequenceGroup.java:116)
at removeHandler()

and I was not able to find the issue for now :(

Using disruptor 2.10.3 with java 1.7.3 on Windows 7 x64.

Thanks in advance!

Contributor

mikeb01 commented Nov 11, 2012

Your code looks correct. I'll see if I can reproduce the issue. If you have a simple unit test the reproduces the issue it would be appreciated.

Contributor

mikeb01 commented Nov 11, 2012

I think I know what the problem is. I get the same exception if I try to remove a Sequence that does not exist. E.g. this fails:

SequenceGroup group = new SequenceGroup();
group.add(new Sequence());
group.add(new Sequence());
group.remove(new Sequence());

I'll look at fixing this both in master and on the 2.x branch.

mikeb01 closed this in 2ccc6f6 Nov 11, 2012

Thank you for the fix.
I have some questions about it.

  1. Is it safe to delete all occurrences of the given Sequence from the SequenceGroup and not only the first one?
  2. Is this:
processor.getSequence().set(barrier.getCursor());
...
processor.getSequence().set(ringBuffer.getCursor());

the place where I can get the different (by reference) Sequence in processor and sequenceGroup? And if I get different sequences than after your fix the handler will stop but the sequence won't be deleted from the sequenceGroup.

Will highly appreciate your answers.

Contributor

mikeb01 commented Nov 11, 2012

  1. Yes. You shouldn't really be adding multiple instances, but just in case we remove them all.
  2. I don't really understand the question that you're asking.

If i got that ArrayIndexOutOfBoundsException than I was trying to remove the sequence that was not present in the sequence group. I do not understand how it can happen, because I'm passing the BatchEventProcessor associated with a particular EventHandler to the removeHandler(...) method (I'm keeping a Map<EventHandler, BatchEventProcessor> for that).

Contributor

mikeb01 commented Nov 11, 2012

I don't know, but that was the only use case I could find that failed in the way that you reported. Try the head of the version-2.x branch and see if that fixes the problem.

Yes, it fixes the problem.
The following code is the one that showed the issue initially:

import com.lmax.disruptor.*;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class MyDisruptor {

    private final RingBuffer<Holder> ringBuffer;
    private final SequenceGroup sequenceGroup = new SequenceGroup();
    private final Map<EventHandler<Holder>, BatchEventProcessor<Holder>> handlersWithProcessors = new ConcurrentHashMap<>();

    public MyDisruptor() {
        ringBuffer = new RingBuffer<>(Holder.EVENT_FACTORY, new SingleThreadedClaimStrategy(4096), new BlockingWaitStrategy());
        ringBuffer.setGatingSequences(sequenceGroup);
    }

    public void addHandler(EventHandler<Holder> handler) {
        SequenceBarrier barrier = ringBuffer.newBarrier();
        BatchEventProcessor<Holder> processor = new BatchEventProcessor<>(ringBuffer, barrier, handler);
        processor.getSequence().set(barrier.getCursor());
        sequenceGroup.add(processor.getSequence());
        processor.getSequence().set(ringBuffer.getCursor());
        handlersWithProcessors.put(handler, processor);
        processor.run();
    }

    public void removeHandler(EventHandler<Holder> handler) {
        BatchEventProcessor<Holder> processor = handlersWithProcessors.remove(handler);
        processor.halt();
        sequenceGroup.remove(processor.getSequence());
    }

    public void publishValue(String value) {
        long nextSequence = ringBuffer.next();
        Holder holder = ringBuffer.get(nextSequence);
        holder.setValue(value);
        ringBuffer.publish(nextSequence);
    }

    public boolean hasHandlers() {
        return handlersWithProcessors.size() != 0;
    }
}

class Holder {

    private String value;

    public static final EventFactory<Holder> EVENT_FACTORY = new EventFactory<Holder>() {
        @Override
        public Holder newInstance() {
            return new Holder();
        }
    };

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }
}

public class TestCase {

    private static final MyDisruptor disruptor = new MyDisruptor();

    private static class Handler implements EventHandler<Holder> {

        private int count = 0;

        @Override
        public void onEvent(Holder holder, long l, boolean b) throws Exception {
            count++;
            if (count == 5000 && !holder.getValue().isEmpty()) {
                disruptor.removeHandler(this);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 100000; i++) {
            for (int j = 0; j < 4; j++) {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        disruptor.addHandler(new Handler());
                    }
                }).start();
            }

            while (disruptor.hasHandlers()) {
                disruptor.publishValue(String.valueOf(System.currentTimeMillis()));
            }
        }
    }
}

+1 for the sample code :-)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment