Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.TimerTask;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
Expand Down Expand Up @@ -368,14 +369,10 @@ private class CheckTimer implements Runnable {

int checks = 0;
int failedChecks = 0;
long timeBefore = 0;

final int sleepMillis = timeout / 1000000; // truncates
final int sleepNanos = timeout % 1000000;

@Override
public void run() {
long lastFlushTime = 0;
long lastFlushTime = System.nanoTime();

while (!closed) {
// We flush on the timer if there are pending syncs there and we've waited at least one
Expand All @@ -386,17 +383,22 @@ public void run() {
if (pendingSync) {
if (useSleep) {
// if using sleep, we will always flush
flush();
lastFlushTime = System.nanoTime();
flush();

} else if (bufferObserver != null && System.nanoTime() > lastFlushTime + timeout) {
lastFlushTime = System.nanoTime();
// if not using flush we will spin and do the time checks manually
flush();
lastFlushTime = System.nanoTime();
}

}

sleepIfPossible();
//it could wait until the timeout is expired
final long timeFromTheLastFlush = System.nanoTime() - lastFlushTime;
final long timeToSleep = timeFromTheLastFlush - timeout;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are basically discounting the time it took to sync?

looks good!

if (timeToSleep > 0) {
sleepIfPossible(timeToSleep);
}

try {
spinLimiter.acquire();
Expand All @@ -415,35 +417,28 @@ public void run() {
* we will on that case verify up to MAX_CHECKS if nano sleep is behaving well.
* if more than 50% of the checks have failed we will cancel the sleep and just use regular spin
*/
private void sleepIfPossible() {
private void sleepIfPossible(long nanosToSleep) {
if (useSleep) {
if (checks < MAX_CHECKS_ON_SLEEP) {
timeBefore = System.nanoTime();
}

try {
sleep(sleepMillis, sleepNanos);
} catch (InterruptedException e) {
throw new ActiveMQInterruptedException(e);
} catch (Exception e) {
useSleep = false;
ActiveMQJournalLogger.LOGGER.warn(e.getMessage() + ", disabling sleep on TimedBuffer, using spin now", e);
}

if (checks < MAX_CHECKS_ON_SLEEP) {
long realTimeSleep = System.nanoTime() - timeBefore;

// I'm letting the real time to be up to 50% than the requested sleep.
if (realTimeSleep > timeout * 1.5) {
failedChecks++;
}
final long startSleep = System.nanoTime();
sleep(nanosToSleep);
final long elapsedSleep = System.nanoTime() - startSleep;
if (checks < MAX_CHECKS_ON_SLEEP) {
// I'm letting the real time to be up to 50% than the requested sleep.
if (elapsedSleep > nanosToSleep * 1.5) {
failedChecks++;
}

if (++checks >= MAX_CHECKS_ON_SLEEP) {
if (failedChecks > MAX_CHECKS_ON_SLEEP * 0.5) {
ActiveMQJournalLogger.LOGGER.debug("Thread.sleep with nano seconds is not working as expected, Your kernel possibly doesn't support real time. the Journal TimedBuffer will spin for timeouts");
useSleep = false;
if (++checks >= MAX_CHECKS_ON_SLEEP) {
if (failedChecks > MAX_CHECKS_ON_SLEEP * 0.5) {
ActiveMQJournalLogger.LOGGER.debug("LockSupport.parkNanos with nano seconds is not working as expected, Your kernel possibly doesn't support real time. the Journal TimedBuffer will spin for timeouts");
useSleep = false;
}
}
}
} catch (Exception e) {
useSleep = false;
ActiveMQJournalLogger.LOGGER.warn(e.getMessage() + ", disabling sleep on TimedBuffer, using spin now", e);
}
}
}
Expand All @@ -456,12 +451,11 @@ public void close() {
/**
* Sub classes (tests basically) can use this to override how the sleep is being done
*
* @param sleepMillis
* @param sleepNanos
* @throws InterruptedException
*/
protected void sleep(int sleepMillis, int sleepNanos) throws InterruptedException {
Thread.sleep(sleepMillis, sleepNanos);
protected void sleep(long sleepNanos) {
LockSupport.parkNanos(sleepNanos);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.core.io.DummyCallback;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
import org.apache.activemq.artemis.core.io.buffer.TimedBufferObserver;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.Env;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.junit.Assert;
import org.junit.Test;
Expand Down Expand Up @@ -217,6 +221,245 @@ public void onError(int errorCode, String errorMessage) {

}

private static void spinSleep(long timeout) {
if (timeout > 0) {
final long deadline = System.nanoTime() + timeout;
while (System.nanoTime() < deadline) {
//spin wait
}
}
}

private static final class NonBlockingObserver implements TimedBufferObserver, AutoCloseable {

private long flushes = 0;
private final ByteBuffer dummyBuffer;
private final Thread asyncIOWriter;
private final AtomicLong flushRequest = new AtomicLong(0);
private final AtomicLong flushesDone = new AtomicLong(0);

private NonBlockingObserver(int bufferSize, long deviceTime) {
this.asyncIOWriter = new Thread(() -> {
long flushes = 0;
while (!Thread.interrupted()) {
if (flushRequest.get() > flushes) {
final long flushesToBePerformed = flushRequest.get() - flushes;
//during the flush time no new flush request can be taken!
for (int i = 0; i < flushesToBePerformed; i++) {
spinSleep(deviceTime);
flushes++;
//make progress to let others being notified
flushesDone.lazySet(flushes);
}
}
}
});
dummyBuffer = ByteBuffer.allocate(bufferSize);
asyncIOWriter.start();
}

@Override
public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOCallback> callbacks) {
assert sync;
assert dummyBuffer == buffer;
if (buffer.position() > 0) {
dummyBuffer.clear();
flushes++;
//ask the device to perform a flush
flushRequest.lazySet(flushes);
}
}

/* (non-Javadoc)
* @see org.apache.activemq.artemis.utils.timedbuffer.TimedBufferObserver#newBuffer(int, int)
*/
@Override
public ByteBuffer newBuffer(final int minSize, final int maxSize) {
assert maxSize <= dummyBuffer.capacity();
dummyBuffer.limit(minSize);
return dummyBuffer;
}

@Override
public int getRemainingBytes() {
return Integer.MAX_VALUE;
}

@Override
public void close() {
asyncIOWriter.interrupt();
}

public void waitUntilFlushIsDone(long flush) {
//spin wait to be more reactive
while (flushesDone.get() < flush) {

}
}

public long flushesDone() {
return flushesDone.get();
}
}

private static final class BlockingObserver implements TimedBufferObserver, AutoCloseable {

private long flushes = 0;
private final ByteBuffer dummyBuffer;
private final long deviceTime;
private final AtomicLong flushesDone = new AtomicLong(0);

private BlockingObserver(int bufferSize, long deviceTime) {
this.dummyBuffer = ByteBuffer.allocate(bufferSize);
this.deviceTime = deviceTime;
}

@Override
public void flushBuffer(final ByteBuffer buffer, final boolean sync, final List<IOCallback> callbacks) {
assert sync;
assert dummyBuffer == buffer;
if (dummyBuffer.position() > 0) {
dummyBuffer.clear();
//emulate the flush time of a blocking device with a precise sleep
spinSleep(deviceTime);
flushes++;
//publish the number of flushes happened
flushesDone.lazySet(flushes);
}
}

/* (non-Javadoc)
* @see org.apache.activemq.artemis.utils.timedbuffer.TimedBufferObserver#newBuffer(int, int)
*/
@Override
public ByteBuffer newBuffer(final int minSize, final int maxSize) {
assert maxSize <= dummyBuffer.capacity();
dummyBuffer.limit(minSize);
return dummyBuffer;
}

@Override
public int getRemainingBytes() {
return Integer.MAX_VALUE;
}

@Override
public void close() {
//no op
}

public void waitUntilFlushIsDone(long flush) {
//spin wait to be more reactive
while (flushesDone.get() < flush) {

}
}

public long flushesDone() {
return flushesDone.get();
}

}

private static final EncodingSupport LONG_ENCODER = new EncodingSupport() {
@Override
public int getEncodeSize() {
return Long.BYTES;
}

@Override
public void encode(ActiveMQBuffer buffer) {
buffer.writeLong(1L);
}

@Override
public void decode(ActiveMQBuffer buffer) {

}
};

/**
* This test is showing the behaviour of the TimedBuffer with a blocking API (NIO/MAPPED) and
* how the timeout value is not == 1/IOPS like the ASYNCIO case
*/
@Test
public void timeoutShouldMatchFlushIOPSWithNotBlockingFlush() {
//use a large timeout in order to be reactive
final long timeout = TimeUnit.SECONDS.toNanos(2);
assert ((int) timeout) > 0;
//it is optimistic: the timeout and the blockingDeviceFlushTime are a perfect match
final long deviceTime = timeout;
final int bufferSize = Env.osPageSize();
final TimedBuffer timedBuffer = new TimedBuffer(bufferSize, (int) timeout, false);
timedBuffer.start();
try (NonBlockingObserver observer = new NonBlockingObserver(bufferSize, deviceTime)) {
timedBuffer.setObserver(observer);
//do not call checkSize because we already know that will succeed
timedBuffer.addBytes(LONG_ENCODER, true, DummyCallback.getInstance());
//wait the first flush to happen
observer.waitUntilFlushIsDone(1);
//for a not-blocking flush I'm expecting the TimedBuffer has near to finished sleeping now
assert observer.flushesDone() == 1;
//issue a new write
timedBuffer.addBytes(LONG_ENCODER, true, DummyCallback.getInstance());
//the countdown on the TimedBuffer is already started even before this addBytes
final long endOfWriteRequest = System.nanoTime();
//wait until it will succeed
observer.waitUntilFlushIsDone(2);
final long flushDone = System.nanoTime();
final long elapsedTime = flushDone - endOfWriteRequest;
assert observer.flushesDone() == 2;
//it is much more than what is expected!!if it will fail it means that the timed IOPS = 1/(timeout + blockingDeviceFlushTime)!!!!!!
//while it has to be IOPS = 1/timeout
System.out.println("elapsed time: " + elapsedTime + " with timeout: " + timeout);
final long maxExpected = timeout + deviceTime;
Assert.assertTrue("elapsed = " + elapsedTime + " max expected = " + maxExpected, elapsedTime <= maxExpected);
} finally {
timedBuffer.stop();
}
}

/**
* This test is showing the behaviour of the TimedBuffer with a blocking API (NIO/MAPPED) and
* how the timeout value is not == 1/IOPS like the ASYNCIO case
*/
@Test
public void timeoutShouldMatchFlushIOPSWithBlockingFlush() {
//use a large timeout in order to be reactive
final long timeout = TimeUnit.SECONDS.toNanos(2);
assert ((int) timeout) > 0;
//it is optimistic: the timeout and the blockingDeviceFlushTime are a perfect match
final long deviceTime = timeout;
final int bufferSize = Env.osPageSize();
final TimedBuffer timedBuffer = new TimedBuffer(bufferSize, (int) timeout, false);
timedBuffer.start();
try (BlockingObserver observer = new BlockingObserver(bufferSize, deviceTime)) {
timedBuffer.setObserver(observer);
//do not call checkSize because we already know that will succeed
timedBuffer.addBytes(LONG_ENCODER, true, DummyCallback.getInstance());
//wait the first flush to happen
observer.waitUntilFlushIsDone(1);
//for a blocking flush I'm expecting the TimedBuffer has started sleeping now
assert observer.flushesDone() == 1;
//issue a new write
timedBuffer.addBytes(LONG_ENCODER, true, DummyCallback.getInstance());
//the countdown on the TimedBuffer is already started even before this addBytes
final long endOfWriteRequest = System.nanoTime();
//wait until it will succeed
observer.waitUntilFlushIsDone(2);
final long flushDone = System.nanoTime();
final long elapsedTime = flushDone - endOfWriteRequest;
assert observer.flushesDone() == 2;
//it is much more than what is expected!!if it will fail it means that the timed IOPS = 1/(timeout + blockingDeviceFlushTime)!!!!!!
//while it has to be IOPS = 1/timeout
System.out.println("elapsed time: " + elapsedTime + " with timeout: " + timeout);
final long maxExpected = timeout + deviceTime;
Assert.assertTrue("elapsed = " + elapsedTime + " max expected = " + maxExpected, elapsedTime <= maxExpected);
} finally {
timedBuffer.stop();
}
}

@Test
public void testTimingAndFlush() throws Exception {
final ArrayList<ByteBuffer> buffers = new ArrayList<>();
Expand Down