Skip to content

Commit

Permalink
#315 Use a weak collection to hold resources which need closing to av…
Browse files Browse the repository at this point in the history
…oid a memory leak.
  • Loading branch information
peter-lawrey committed Nov 29, 2016
1 parent 8a5cf6a commit 41d9287
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 9 deletions.
Expand Up @@ -43,7 +43,6 @@
import java.nio.file.Path; import java.nio.file.Path;
import java.text.ParseException; import java.text.ParseException;
import java.util.*; import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -88,7 +87,7 @@ public class SingleChronicleQueue implements RollingChronicleQueue {
@NotNull @NotNull
private final BiFunction<RollingChronicleQueue, Wire, WireStore> storeFactory; private final BiFunction<RollingChronicleQueue, Wire, WireStore> storeFactory;
private final StoreRecoveryFactory recoverySupplier; private final StoreRecoveryFactory recoverySupplier;
private final Set<Runnable> closers = new CopyOnWriteArraySet<>(); private final Map<Object, Consumer> closers = new WeakHashMap<>();
private final boolean readOnly; private final boolean readOnly;
long firstAndLastCycleTime = 0; long firstAndLastCycleTime = 0;
int firstCycle = Integer.MAX_VALUE, lastCycle = Integer.MIN_VALUE; int firstCycle = Integer.MAX_VALUE, lastCycle = Integer.MIN_VALUE;
Expand Down Expand Up @@ -401,8 +400,10 @@ public long countExcerpts(long lowerIndex, long upperIndex) throws IllegalStateE
return result; return result;
} }


public void addCloseListener(Runnable closer) { public <T> void addCloseListener(T key, Consumer<T> closer) {
closers.add(closer); synchronized (closers) {
closers.put(key, closer);
}
} }


@Override @Override
Expand All @@ -414,7 +415,10 @@ public boolean isClosed() {
public void close() { public void close() {
if (isClosed.getAndSet(true)) if (isClosed.getAndSet(true))
return; return;
closers.forEach(Runnable::run); synchronized (closers) {
closers.forEach((k, v) -> v.accept(k));
closers.clear();
}
this.pool.close(); this.pool.close();
} }


Expand Down
Expand Up @@ -71,7 +71,6 @@ public interface InternalAppender {
* // StoreAppender * // StoreAppender
*/ */
static class StoreAppender implements ExcerptAppender, ExcerptContext, InternalAppender { static class StoreAppender implements ExcerptAppender, ExcerptContext, InternalAppender {
static final int HEAD_ROOM = 1 << 20;
@NotNull @NotNull
private final SingleChronicleQueue queue; private final SingleChronicleQueue queue;
private final StoreAppenderContext context; private final StoreAppenderContext context;
Expand All @@ -91,7 +90,7 @@ static class StoreAppender implements ExcerptAppender, ExcerptContext, InternalA


StoreAppender(@NotNull SingleChronicleQueue queue) { StoreAppender(@NotNull SingleChronicleQueue queue) {
this.queue = queue; this.queue = queue;
queue.addCloseListener(StoreAppender.this::close); queue.addCloseListener(this, StoreAppender::close);
context = new StoreAppenderContext(); context = new StoreAppenderContext();
} }


Expand Down Expand Up @@ -132,7 +131,7 @@ public void writeText(CharSequence text) throws UnrecoverableTimeoutException {
} }
} }


private void close() { void close() {
Wire w0 = wireForIndex; Wire w0 = wireForIndex;
wireForIndex = null; wireForIndex = null;
if (w0 != null) if (w0 != null)
Expand Down Expand Up @@ -364,6 +363,7 @@ public DocumentContext writingDocument(long index) {
assert checkAppendingThread(); assert checkAppendingThread();
context.wire = acquireBufferWire(); context.wire = acquireBufferWire();
context.wire.headerNumber(index); context.wire.headerNumber(index);
context.isClosed = false;
return context; return context;
} }


Expand Down Expand Up @@ -789,7 +789,7 @@ public StoreTailer(@NotNull final SingleChronicleQueue queue) {
this.queue = queue; this.queue = queue;
this.setCycle(Integer.MIN_VALUE); this.setCycle(Integer.MIN_VALUE);
this.index = 0; this.index = 0;
queue.addCloseListener(StoreTailer.this::close); queue.addCloseListener(this, StoreTailer::close);
indexSpacingMask = queue.rollCycle().defaultIndexSpacing() - 1; indexSpacingMask = queue.rollCycle().defaultIndexSpacing() - 1;
toStart(); toStart();
} }
Expand Down

0 comments on commit 41d9287

Please sign in to comment.