Skip to content

Commit

Permalink
Queue #305 Memory and code leak via ThreadLocal values and suppliers.
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-lawrey committed Nov 4, 2016
1 parent 568cd6c commit 1b8a5b4
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 14 deletions.
Expand Up @@ -24,6 +24,7 @@
import net.openhft.chronicle.core.Maths; import net.openhft.chronicle.core.Maths;
import net.openhft.chronicle.core.annotation.UsedViaReflection; import net.openhft.chronicle.core.annotation.UsedViaReflection;
import net.openhft.chronicle.core.io.Closeable; import net.openhft.chronicle.core.io.Closeable;
import net.openhft.chronicle.core.threads.ThreadLocalHelper;
import net.openhft.chronicle.core.values.LongArrayValues; import net.openhft.chronicle.core.values.LongArrayValues;
import net.openhft.chronicle.core.values.LongValue; import net.openhft.chronicle.core.values.LongValue;
import net.openhft.chronicle.queue.impl.ExcerptContext; import net.openhft.chronicle.queue.impl.ExcerptContext;
Expand All @@ -33,11 +34,11 @@


import java.io.EOFException; import java.io.EOFException;
import java.io.StreamCorruptedException; import java.io.StreamCorruptedException;
import java.lang.ref.WeakReference;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.function.Supplier; import java.util.function.Supplier;


import static java.lang.ThreadLocal.withInitial;
import static net.openhft.chronicle.wire.Wires.NOT_INITIALIZED; import static net.openhft.chronicle.wire.Wires.NOT_INITIALIZED;


