Skip to content

Commit

Permalink
Enabled checking that all AbstractCloseable are closed after test, #680
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-lawrey committed May 28, 2020
1 parent 8c33303 commit 616eac2
Show file tree
Hide file tree
Showing 53 changed files with 241 additions and 244 deletions.
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package net.openhft.chronicle.queue.impl;

import net.openhft.chronicle.bytes.MappedBytes;
import net.openhft.chronicle.core.ReferenceCounted;
import net.openhft.chronicle.core.io.Closeable;
import net.openhft.chronicle.wire.Demarshallable;
import net.openhft.chronicle.wire.WriteMarshallable;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.io.File;

public interface CommonStore extends ReferenceCounted, Demarshallable, WriteMarshallable, Closeable {
public interface CommonStore extends Demarshallable, WriteMarshallable {
/**
* @return the file associated with this store.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@
*/
package net.openhft.chronicle.queue.impl;

import org.jetbrains.annotations.Nullable;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.RollCycle;
import net.openhft.chronicle.queue.TailerDirection;
import net.openhft.chronicle.queue.impl.single.QueueLock;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.text.ParseException;

public interface RollingChronicleQueue extends ChronicleQueue, StoreReleasable {
public interface RollingChronicleQueue extends ChronicleQueue {

long epoch();

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package net.openhft.chronicle.queue.impl;

import net.openhft.chronicle.core.io.Closeable;
import net.openhft.chronicle.core.values.LongValue;
import net.openhft.chronicle.queue.impl.table.Metadata;

import java.util.function.Function;

public interface TableStore<T extends Metadata> extends CommonStore {
public interface TableStore<T extends Metadata> extends CommonStore, Closeable {

/**
* Acquire {@link LongValue} mapped to underlying file, providing atomic operations on the value that is shared
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
*/
package net.openhft.chronicle.queue.impl;

import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueStore;
import net.openhft.chronicle.wire.Wire;

import java.util.function.BiFunction;

@FunctionalInterface
public interface WireStoreFactory extends BiFunction<RollingChronicleQueue, Wire, WireStore> {
public interface WireStoreFactory extends BiFunction<RollingChronicleQueue, Wire, SingleChronicleQueueStore> {
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import net.openhft.chronicle.core.io.AbstractCloseable;
import net.openhft.chronicle.queue.RollDetails;
import net.openhft.chronicle.queue.TailerDirection;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueStore;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand All @@ -32,14 +33,14 @@
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

public class WireStorePool extends AbstractCloseable implements StoreReleasable {
public class WireStorePool extends AbstractCloseable {
// must be power-of-two
private static final int ROLL_CYCLE_CACHE_SIZE = 64;
private static final int INDEX_MASK = ROLL_CYCLE_CACHE_SIZE - 1;
@NotNull
private final WireStoreSupplier supplier;
@NotNull
private final Map<RollDetails, WeakReference<WireStore>> stores;
private final Map<RollDetails, WeakReference<SingleChronicleQueueStore>> stores;
private final StoreFileListener storeFileListener;
// protected by synchronized on acquire()
private final RollDetails[] cache = new RollDetails[ROLL_CYCLE_CACHE_SIZE];
Expand Down Expand Up @@ -68,7 +69,7 @@ protected void performClose() {
}

@Nullable
public synchronized WireStore acquire(final int cycle, final long epoch, boolean createIfAbsent) {
public synchronized SingleChronicleQueueStore acquire(final int cycle, final long epoch, boolean createIfAbsent) {
throwExceptionIfClosed();
final int cacheIndex = cacheIndex(cycle);
RollDetails rollDetails;
Expand All @@ -78,8 +79,8 @@ public synchronized WireStore acquire(final int cycle, final long epoch, boolean
cache[cacheIndex] = rollDetails;
}

WeakReference<WireStore> reference = stores.get(rollDetails);
WireStore store;
WeakReference<SingleChronicleQueueStore> reference = stores.get(rollDetails);
SingleChronicleQueueStore store;
if (reference != null) {
store = reference.get();
if (store != null) {
Expand All @@ -106,16 +107,15 @@ public int nextCycle(final int currentCycle, @NotNull TailerDirection direction)
return supplier.nextCycle(currentCycle, direction);
}

@Override
public synchronized void release(@NotNull CommonStore store) {
public synchronized void release(@NotNull SingleChronicleQueueStore store) {

store.release();

long refCount = store.refCount();
assert refCount >= 0;
if (refCount == 0) {
for (Map.Entry<RollDetails, WeakReference<WireStore>> entry : stores.entrySet()) {
WeakReference<WireStore> ref = entry.getValue();
for (Map.Entry<RollDetails, WeakReference<SingleChronicleQueueStore>> entry : stores.entrySet()) {
WeakReference<SingleChronicleQueueStore> ref = entry.getValue();
if (ref != null && ref.get() == store) {
stores.remove(entry.getKey());
storeFileListener.onReleased(entry.getKey().cycle(), store.file());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@
package net.openhft.chronicle.queue.impl;

import net.openhft.chronicle.queue.TailerDirection;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueStore;
import org.jetbrains.annotations.Nullable;

import java.text.ParseException;
import java.util.NavigableSet;

public interface WireStoreSupplier {
@Nullable
WireStore acquire(int cycle, boolean createIfAbsent);
SingleChronicleQueueStore acquire(int cycle, boolean createIfAbsent);

/**
* the next available cycle, no cycle will be created by this method, typically used by a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.threads.InvalidEventHandlerException;
import net.openhft.chronicle.core.time.TimeProvider;
import net.openhft.chronicle.queue.impl.WireStore;

import java.io.Closeable;
import java.util.function.IntConsumer;
Expand All @@ -32,7 +31,7 @@ public final class Pretoucher implements Closeable {
private final PretoucherState pretoucherState;
private final TimeProvider pretouchTimeProvider;
private int currentCycle = Integer.MIN_VALUE;
private WireStore currentCycleWireStore;
private SingleChronicleQueueStore currentCycleWireStore;
private MappedBytes currentCycleMappedBytes;

public Pretoucher(final SingleChronicleQueue queue) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public class SingleChronicleQueue extends AbstractCloseable implements RollingCh
@NotNull
private final TimeProvider time;
@NotNull
private final BiFunction<RollingChronicleQueue, Wire, WireStore> storeFactory;
private final BiFunction<RollingChronicleQueue, Wire, SingleChronicleQueueStore> storeFactory;
private final Map<Object, Consumer> closers = new WeakHashMap<>();
private final boolean readOnly;
@NotNull
Expand Down Expand Up @@ -279,7 +279,7 @@ public String fileAbsolutePath() {
@Override
public String dumpLastHeader() {
StringBuilder sb = new StringBuilder(256);
WireStore wireStore = storeForCycle(lastCycle(), epoch, false);
SingleChronicleQueueStore wireStore = storeForCycle(lastCycle(), epoch, false);
if (wireStore != null) {
try {
sb.append(wireStore.dumpHeader());
Expand All @@ -295,7 +295,7 @@ public String dumpLastHeader() {
public String dump() {
StringBuilder sb = new StringBuilder(1024);
for (int i = firstCycle(), max = lastCycle(); i <= max; i++) {
CommonStore commonStore = storeForCycle(i, epoch, false);
SingleChronicleQueueStore commonStore = storeForCycle(i, epoch, false);
if (commonStore != null) {
try {
// sb.append("# ").append(wireStore.bytes().mappedFile().file()).append("\n");
Expand Down Expand Up @@ -491,7 +491,7 @@ public ExcerptTailer createTailer() {

@Nullable
@Override
public final WireStore storeForCycle(int cycle, final long epoch, boolean createIfAbsent) {
public final SingleChronicleQueueStore storeForCycle(int cycle, final long epoch, boolean createIfAbsent) {
return this.pool.acquire(cycle, epoch, createIfAbsent);
}

Expand Down Expand Up @@ -626,8 +626,7 @@ protected void performClose() {
closeQuietly(metaStore);
}

@Override
public final void release(@Nullable CommonStore store) {
public final void release(@Nullable SingleChronicleQueueStore store) {
if (store != null)
this.pool.release(store);
}
Expand Down Expand Up @@ -791,7 +790,7 @@ void cleanupStoreFilesWithNoData() {
int cycle = cycle();
int lastCycle = lastCycle();
while (lastCycle < cycle && lastCycle >= 0) {
final WireStore store = storeSupplier.acquire(lastCycle, false);
final SingleChronicleQueueStore store = storeSupplier.acquire(lastCycle, false);
if (store == null)
return;
try {
Expand Down Expand Up @@ -834,7 +833,7 @@ private class StoreSupplier extends AbstractCloseable implements WireStoreSuppli

@SuppressWarnings("resource")
@Override
public WireStore acquire(int cycle, boolean createIfAbsent) {
public SingleChronicleQueueStore acquire(int cycle, boolean createIfAbsent) {

SingleChronicleQueue that = SingleChronicleQueue.this;
@NotNull final RollingResourcesCache.Resource dateValue = that
Expand Down Expand Up @@ -876,7 +875,7 @@ public WireStore acquire(int cycle, boolean createIfAbsent) {
wire.pauser(pauserSupplier.get());
wire.headerNumber(rollCycle.toIndex(cycle, 0) - 1);

WireStore wireStore;
SingleChronicleQueueStore wireStore;
try {
if (!readOnly && createIfAbsent && wire.writeFirstHeader()) {
wireStore = storeFactory.apply(that, wire);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import net.openhft.chronicle.core.values.LongValue;
import net.openhft.chronicle.queue.*;
import net.openhft.chronicle.queue.batch.BatchAppender;
import net.openhft.chronicle.queue.impl.*;
import net.openhft.chronicle.queue.impl.ExcerptContext;
import net.openhft.chronicle.queue.impl.WireStore;
import net.openhft.chronicle.queue.impl.WireStorePool;
import net.openhft.chronicle.wire.*;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -92,11 +94,11 @@ static class StoreAppender extends AbstractCloseable implements ExcerptAppender,
private final WriteLock writeLock;
@NotNull
private final StoreAppenderContext context;
private final ClosableResources<?> closableResources;
private final ClosableResources closableResources;
private final WireStorePool storePool;
private final boolean checkInterrupts;
@Nullable
WireStore store;
SingleChronicleQueueStore store;
private int cycle = Integer.MIN_VALUE;
@Nullable
private Wire wire;
Expand All @@ -118,7 +120,7 @@ static class StoreAppender extends AbstractCloseable implements ExcerptAppender,
this.checkInterrupts = checkInterrupts;
this.writeLock = queue.writeLock();
this.context = new StoreAppenderContext();
this.closableResources = new ClosableResources<>(storePool);
this.closableResources = new ClosableResources(queue);

// always put references to "this" last.
queue.addCloseListener(this, StoreAppender::close);
Expand Down Expand Up @@ -276,7 +278,7 @@ private void setCycle2(final int cycle, final boolean createIfAbsent) {

SingleChronicleQueue queue = this.queue;

WireStore store = this.store;
SingleChronicleQueueStore store = this.store;

this.store = storePool.acquire(cycle, queue.epoch(), createIfAbsent);

Expand Down Expand Up @@ -871,15 +873,15 @@ public boolean isNotComplete() {
}
}

private static final class ClosableResources<T extends StoreReleasable> {
private static final class ClosableResources {
@NotNull
private final T storeReleasable;
private final SingleChronicleQueue storeReleasable;
private final AtomicBoolean released = new AtomicBoolean();
private volatile Bytes wireReference = null;
private volatile Bytes wireForIndexReference = null;
private volatile CommonStore storeReference = null;
private volatile SingleChronicleQueueStore storeReference = null;

private ClosableResources(@NotNull final T storeReleasable) {
private ClosableResources(@NotNull final SingleChronicleQueue storeReleasable) {
this.storeReleasable = storeReleasable;
}

Expand All @@ -896,7 +898,7 @@ private void releaseResources() {
releaseIfNotNull(wireReference);

// Object is no longer reachable, check that it has not already been released
CommonStore storeReference = this.storeReference;
SingleChronicleQueueStore storeReference = this.storeReference;

if (storeReference != null) {
storeReleasable.release(storeReference);
Expand All @@ -914,11 +916,11 @@ public static class StoreTailer implements ExcerptTailer, SourceContext, Excerpt
private final SingleChronicleQueue queue;
private final LongValue indexValue;
private final StoreTailerContext context = new StoreTailerContext();
private final ClosableResources<?> closableResources;
private final ClosableResources closableResources;
private final MoveToState moveToState = new MoveToState();
long index; // index of the next read.
@Nullable
WireStore store;
SingleChronicleQueueStore store;
private int cycle;
private TailerDirection direction = TailerDirection.FORWARD;
private Wire wireForIndex;
Expand All @@ -940,7 +942,7 @@ public StoreTailer(@NotNull final SingleChronicleQueue queue, final LongValue in
this.setCycle(Integer.MIN_VALUE);
this.index = 0;
queue.addCloseListener(this, StoreTailer::close);
closableResources = new ClosableResources<>(queue);
closableResources = new ClosableResources(queue);

if (indexValue == null) {
toStart();
Expand Down Expand Up @@ -1559,7 +1561,7 @@ private long approximateLastIndex() {
if (lastCycle == Integer.MIN_VALUE)
return Long.MIN_VALUE;

final WireStore wireStore = queue.storeForCycle(lastCycle, queue.epoch(), false);
final SingleChronicleQueueStore wireStore = queue.storeForCycle(lastCycle, queue.epoch(), false);
this.setCycle(lastCycle);
if (wireStore == null)
throw new IllegalStateException("Store not found for cycle " + Long.toHexString(lastCycle) + ". Probably the files were removed?");
Expand Down Expand Up @@ -1670,7 +1672,7 @@ private ExcerptTailer optimizedToEnd() {
return this;
}

final WireStore wireStore = queue.storeForCycle(lastCycle, queue.epoch(), false);
final SingleChronicleQueueStore wireStore = queue.storeForCycle(lastCycle, queue.epoch(), false);
this.setCycle(lastCycle);
if (wireStore == null)
throw new IllegalStateException("Store not found for cycle " + Long.toHexString(lastCycle) + ". Probably the files were removed? lastCycle=" + lastCycle);
Expand Down Expand Up @@ -1868,7 +1870,7 @@ private boolean cycle(final int cycle) {
if (this.cycle == cycle && state == FOUND_CYCLE)
return true;

final WireStore nextStore = queue.storeForCycle(cycle, queue.epoch(), false);
final SingleChronicleQueueStore nextStore = queue.storeForCycle(cycle, queue.epoch(), false);

if (nextStore == null && store == null)
return false;
Expand Down

0 comments on commit 616eac2

Please sign in to comment.