Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -303,3 +303,15 @@ The following files include code modified from Dropwizard Metrics project.
Copyright (c) 2010-2013 Coda Hale, Yammer.com, 2014-2021 Dropwizard Team
Project page: https://github.com/dropwizard/metrics
License: https://github.com/dropwizard/metrics/blob/release/4.2.x/LICENSE

--------------------------------------------------------------------------------

The following files include code modified from LMax Disruptor project.

./iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/disruptor/*

LMax Disruptor is open source software licensed under the Apache License 2.0 and supported by the Apache Software Foundation.
Project page: https://github.com/LMAX-Exchange/disruptor
License: https://github.com/LMAX-Exchange/disruptor/blob/master/LICENCE.txt

--------------------------------------------------------------------------------
4 changes: 0 additions & 4 deletions iotdb-core/datanode/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -308,10 +308,6 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
</dependency>
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
import org.apache.iotdb.db.pipe.metric.overview.PipeHeartbeatEventMetrics;
import org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor.RingBuffer;
import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.pipe.api.event.Event;

import com.lmax.disruptor.RingBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -183,7 +183,7 @@ public void onTransferred() {

/////////////////////////////// Queue size Reporting ///////////////////////////////

public void recordDisruptorSize(final RingBuffer<?> ringBuffer) {
public void recordDisruptorSize(final RingBuffer ringBuffer) {
if (shouldPrintMessage) {
disruptorSize = ringBuffer.getBufferSize() - (int) ringBuffer.remainingCapacity();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,10 @@
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor.Disruptor;
import org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor.EventHandler;
import org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor.RingBuffer;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -75,9 +73,8 @@ public DisruptorQueue(
32,
Math.toIntExact(
allocatedMemoryBlock.getMemoryUsageInBytes() / ringBufferEntrySizeInBytes)),
THREAD_FACTORY,
ProducerType.MULTI,
new BlockingWaitStrategy());
THREAD_FACTORY);

disruptor.handleEventsWith(
(container, sequence, endOfBatch) -> {
final PipeRealtimeEvent realtimeEvent = container.getEvent();
Expand Down Expand Up @@ -127,7 +124,7 @@ private void mayPrintExceedingLog() {

private static class EventContainer {

private PipeRealtimeEvent event;
private volatile PipeRealtimeEvent event;

private EventContainer() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@

package org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner;

import com.lmax.disruptor.ExceptionHandler;
import org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor.ExceptionHandler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public void stopAssignTo(final PipeRealtimeDataRegionSource extractor) {
matcher.deregister(extractor);
}

public boolean notMoreExtractorNeededToBeAssigned() {
public boolean notMoreSourceNeededToBeAssigned() {
return matcher.getRegisterCount() == 0;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Batch event processor for consuming events
*
* <p>This implementation is based on LMAX Disruptor (https://github.com/LMAX-Exchange/disruptor)
* and simplified for IoTDB's Pipe module (removed complex lifecycle management).
*
* <p>Core algorithm preserved from LMAX Disruptor:
*
* <ul>
* <li>Batch processing loop
* <li>Sequence tracking
* <li>endOfBatch detection
* </ul>
*
* @param <T> event type
*/
public final class BatchEventProcessor<T> implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(BatchEventProcessor.class);

private final RingBuffer<T> ringBuffer;
private final SequenceBarrier sequenceBarrier;
private final EventHandler<? super T> eventHandler;
private final Sequence sequence = new Sequence();
private ExceptionHandler<? super T> exceptionHandler = new DefaultExceptionHandler<>();
private volatile boolean running = true;

public BatchEventProcessor(
RingBuffer<T> ringBuffer, SequenceBarrier barrier, EventHandler<? super T> eventHandler) {
this.ringBuffer = ringBuffer;
this.sequenceBarrier = barrier;
this.eventHandler = eventHandler;
}

public Sequence getSequence() {
return sequence;
}

public void setExceptionHandler(ExceptionHandler<? super T> exceptionHandler) {
this.exceptionHandler = exceptionHandler;
}

public void halt() {
running = false;
}

@Override
public void run() {
long nextSequence = sequence.get() + 1L;

while (running) {
try {
// Wait for available sequence
final long availableSequence = sequenceBarrier.waitFor(nextSequence);

// Batch process all available events
nextSequence = processAvailableEvents(nextSequence, availableSequence);

} catch (final InterruptedException ex) {
if (running) {
Thread.currentThread().interrupt();
LOGGER.info("Processor interrupted");
}
break;
} catch (final Throwable ex) {
exceptionHandler.handleEventException(ex, nextSequence, ringBuffer.get(nextSequence));
sequence.set(nextSequence);
nextSequence++;
}
}

if (!running) {
drainRemainingPublishedEvents(nextSequence);
}
LOGGER.info("Processor stopped");
}