/** /**
Expand All @@ -48,8 +49,9 @@ class SCQIndexing implements Demarshallable, WriteMarshallable, Closeable {
private final int indexSpacing, indexSpacingBits; private final int indexSpacing, indexSpacingBits;
private final LongValue index2Index; private final LongValue index2Index;
private final LongValue nextEntryToBeIndexed; private final LongValue nextEntryToBeIndexed;
private final ThreadLocal<LongArrayValuesHolder> index2indexArray; private final Supplier<LongArrayValues> longArraySupplier;
private final ThreadLocal<LongArrayValuesHolder> indexArray; private final ThreadLocal<WeakReference<LongArrayValuesHolder>> index2indexArray;
private final ThreadLocal<WeakReference<LongArrayValuesHolder>> indexArray;
private final WriteMarshallable index2IndexTemplate; private final WriteMarshallable index2IndexTemplate;
private final WriteMarshallable indexTemplate; private final WriteMarshallable indexTemplate;
LongValue writePosition; LongValue writePosition;
Expand Down Expand Up @@ -79,12 +81,21 @@ public SCQIndexing(int indexCount, int indexSpacing, LongValue index2Index, Long
this.indexSpacingBits = Maths.intLog2(indexSpacing); this.indexSpacingBits = Maths.intLog2(indexSpacing);
this.index2Index = index2Index; this.index2Index = index2Index;
this.nextEntryToBeIndexed = nextEntryToBeIndexed; this.nextEntryToBeIndexed = nextEntryToBeIndexed;
this.index2indexArray = withInitial(() -> new LongArrayValuesHolder(longArraySupplier.get())); this.longArraySupplier = longArraySupplier;
this.indexArray = withInitial(() -> new LongArrayValuesHolder(longArraySupplier.get())); this.index2indexArray = new ThreadLocal<>();
this.indexArray = new ThreadLocal<>();
this.index2IndexTemplate = w -> w.writeEventName(() -> "index2index").int64array(indexCount); this.index2IndexTemplate = w -> w.writeEventName(() -> "index2index").int64array(indexCount);
this.indexTemplate = w -> w.writeEventName(() -> "index").int64array(indexCount); this.indexTemplate = w -> w.writeEventName(() -> "index").int64array(indexCount);
} }


private LongArrayValuesHolder getIndex2IndexArray() {
return ThreadLocalHelper.getTL(index2indexArray, longArraySupplier, las -> new LongArrayValuesHolder(las.get()));
}

private LongArrayValuesHolder getIndexArray() {
return ThreadLocalHelper.getTL(indexArray, longArraySupplier, las -> new LongArrayValuesHolder(las.get()));
}

public long toAddress0(long index) { public long toAddress0(long index) {
long siftedIndex = index >> (indexSpacingBits + indexCountBits); long siftedIndex = index >> (indexSpacingBits + indexCountBits);
long mask = indexCount - 1L; long mask = indexCount - 1L;
Expand Down Expand Up @@ -167,7 +178,7 @@ long acquireIndex2Index0(StoreRecovery recovery, ExcerptContext ec, long timeout


@NotNull @NotNull
private LongArrayValues arrayForAddress(@NotNull Wire wire, long secondaryAddress) { private LongArrayValues arrayForAddress(@NotNull Wire wire, long secondaryAddress) {
LongArrayValuesHolder holder = indexArray.get(); LongArrayValuesHolder holder = getIndexArray();
if (holder.address == secondaryAddress) if (holder.address == secondaryAddress)
return holder.values; return holder.values;
holder.address = secondaryAddress; holder.address = secondaryAddress;
Expand Down Expand Up @@ -495,7 +506,7 @@ long sequenceForPosition(@NotNull StoreRecovery recovery,
LongArrayValues getIndex2index(StoreRecovery recovery, ExcerptContext ec, long timeoutMS) throws LongArrayValues getIndex2index(StoreRecovery recovery, ExcerptContext ec, long timeoutMS) throws
EOFException, UnrecoverableTimeoutException, StreamCorruptedException { EOFException, UnrecoverableTimeoutException, StreamCorruptedException {


LongArrayValuesHolder holder = index2indexArray.get(); LongArrayValuesHolder holder = getIndex2IndexArray();
LongArrayValues values = holder.values; LongArrayValues values = holder.values;
if (((Byteable) values).bytesStore() != null || timeoutMS == 0) if (((Byteable) values).bytesStore() != null || timeoutMS == 0)
return values; return values;
Expand Down
Expand Up @@ -21,6 +21,7 @@
import net.openhft.chronicle.core.Jvm; import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.OS; import net.openhft.chronicle.core.OS;
import net.openhft.chronicle.core.threads.EventLoop; import net.openhft.chronicle.core.threads.EventLoop;
import net.openhft.chronicle.core.threads.ThreadLocalHelper;
import net.openhft.chronicle.core.time.TimeProvider; import net.openhft.chronicle.core.time.TimeProvider;
import net.openhft.chronicle.core.util.StringUtils; import net.openhft.chronicle.core.util.StringUtils;
import net.openhft.chronicle.queue.ExcerptAppender; import net.openhft.chronicle.queue.ExcerptAppender;
Expand All @@ -36,6 +37,7 @@
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import java.io.*; import java.io.*;
import java.lang.ref.WeakReference;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
Expand All @@ -56,13 +58,14 @@ public class SingleChronicleQueue implements RollingChronicleQueue {


public static final String SUFFIX = ".cq4"; public static final String SUFFIX = ".cq4";
private static final Logger LOG = LoggerFactory.getLogger(SingleChronicleQueue.class); private static final Logger LOG = LoggerFactory.getLogger(SingleChronicleQueue.class);
protected final ThreadLocal<ExcerptAppender> excerptAppenderThreadLocal = ThreadLocal.withInitial(this::newAppender); protected final ThreadLocal<WeakReference<ExcerptAppender>> excerptAppenderThreadLocal = new ThreadLocal<>();
protected final int sourceId; protected final int sourceId;
final Supplier<Pauser> pauserSupplier; final Supplier<Pauser> pauserSupplier;
final long timeoutMS; final long timeoutMS;
@NotNull @NotNull
final File path; final File path;
final AtomicBoolean isClosed = new AtomicBoolean(); final AtomicBoolean isClosed = new AtomicBoolean();
private final ThreadLocal<WeakReference<ExcerptContext>> tlTailer = new ThreadLocal<>();
@NotNull @NotNull
private final RollCycle rollCycle; private final RollCycle rollCycle;
@NotNull @NotNull
Expand All @@ -86,11 +89,10 @@ public class SingleChronicleQueue implements RollingChronicleQueue {
private final BiFunction<RollingChronicleQueue, Wire, WireStore> storeFactory; private final BiFunction<RollingChronicleQueue, Wire, WireStore> storeFactory;
private final StoreRecoveryFactory recoverySupplier; private final StoreRecoveryFactory recoverySupplier;
private final Set<Runnable> closers = new CopyOnWriteArraySet<>(); private final Set<Runnable> closers = new CopyOnWriteArraySet<>();
private final ThreadLocal<ExcerptContext> tlTailer; private final boolean readOnly;
long firstAndLastCycleTime = 0; long firstAndLastCycleTime = 0;
int firstCycle = Integer.MAX_VALUE, lastCycle = Integer.MIN_VALUE; int firstCycle = Integer.MAX_VALUE, lastCycle = Integer.MIN_VALUE;
private int deltaCheckpointInterval; private int deltaCheckpointInterval;
private final boolean readOnly;


protected SingleChronicleQueue(@NotNull final SingleChronicleQueueBuilder builder) { protected SingleChronicleQueue(@NotNull final SingleChronicleQueueBuilder builder) {
rollCycle = builder.rollCycle(); rollCycle = builder.rollCycle();
Expand Down Expand Up @@ -126,7 +128,10 @@ protected SingleChronicleQueue(@NotNull final SingleChronicleQueueBuilder builde
sourceId = builder.sourceId(); sourceId = builder.sourceId();
recoverySupplier = builder.recoverySupplier(); recoverySupplier = builder.recoverySupplier();
readOnly = builder.readOnly(); readOnly = builder.readOnly();
tlTailer = ThreadLocal.withInitial(() -> new SingleChronicleQueueExcerpts.StoreTailer(this)); }

ExcerptContext getContext() {
return ThreadLocalHelper.getTL(tlTailer, this, SingleChronicleQueueExcerpts.StoreTailer::new);
} }


@NotNull @NotNull
Expand Down Expand Up @@ -273,7 +278,7 @@ public ExcerptAppender acquireAppender() {
if (readOnly) { if (readOnly) {
throw new IllegalStateException("Can't append to a read-only chronicle"); throw new IllegalStateException("Can't append to a read-only chronicle");
} }
return excerptAppenderThreadLocal.get(); return ThreadLocalHelper.getTL(excerptAppenderThreadLocal, this, SingleChronicleQueue::newAppender);
} }


@NotNull @NotNull
Expand All @@ -296,7 +301,7 @@ public int nextCycle(int cycle, @NotNull TailerDirection direction) throws Parse
private long exceptsPerCycle(long cycle) { private long exceptsPerCycle(long cycle) {
WireStore wireStore = storeForCycle((int) cycle, epoch, false); WireStore wireStore = storeForCycle((int) cycle, epoch, false);
try { try {
return wireStore.sequenceForPosition(tlTailer.get(), wireStore.writePosition(), return wireStore.sequenceForPosition(getContext(), wireStore.writePosition(),
true) + 1; true) + 1;
} catch (Exception e) { } catch (Exception e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
Expand Down
Expand Up @@ -77,7 +77,7 @@ public static Collection<Object[]> data() {
return Arrays.asList(new Object[][]{ return Arrays.asList(new Object[][]{
// {WireType.TEXT}, // {WireType.TEXT},
{WireType.BINARY}, {WireType.BINARY},
{WireType.DELTA_BINARY} // {WireType.DELTA_BINARY}
//{ WireType.FIELDLESS_BINARY } //{ WireType.FIELDLESS_BINARY }
}); });
} }
Expand Down

0 comments on commit 1b8a5b4

Please sign in to comment.