From 1b8a5b46c8b4e7373df20ef4eb70b393f06bed93 Mon Sep 17 00:00:00 2001 From: Peter Lawrey Date: Fri, 4 Nov 2016 14:30:02 +0000 Subject: [PATCH] Queue #305 Memory and code leak via ThreadLocal values and suppliers. --- .../queue/impl/single/SCQIndexing.java | 25 +++++++++++++------ .../impl/single/SingleChronicleQueue.java | 17 ++++++++----- .../impl/single/SingleChronicleQueueTest.java | 2 +- 3 files changed, 30 insertions(+), 14 deletions(-) diff --git a/src/main/java/net/openhft/chronicle/queue/impl/single/SCQIndexing.java b/src/main/java/net/openhft/chronicle/queue/impl/single/SCQIndexing.java index 570cea57b2..95e18d6d77 100755 --- a/src/main/java/net/openhft/chronicle/queue/impl/single/SCQIndexing.java +++ b/src/main/java/net/openhft/chronicle/queue/impl/single/SCQIndexing.java @@ -24,6 +24,7 @@ import net.openhft.chronicle.core.Maths; import net.openhft.chronicle.core.annotation.UsedViaReflection; import net.openhft.chronicle.core.io.Closeable; +import net.openhft.chronicle.core.threads.ThreadLocalHelper; import net.openhft.chronicle.core.values.LongArrayValues; import net.openhft.chronicle.core.values.LongValue; import net.openhft.chronicle.queue.impl.ExcerptContext; @@ -33,11 +34,11 @@ import java.io.EOFException; import java.io.StreamCorruptedException; +import java.lang.ref.WeakReference; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Supplier; -import static java.lang.ThreadLocal.withInitial; import static net.openhft.chronicle.wire.Wires.NOT_INITIALIZED; /** @@ -48,8 +49,9 @@ class SCQIndexing implements Demarshallable, WriteMarshallable, Closeable { private final int indexSpacing, indexSpacingBits; private final LongValue index2Index; private final LongValue nextEntryToBeIndexed; - private final ThreadLocal index2indexArray; - private final ThreadLocal indexArray; + private final Supplier longArraySupplier; + private final ThreadLocal> index2indexArray; + private final ThreadLocal> indexArray; private final WriteMarshallable index2IndexTemplate; private final WriteMarshallable indexTemplate; LongValue writePosition; @@ -79,12 +81,21 @@ public SCQIndexing(int indexCount, int indexSpacing, LongValue index2Index, Long this.indexSpacingBits = Maths.intLog2(indexSpacing); this.index2Index = index2Index; this.nextEntryToBeIndexed = nextEntryToBeIndexed; - this.index2indexArray = withInitial(() -> new LongArrayValuesHolder(longArraySupplier.get())); - this.indexArray = withInitial(() -> new LongArrayValuesHolder(longArraySupplier.get())); + this.longArraySupplier = longArraySupplier; + this.index2indexArray = new ThreadLocal<>(); + this.indexArray = new ThreadLocal<>(); this.index2IndexTemplate = w -> w.writeEventName(() -> "index2index").int64array(indexCount); this.indexTemplate = w -> w.writeEventName(() -> "index").int64array(indexCount); } + private LongArrayValuesHolder getIndex2IndexArray() { + return ThreadLocalHelper.getTL(index2indexArray, longArraySupplier, las -> new LongArrayValuesHolder(las.get())); + } + + private LongArrayValuesHolder getIndexArray() { + return ThreadLocalHelper.getTL(indexArray, longArraySupplier, las -> new LongArrayValuesHolder(las.get())); + } + public long toAddress0(long index) { long siftedIndex = index >> (indexSpacingBits + indexCountBits); long mask = indexCount - 1L; @@ -167,7 +178,7 @@ long acquireIndex2Index0(StoreRecovery recovery, ExcerptContext ec, long timeout @NotNull private LongArrayValues arrayForAddress(@NotNull Wire wire, long secondaryAddress) { - LongArrayValuesHolder holder = indexArray.get(); + LongArrayValuesHolder holder = getIndexArray(); if (holder.address == secondaryAddress) return holder.values; holder.address = secondaryAddress; @@ -495,7 +506,7 @@ long sequenceForPosition(@NotNull StoreRecovery recovery, LongArrayValues getIndex2index(StoreRecovery recovery, ExcerptContext ec, long timeoutMS) throws EOFException, UnrecoverableTimeoutException, StreamCorruptedException { - LongArrayValuesHolder holder = index2indexArray.get(); + LongArrayValuesHolder holder = getIndex2IndexArray(); LongArrayValues values = holder.values; if (((Byteable) values).bytesStore() != null || timeoutMS == 0) return values; diff --git a/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueue.java b/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueue.java index 602fcc9d34..d1a869a01c 100755 --- a/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueue.java +++ b/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueue.java @@ -21,6 +21,7 @@ import net.openhft.chronicle.core.Jvm; import net.openhft.chronicle.core.OS; import net.openhft.chronicle.core.threads.EventLoop; +import net.openhft.chronicle.core.threads.ThreadLocalHelper; import net.openhft.chronicle.core.time.TimeProvider; import net.openhft.chronicle.core.util.StringUtils; import net.openhft.chronicle.queue.ExcerptAppender; @@ -36,6 +37,7 @@ import org.slf4j.LoggerFactory; import java.io.*; +import java.lang.ref.WeakReference; import java.lang.reflect.Method; import java.nio.file.Files; import java.nio.file.Path; @@ -56,13 +58,14 @@ public class SingleChronicleQueue implements RollingChronicleQueue { public static final String SUFFIX = ".cq4"; private static final Logger LOG = LoggerFactory.getLogger(SingleChronicleQueue.class); - protected final ThreadLocal excerptAppenderThreadLocal = ThreadLocal.withInitial(this::newAppender); + protected final ThreadLocal> excerptAppenderThreadLocal = new ThreadLocal<>(); protected final int sourceId; final Supplier pauserSupplier; final long timeoutMS; @NotNull final File path; final AtomicBoolean isClosed = new AtomicBoolean(); + private final ThreadLocal> tlTailer = new ThreadLocal<>(); @NotNull private final RollCycle rollCycle; @NotNull @@ -86,11 +89,10 @@ public class SingleChronicleQueue implements RollingChronicleQueue { private final BiFunction storeFactory; private final StoreRecoveryFactory recoverySupplier; private final Set closers = new CopyOnWriteArraySet<>(); - private final ThreadLocal tlTailer; + private final boolean readOnly; long firstAndLastCycleTime = 0; int firstCycle = Integer.MAX_VALUE, lastCycle = Integer.MIN_VALUE; private int deltaCheckpointInterval; - private final boolean readOnly; protected SingleChronicleQueue(@NotNull final SingleChronicleQueueBuilder builder) { rollCycle = builder.rollCycle(); @@ -126,7 +128,10 @@ protected SingleChronicleQueue(@NotNull final SingleChronicleQueueBuilder builde sourceId = builder.sourceId(); recoverySupplier = builder.recoverySupplier(); readOnly = builder.readOnly(); - tlTailer = ThreadLocal.withInitial(() -> new SingleChronicleQueueExcerpts.StoreTailer(this)); + } + + ExcerptContext getContext() { + return ThreadLocalHelper.getTL(tlTailer, this, SingleChronicleQueueExcerpts.StoreTailer::new); } @NotNull @@ -273,7 +278,7 @@ public ExcerptAppender acquireAppender() { if (readOnly) { throw new IllegalStateException("Can't append to a read-only chronicle"); } - return excerptAppenderThreadLocal.get(); + return ThreadLocalHelper.getTL(excerptAppenderThreadLocal, this, SingleChronicleQueue::newAppender); } @NotNull @@ -296,7 +301,7 @@ public int nextCycle(int cycle, @NotNull TailerDirection direction) throws Parse private long exceptsPerCycle(long cycle) { WireStore wireStore = storeForCycle((int) cycle, epoch, false); try { - return wireStore.sequenceForPosition(tlTailer.get(), wireStore.writePosition(), + return wireStore.sequenceForPosition(getContext(), wireStore.writePosition(), true) + 1; } catch (Exception e) { throw new IllegalStateException(e); diff --git a/src/test/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueTest.java b/src/test/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueTest.java index 5df0f72d06..ee672a3613 100755 --- a/src/test/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueTest.java +++ b/src/test/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueTest.java @@ -77,7 +77,7 @@ public static Collection data() { return Arrays.asList(new Object[][]{ // {WireType.TEXT}, {WireType.BINARY}, - {WireType.DELTA_BINARY} +// {WireType.DELTA_BINARY} //{ WireType.FIELDLESS_BINARY } }); }