private long processAvailableEvents(long nextSequence, long availableSequence) throws Throwable {
while (nextSequence <= availableSequence) {
final T event = ringBuffer.get(nextSequence);
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}

sequence.set(availableSequence);
return nextSequence;
}

private void drainRemainingPublishedEvents(long nextSequence) {
final long availableSequence = sequenceBarrier.getCursor();
if (availableSequence < nextSequence) {
return;
}

final long highestPublishedSequence =
sequenceBarrier.getHighestPublishedSequence(nextSequence, availableSequence);
while (nextSequence <= highestPublishedSequence) {
final T event = ringBuffer.get(nextSequence);
try {
eventHandler.onEvent(event, nextSequence, nextSequence == highestPublishedSequence);
} catch (final Throwable ex) {
exceptionHandler.handleEventException(ex, nextSequence, event);
} finally {
sequence.set(nextSequence);
}
nextSequence++;
}
}

private static class DefaultExceptionHandler<T> implements ExceptionHandler<T> {
@Override
public void handleEventException(Throwable ex, long sequence, T event) {
LoggerFactory.getLogger(getClass()).error("Exception processing: {} {}", sequence, event, ex);
}

@Override
public void handleOnStartException(Throwable ex) {
LoggerFactory.getLogger(getClass()).error("Exception during onStart()", ex);
}

@Override
public void handleOnShutdownException(Throwable ex) {
LoggerFactory.getLogger(getClass()).error("Exception during onShutdown()", ex);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.pipe.source.dataregion.realtime.disruptor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ThreadFactory;

/**
* Simplified Disruptor implementation for IoTDB Pipe
*
* <p>This implementation is based on LMAX Disruptor (https://github.com/LMAX-Exchange/disruptor)
* and simplified for IoTDB's specific use case in the Pipe module.
*
* <p>Key simplifications:
*
* <ul>
* <li>Single event handler support (no complex dependency graphs)
* <li>Simplified lifecycle management
* <li>Removed wait strategies (using simple sleep-based waiting)
* </ul>
*
* @param <T> event type
*/
public class Disruptor<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(Disruptor.class);

private final RingBuffer<T> ringBuffer;
private final ThreadFactory threadFactory;
private BatchEventProcessor<T> processor;
private Thread processorThread;
private ExceptionHandler<? super T> exceptionHandler;
private volatile boolean started = false;

/**
* Create a Disruptor instance
*
* @param eventFactory factory for creating pre-allocated events
* @param ringBufferSize buffer size (must be power of 2)
* @param threadFactory factory for creating consumer thread
*/
public Disruptor(EventFactory<T> eventFactory, int ringBufferSize, ThreadFactory threadFactory) {
this.ringBuffer = RingBuffer.createMultiProducer(eventFactory, ringBufferSize);
this.threadFactory = threadFactory;
}

/**
* Configure event handler for processing events
*
* <p>Creates a batch event processor that will run in its own thread
*
* @param handler event handler implementation
* @return this instance for method chaining
*/
public Disruptor<T> handleEventsWith(final EventHandler<? super T> handler) {
SequenceBarrier barrier = ringBuffer.newBarrier();
processor = new BatchEventProcessor<>(ringBuffer, barrier, handler);

if (exceptionHandler != null) {
processor.setExceptionHandler(exceptionHandler);
}

ringBuffer.addGatingSequences(processor.getSequence());
return this;
}

/**
* Set exception handler for error handling
*
* @param exceptionHandler handler for processing exceptions
*/
public void setDefaultExceptionHandler(ExceptionHandler<? super T> exceptionHandler) {
this.exceptionHandler = exceptionHandler;
if (processor != null) {
processor.setExceptionHandler(exceptionHandler);
}
}

public RingBuffer<T> start() {
if (started) {
throw new IllegalStateException("Disruptor already started");
}

if (processor == null) {
throw new IllegalStateException("No event handler configured");
}

processorThread = threadFactory.newThread(processor);
processorThread.start();
started = true;

LOGGER.info("Disruptor started with buffer size: {}", ringBuffer.getBufferSize());
return ringBuffer;
}

public void shutdown() {
if (!started) {
return;
}

if (processor != null) {
processor.halt();
}

if (processorThread != null) {
try {
processorThread.interrupt();
processorThread.join(5000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOGGER.warn("Interrupted waiting for processor to stop");
}
if (processorThread.isAlive()) {
LOGGER.warn("Timed out waiting for processor to stop");
}
}

started = false;
LOGGER.info("Disruptor shutdown completed");
}
}
Loading
Loading