Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use disruptor with multi consumers and get wrong #419

Closed
AnswerNo2 opened this issue Apr 9, 2022 · 2 comments
Closed

use disruptor with multi consumers and get wrong #419

AnswerNo2 opened this issue Apr 9, 2022 · 2 comments

Comments

@AnswerNo2
Copy link

AnswerNo2 commented Apr 9, 2022

disruptor version:

  <dependency>
      <groupId>com.lmax</groupId>
      <artifactId>disruptor</artifactId>
      <version>3.4.3</version>
  </dependency>

java version: 1.8

springboot version:2.5

10 Threads consumer message, after 5minitus later, an thread occurred:

image

one thread occurred TIMED_WAITING, and then, disruptor can not consumer messge any more, stopped,

Notice:i produced About 5 messages per second

@configuration
public class RingBufferConfiguration {

@Autowired
private ApplicationContext applicationContext;

@Bean
public RingBuffer<MessageVO> ringBuffer() {
    //  ring buffer size
    int bufferSize = 1024 * 1024;;
    //固定线程数
    int nThreads = 10;
    ExecutorService executor = Executors.newFixedThreadPool(nThreads);

    EventFactory<MessageVO> factory = new EventFactory<MessageVO>() {
        @Override
        public MessageVO newInstance() {
            return new MessageVO();
        }
    };

    // create ringBuffer
    RingBuffer<MessageVO> ringBuffer = RingBuffer.create(ProducerType.SINGLE, factory, bufferSize,  new BlockingWaitStrategy());
    SequenceBarrier barriers = ringBuffer.newBarrier();
    // 10 threads consumer message
    RiskCalculateHandler[] consumers = new RiskCalculateHandler[50];
    RiskCalculateService riskCalculateService = applicationContext.getBean(RiskCalculateService.class);
    AccountChangeHandler accountChangeHandler = applicationContext.getBean(AccountChangeHandler.class);
    for (int i = 0; i < consumers.length; i++) {
        consumers[i] = new RiskCalculateHandler(riskCalculateService, accountChangeHandler);
    }
    WorkerPool<MessageVO> workerPool = new WorkerPool<MessageVO>(ringBuffer, barriers,
            new RiskExceptionHandler(ringBuffer, riskCalculateService), consumers);
    ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
    RingBuffer<MessageVO> ringBufferFinal = workerPool.start(executor);
    return ringBufferFinal;
}

}

image

image

How to solve this question ?

@noakcn
Copy link

noakcn commented Apr 9, 2022 via email

@grumpyjames
Copy link
Member

The usual cause of that method blocking is that the ringbuffer is full.

I would guess that the consumers are not able to process the events as fast as they arriving. You could check this by adding logging to the consumers to track their progress, or other logging to periodically measure the depth of the ringbuffer.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants