Skip to content

Commit

Permalink
Fix tests where files are not closed - fixed #680
Browse files Browse the repository at this point in the history
  • Loading branch information
dpisklov committed May 27, 2020
1 parent 1ebc95e commit 50fbe33
Show file tree
Hide file tree
Showing 26 changed files with 383 additions and 353 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package net.openhft.chronicle.queue.impl;

import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.io.AbstractCloseable;
import net.openhft.chronicle.queue.RollDetails;
import net.openhft.chronicle.queue.TailerDirection;
import org.jetbrains.annotations.NotNull;
Expand All @@ -31,7 +32,7 @@
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

public class WireStorePool implements StoreReleasable {
public class WireStorePool extends AbstractCloseable implements StoreReleasable {
// must be power-of-two
private static final int ROLL_CYCLE_CACHE_SIZE = 64;
private static final int INDEX_MASK = ROLL_CYCLE_CACHE_SIZE - 1;
Expand All @@ -42,7 +43,6 @@ public class WireStorePool implements StoreReleasable {
private final StoreFileListener storeFileListener;
// protected by synchronized on acquire()
private final RollDetails[] cache = new RollDetails[ROLL_CYCLE_CACHE_SIZE];
private boolean isClosed = false;

private WireStorePool(@NotNull WireStoreSupplier supplier, StoreFileListener storeFileListener) {
this.supplier = supplier;
Expand All @@ -59,11 +59,8 @@ private static int cacheIndex(final int cycle) {
return cycle & INDEX_MASK;
}

public synchronized void close() {
if (isClosed)
return;
isClosed = true;

@Override
protected void performClose() {
stores.values().stream()
.map(Reference::get)
.filter(Objects::nonNull)
Expand All @@ -72,6 +69,7 @@ public synchronized void close() {

@Nullable
public synchronized WireStore acquire(final int cycle, final long epoch, boolean createIfAbsent) {
throwExceptionIfClosed();
final int cacheIndex = cacheIndex(cycle);
RollDetails rollDetails;
rollDetails = cache[cacheIndex];
Expand Down Expand Up @@ -121,6 +119,8 @@ public synchronized void release(@NotNull CommonStore store) {
if (ref != null && ref.get() == store) {
stores.remove(entry.getKey());
storeFileListener.onReleased(entry.getKey().cycle(), store.file());

store.close();
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package net.openhft.chronicle.queue.impl.single;

import net.openhft.chronicle.core.ReferenceCounted;
import net.openhft.chronicle.core.io.AbstractCloseable;
import net.openhft.chronicle.core.io.Closeable;
import net.openhft.chronicle.core.util.ThrowingFunction;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
Expand All @@ -10,11 +12,9 @@
import java.util.function.Function;

/**
* Thread-safe, self-cleaning cache for ReferenceCounted objects.
* <p>
* Created by Jerry Shea on 27/04/18.
* Thread-safe, self-cleaning cache for ReferenceCounted objects
*/
public class ReferenceCountedCache<K, T extends ReferenceCounted, V, E extends Throwable> {
public class ReferenceCountedCache<K, T extends ReferenceCounted & Closeable, V, E extends Throwable> extends AbstractCloseable {
private final Map<K, T> cache = new ConcurrentHashMap<>();
private final Function<T, V> transformer;
private final ThrowingFunction<K, T, E> creator;
Expand Down Expand Up @@ -46,4 +46,9 @@ V get(@NotNull final K key) throws E {
value.release();
}
}

@Override
protected void performClose() {
cache.forEach((k, v) -> v.close());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.OS;
import net.openhft.chronicle.core.annotation.PackageLocal;
import net.openhft.chronicle.core.io.AbstractCloseable;
import net.openhft.chronicle.core.threads.EventLoop;
import net.openhft.chronicle.core.threads.ThreadLocalHelper;
import net.openhft.chronicle.core.time.TimeProvider;
Expand Down Expand Up @@ -57,7 +58,7 @@
import static net.openhft.chronicle.queue.impl.single.SingleChronicleQueueExcerpts.StoreAppender;
import static net.openhft.chronicle.queue.impl.single.SingleChronicleQueueExcerpts.StoreTailer;

public class SingleChronicleQueue implements RollingChronicleQueue {
public class SingleChronicleQueue extends AbstractCloseable implements RollingChronicleQueue {

public static final String SUFFIX = ".cq4";
public static final String QUEUE_METADATA_FILE = "metadata" + SingleTableStore.SUFFIX;
Expand Down Expand Up @@ -611,25 +612,17 @@ public <T> void addCloseListener(T key, Consumer<T> closer) {
}
}

@Override
public boolean isClosed() {
return isClosed.get();
}

@SuppressWarnings("unchecked")
@Override
public void close() {

if (isClosed.getAndSet(true))
return;

protected void performClose() {
closeQuietly(directoryListing, queueLock, writeLock, lastAcknowledgedIndexReplicated, lastIndexReplicated);

synchronized (closers) {
closers.forEach((k, v) -> v.accept(k));
closers.clear();
}
this.pool.close();
this.storeSupplier.close();
closeQuietly(metaStore);
}

Expand Down Expand Up @@ -833,7 +826,7 @@ private static final class CachedCycleTree {

static long lastTimeMapped = 0;

private class StoreSupplier implements WireStoreSupplier {
private class StoreSupplier extends AbstractCloseable implements WireStoreSupplier {
private final AtomicReference<CachedCycleTree> cachedTree = new AtomicReference<>();
private final ReferenceCountedCache<File, MappedFile, MappedBytes, IOException> mappedFileCache =
new ReferenceCountedCache<>(MappedBytes::mappedBytes, SingleChronicleQueue.this::mappedFile);
Expand Down Expand Up @@ -928,6 +921,11 @@ public WireStore acquire(int cycle, boolean createIfAbsent) {
}
}

@Override
protected void performClose() {
mappedFileCache.close();
}

private void createFile(final File path) {
try {
File dir = path.getParentFile();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1022,7 +1022,11 @@ public SecretKeySpec key() {
}

protected void preBuild() {
initializeMetadata();
try {
initializeMetadata();
} catch (Exception ex) {
metaStore.close();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import net.openhft.chronicle.bytes.util.DecoratedBufferUnderflowException;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.annotation.PackageLocal;
import net.openhft.chronicle.core.io.AbstractCloseable;
import net.openhft.chronicle.core.io.IORuntimeException;
import net.openhft.chronicle.core.pool.StringBuilderPool;
import net.openhft.chronicle.core.values.LongValue;
Expand Down Expand Up @@ -84,7 +85,7 @@ private static void releaseIfNotNullAndReferenced(@Nullable final Bytes bytesRef
//
// *************************************************************************

static class StoreAppender implements ExcerptAppender, ExcerptContext, InternalAppender {
static class StoreAppender extends AbstractCloseable implements ExcerptAppender, ExcerptContext, InternalAppender {
@NotNull
private final SingleChronicleQueue queue;
@NotNull
Expand All @@ -108,7 +109,6 @@ static class StoreAppender implements ExcerptAppender, ExcerptContext, InternalA
@Nullable
private Pretoucher pretoucher = null;
private NativeBytesStore<Void> batchTmp;
private final AtomicBoolean isClosed = new AtomicBoolean();

StoreAppender(@NotNull final SingleChronicleQueue queue,
@NotNull final WireStorePool storePool,
Expand Down Expand Up @@ -154,9 +154,8 @@ public void writeBytes(@NotNull final WriteBytesMarshallable marshallable) throw
}
}


void close() {
if (!isClosed.getAndSet(true)) {
@Override
protected void performClose() {
if (Jvm.isDebugEnabled(getClass()))
Jvm.debug().on(getClass(), "Closing store append for " + queue.file().getAbsolutePath());
final Wire w0 = wireForIndex;
Expand All @@ -175,7 +174,6 @@ void close() {
}
store = null;
storePool.close();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,8 @@ protected void performClose() {
while (refCount.refCount() > 0) {
refCount.release();
}
Closeable.closeQuietly(writePosition);
Closeable.closeQuietly(indexing);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import net.openhft.chronicle.core.StackTrace;
import net.openhft.chronicle.core.annotation.UsedViaReflection;
import net.openhft.chronicle.core.io.AbstractCloseable;
import net.openhft.chronicle.core.io.Closeable;
import net.openhft.chronicle.core.io.IORuntimeException;
import net.openhft.chronicle.core.util.StringUtils;
import net.openhft.chronicle.core.values.LongValue;
Expand All @@ -46,6 +47,7 @@
import java.util.function.Supplier;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static net.openhft.chronicle.core.io.Closeable.closeQuietly;
import static net.openhft.chronicle.core.util.Time.sleep;
import static net.openhft.chronicle.core.util.Time.tickTime;

Expand Down Expand Up @@ -206,6 +208,7 @@ protected void performClose() {
while (refCount.refCount() > 0) {
refCount.release();
}
mappedFile.close();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,20 +94,20 @@ public void execute() {
}

public Map<String, Histogram> readChronicle() {
final ChronicleQueue q = createQueue();
final ExcerptTailer tailer = q.createTailer();
final WireParselet parselet = parselet();
MessageHistory.set(new VanillaMessageHistory());
try (final MethodReader mr = new VanillaMethodReader(tailer, true, parselet, null, parselet)) {

while (!Thread.currentThread().isInterrupted() && mr.readOne()) {
++counter;
if (this.progress && counter % 1_000_000L == 0) {
System.out.println("Progress: " + counter);
try (final ChronicleQueue q = createQueue()) {
final ExcerptTailer tailer = q.createTailer();
final WireParselet parselet = parselet();
MessageHistory.set(new VanillaMessageHistory());
try (final MethodReader mr = new VanillaMethodReader(tailer, true, parselet, null, parselet)) {

while (!Thread.currentThread().isInterrupted() && mr.readOne()) {
++counter;
if (this.progress && counter % 1_000_000L == 0) {
System.out.println("Progress: " + counter);
}
}
}
}

return histos;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,31 +32,32 @@ private void test(final int pause) {
SetTimeProvider timeProvider = new SetTimeProvider();
timeProvider.currentTimeMillis(System.currentTimeMillis());
File tmpDir = getTmpDir();
SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(tmpDir).timeProvider(timeProvider).rollCycle(RollCycles.TEST_SECONDLY).build();
ExcerptAppender excerptAppender = queue.acquireAppender();
try (DocumentContext dc = excerptAppender.writingDocument()) {
dc.wire().write("hello").text("world");
}
try (SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(tmpDir).timeProvider(timeProvider).rollCycle(RollCycles.TEST_SECONDLY).build()) {
ExcerptAppender excerptAppender = queue.acquireAppender();
try (DocumentContext dc = excerptAppender.writingDocument()) {
dc.wire().write("hello").text("world");
}

timeProvider.advanceMillis(pause);
timeProvider.advanceMillis(pause);

{
ExcerptTailer tailer = queue.createTailer().direction(TailerDirection.BACKWARD);
tailer.toEnd();
try (DocumentContext dc = tailer.readingDocument()) {
assertEquals("world", dc.wire().read("hello").text());
{
ExcerptTailer tailer = queue.createTailer().direction(TailerDirection.BACKWARD);
tailer.toEnd();
try (DocumentContext dc = tailer.readingDocument()) {
assertEquals("world", dc.wire().read("hello").text());
}
}
}

// pretouch to create next cycle file ----- IF YOU COMMENT THIS LINE THE TEST PASSES
excerptAppender.pretouch();
// pretouch to create next cycle file ----- IF YOU COMMENT THIS LINE THE TEST PASSES
excerptAppender.pretouch();

{
ExcerptTailer tailer = queue.createTailer().direction(TailerDirection.BACKWARD);
tailer.toEnd();
try (DocumentContext dc = tailer.readingDocument()) {
assertTrue(dc.isPresent());
assertEquals("world", dc.wire().read("hello").text());
{
ExcerptTailer tailer = queue.createTailer().direction(TailerDirection.BACKWARD);
tailer.toEnd();
try (DocumentContext dc = tailer.readingDocument()) {
assertTrue(dc.isPresent());
assertEquals("world", dc.wire().read("hello").text());
}
}
}
}
Expand Down

0 comments on commit 50fbe33

Please sign in to comment.