Skip to content

Commit

Permalink
ARTEMIS-3701 Do no block libaio on compacting or closing
Browse files Browse the repository at this point in the history
I am adding a test showing it is safe to not wait pending callbacks before closing a file.
With this I can just close the file and let the kernel to deal with sending the completions.
  • Loading branch information
clebertsuconic committed Mar 4, 2022
1 parent e0ca92d commit e949e38
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,6 @@

public interface SequentialFile {

default boolean isPending() {
return false;
}

default void waitNotPending() {
return;
}

boolean isOpen();

boolean exists();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.nio.ByteBuffer;
import java.util.PriorityQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.activemq.artemis.api.core.ActiveMQException;
Expand All @@ -33,8 +32,6 @@
import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioFile;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
import org.apache.activemq.artemis.utils.AutomaticLatch;
import org.apache.activemq.artemis.utils.Waiter;
import org.jboss.logging.Logger;

/** This class is implementing Runnable to reuse a callback to close it. */
Expand All @@ -44,14 +41,10 @@ public class AIOSequentialFile extends AbstractSequentialFile {

private boolean opened = false;

private volatile boolean pendingClose = false;

private LibaioFile aioFile;

private final AIOSequentialFileFactory aioFactory;

private final AutomaticLatch pendingCallbacks = new AutomaticLatch();

/**
* Used to determine the next writing sequence
*/
Expand Down Expand Up @@ -106,49 +99,9 @@ public void close() throws IOException, InterruptedException, ActiveMQException
close(true, true);
}

private void actualClose() {
try {
aioFile.close();
} catch (Throwable e) {
// an exception here would means a double
logger.debug("Exeption while closing file - " + e.getMessage(), e);
} finally {
aioFile = null;
pendingClose = false;
aioFactory.afterClose();
}
}

@Override
public boolean isPending() {
return pendingClose;
}

@Override
public void waitNotPending() {
try {
for (short retryPending = 0; pendingClose && retryPending < 60; retryPending++) {
if (pendingCallbacks.await(1, TimeUnit.SECONDS)) {
break;
}
}
if (pendingClose) {
if (!Waiter.waitFor(() -> !pendingClose, TimeUnit.SECONDS, 60, TimeUnit.NANOSECONDS, 1000)) {
AIOSequentialFileFactory.threadDump("File " + getFileName() + " still has pending IO before closing it");
}
}
} catch (InterruptedException e) {
// nothing to be done here, other than log it and forward it
logger.warn(e.getMessage(), e);
Thread.currentThread().interrupt();
}
}

@Override
public synchronized void close(boolean waitSync, boolean blockOnWait) throws IOException, InterruptedException, ActiveMQException {
// a double call on close, should result on it waitingNotPending before another close is called
waitNotPending();

if (!opened) {
return;
}
Expand All @@ -157,16 +110,16 @@ public synchronized void close(boolean waitSync, boolean blockOnWait) throws IOE

super.close();
opened = false;
pendingClose = true;
this.timedBuffer = null;

if (waitSync) {
pendingCallbacks.afterCompletion(this::actualClose);
if (blockOnWait) {
pendingCallbacks.await();
}
} else {
actualClose();
try {
aioFile.close();
} catch (Throwable e) {
// an exception here would means a double
logger.debug("Exeption while closing file - " + e.getMessage(), e);
} finally {
aioFile = null;
aioFactory.afterClose();
}
}

Expand All @@ -190,7 +143,6 @@ public void open() throws Exception {
@Override
public synchronized void open(final int maxIO, final boolean useExecutor) throws ActiveMQException {
// in case we are opening a file that was just closed, we need to wait previous executions to be done
waitNotPending();
if (opened) {
return;
}
Expand Down Expand Up @@ -322,13 +274,10 @@ AIOSequentialFileFactory.AIOSequentialCallback getCallback(IOCallback originalCa
boolean releaseBuffer) {
AIOSequentialFileFactory.AIOSequentialCallback callback = aioFactory.getCallback();
callback.init(this.nextWritingSequence.getAndIncrement(), originalCallback, aioFile, this, buffer, releaseBuffer);
pendingCallbacks.countUp();
return callback;
}

void done(AIOSequentialFileFactory.AIOSequentialCallback callback) {
pendingCallbacks.countDown();

if (callback.writeSequence == -1) {
callback.sequentialDone();
}
Expand Down Expand Up @@ -373,7 +322,7 @@ public long size() throws Exception {

@Override
public String toString() {
return "AIOSequentialFile{" + getFileName() + ", opened=" + opened + ", pendingClose=" + pendingClose + ", pendingCallbacks=" + pendingCallbacks + '}';
return "AIOSequentialFile{" + getFileName() + ", opened=" + opened + '}';
}

private void checkOpened() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import io.netty.util.internal.PlatformDependent;
Expand All @@ -41,7 +40,6 @@
import org.apache.activemq.artemis.nativo.jlibaio.SubmitInfo;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.PowerOf2Util;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
import org.jboss.logging.Logger;
import org.jctools.queues.MpmcArrayQueue;
Expand All @@ -64,8 +62,6 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
}
}

private final ReusableLatch pendingClose = new ReusableLatch(0);

private final ReuseBuffersController buffersControl = new ReuseBuffersController();

private volatile boolean reuseBuffers = true;
Expand All @@ -81,11 +77,9 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
private static final String AIO_TEST_FILE = ".aio-test";

public void beforeClose() {
pendingClose.countUp();
}

public void afterClose() {
pendingClose.countDown();
}

public AIOSequentialFileFactory(final File journalDir, int maxIO) {
Expand Down Expand Up @@ -305,15 +299,6 @@ public void stop() {
if (this.running.compareAndSet(true, false)) {
buffersControl.stop();

try {
// if we stop libaioContext before we finish this, we will never get confirmation on items previously sent
if (!pendingClose.await(1, TimeUnit.MINUTES)) {
threadDump("Timeout on waiting for asynchronous close");
}
} catch (Throwable throwableToLog) {
logger.warn(throwableToLog.getMessage(), throwableToLog);
}

libaioContext.close();
libaioContext = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void setNegReclaimCriteria() {

@Override
public boolean isCanReclaim() {
return reclaimable && posReclaimCriteria && negReclaimCriteria && !file.isPending();
return reclaimable && posReclaimCriteria && negReclaimCriteria;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,17 +148,8 @@ public void setExecutor(final Executor fileExecutor) {
}

public void clear() throws Exception {
for (JournalFile file : dataFiles) {
file.getFile().waitNotPending();
}

dataFiles.clear();


for (JournalFile file : freeFiles) {
file.getFile().waitNotPending();
}

freeFiles.clear();

freeFilesCount.set(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2050,10 +2050,6 @@ private ArrayList<JournalFile> getDataListToCompact() throws Exception {
}

processBackup();

for (JournalFile file : dataFilesToProcess) {
file.getFile().waitNotPending();
}
return dataFilesToProcess;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ public void testOpenClose() throws Exception {
file.close(true, false);
}
} finally {
file.waitNotPending();
factory.stop();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.activemq.artemis.tests.integration.journal;

import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
import org.apache.activemq.artemis.nativo.jlibaio.LibaioContext;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;

public class AsyncOpenCloseTest extends ActiveMQTestBase {

private static final Logger logger = Logger.getLogger(AsyncOpenCloseTest.class);

@Test
public void testCloseOnSubmit() throws Exception {
Assume.assumeTrue(LibaioContext.isLoaded());
AtomicInteger errors = new AtomicInteger(0);

SequentialFileFactory factory = new AIOSequentialFileFactory(temporaryFolder.getRoot(), (Throwable error, String message, SequentialFile file) -> errors.incrementAndGet(), 4 * 1024);
factory.start();

SequentialFile file = factory.createSequentialFile("fileAIO.bin");
file.open(1024, true);

final int WRITES = 100;
final int RECORD_SIZE = 4 * 1024;
final int OPEN_TIMES = 25;

file.fill(WRITES * RECORD_SIZE);

ByteBuffer buffer = factory.newBuffer(RECORD_SIZE);
ActiveMQBuffer buffer2 = ActiveMQBuffers.wrappedBuffer(buffer);

try {

file.close(true, false);
AtomicInteger submit = new AtomicInteger(0);

ReusableLatch valve = new ReusableLatch(0);

byte writtenByte = (byte) 'a';
for (int nclose = 0; nclose < OPEN_TIMES; nclose++) {
logger.debug("************************************************** test " + nclose);
writtenByte++;
if (writtenByte >= (byte) 'z') {
writtenByte = (byte) 'a';
}
buffer2.setIndex(0, 0);
for (int s = 0; s < RECORD_SIZE; s++) {
buffer2.writeByte(writtenByte);
}
file.open(1024, true);
CyclicBarrier blocked = new CyclicBarrier(2);
for (int i = 0; i < WRITES; i++) {
if (i == 10) {
valve.countUp();
}
file.position(i * RECORD_SIZE);
submit.incrementAndGet();
buffer2.setIndex(0, RECORD_SIZE);
file.write(buffer2, true, new IOCallback() {
@Override
public void done() {
try {
if (!valve.await(1, TimeUnit.MILLISECONDS)) {
logger.debug("blocking");
blocked.await();
valve.await(10, TimeUnit.SECONDS);
logger.debug("unblocking");
}
} catch (Exception e) {
e.printStackTrace();
errors.incrementAndGet();
}
submit.decrementAndGet();
}

@Override
public void onError(int errorCode, String errorMessage) {
errors.incrementAndGet();
}
});
}
blocked.await();
logger.debug("Closing");
file.close(false, false);
// even though the callback is blocked, the content of the file should already be good as written
validateFile(file, (byte) writtenByte);
valve.countDown();
Wait.assertEquals(0, submit::get, 5000, 10);

}
Wait.assertEquals(0, submit::get);
} finally {
factory.releaseBuffer(buffer);
factory.stop();
}

Assert.assertEquals(0, errors.get());

}

private void validateFile(SequentialFile file, byte writtenByte) throws IOException {
FileInputStream fileInputStream = new FileInputStream(file.getJavaFile());
byte[] wholeFile = fileInputStream.readAllBytes();
for (int i = 0; i < wholeFile.length; i++) {
Assert.assertEquals(writtenByte, (byte) wholeFile[i]);
}
fileInputStream.close();
}

}

0 comments on commit e949e38

Please sign in to comment.