Skip to content

Commit

Permalink
Add pretoucher, separate from appender to maintain position in store …
Browse files Browse the repository at this point in the history
…file. Addresses behaviour in #326
  • Loading branch information
epickrram committed Jul 21, 2017
1 parent 96ec441 commit 2f77273
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 110 deletions.
12 changes: 7 additions & 5 deletions src/main/java/net/openhft/chronicle/queue/PretouchHandler.java
Expand Up @@ -2,18 +2,20 @@

import net.openhft.chronicle.core.threads.EventHandler;
import net.openhft.chronicle.core.threads.HandlerPriority;
import net.openhft.chronicle.queue.impl.single.Pretoucher;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import org.jetbrains.annotations.NotNull;

public final class PretouchHandler implements EventHandler {
private ChronicleQueue queue;
private final Pretoucher pretoucher;

public PretouchHandler(ChronicleQueue queue) {
this.queue = queue;
public PretouchHandler(final SingleChronicleQueue queue) {
this.pretoucher = new Pretoucher(queue);
}

@Override
public boolean action() {
queue.acquireAppender().pretouch();
pretoucher.execute();
return false;
}

Expand All @@ -22,4 +24,4 @@ public boolean action() {
public HandlerPriority priority() {
return HandlerPriority.MONITOR;
}
}
}
@@ -0,0 +1,79 @@
package net.openhft.chronicle.queue.impl.single;

import net.openhft.chronicle.bytes.MappedBytes;
import net.openhft.chronicle.bytes.NewChunkListener;
import net.openhft.chronicle.queue.impl.WireStore;

import java.util.function.IntConsumer;

/**
* A class designed to be called from a long-lived thread.
*
* Upon invocation of the {@code execute()} method, this object will pre-touch pages in the supplied queue's underlying store file,
* attempting to keep ahead of any appenders to the queue.
*
* Resources held by this object will be released when the underlying queue is closed.
*
* Alternatively, the {@code shutdown()} method can be called to close the supplied queue and release any other resources.
* Invocation of the {@code execute()} method after {@code shutdown()} has been called with cause an {@code IllegalStateException} to be thrown.
*/
public final class Pretoucher {
private final SingleChronicleQueue queue;
private final NewChunkListener chunkListener;
private final IntConsumer cycleChangedListener;
private final PretoucherState pretoucherState;
private int currentCycle = Integer.MIN_VALUE;
private WireStore currentCycleWireStore;
private MappedBytes currentCycleMappedBytes;

public Pretoucher(final SingleChronicleQueue queue) {
this(queue, null, c -> {});
}

// visible for testing
Pretoucher(final SingleChronicleQueue queue, final NewChunkListener chunkListener,
final IntConsumer cycleChangedListener) {
this.queue = queue;
this.chunkListener = chunkListener;
this.cycleChangedListener = cycleChangedListener;
queue.addCloseListener(this, Pretoucher::releaseResources);
pretoucherState = new PretoucherState(this::getStoreWritePosition);
}

public void execute() {
assignCurrentCycle();
pretoucherState.pretouch(currentCycleMappedBytes);
}

public void shutdown() {
queue.close();
}

private void assignCurrentCycle() {
if (queue.cycle() != currentCycle) {
releaseResources();

currentCycleWireStore = queue.storeForCycle(queue.cycle(), queue.epoch(), true);
currentCycleMappedBytes = currentCycleWireStore.bytes();
currentCycle = queue.cycle();
if (chunkListener != null) {
currentCycleMappedBytes.setNewChunkListener(chunkListener);
}

cycleChangedListener.accept(queue.cycle());
}
}

private long getStoreWritePosition() {
return currentCycleWireStore.writePosition();
}

private void releaseResources() {
if (currentCycleWireStore != null) {
queue.release(currentCycleWireStore);
}
if (currentCycleMappedBytes != null) {
currentCycleMappedBytes.close();
}
}
}

This file was deleted.

@@ -0,0 +1,73 @@
package net.openhft.chronicle.queue.impl.single;

import net.openhft.chronicle.bytes.NewChunkListener;
import net.openhft.chronicle.core.time.TimeProvider;
import net.openhft.chronicle.queue.DirectoryUtils;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.WireType;
import org.junit.After;
import org.junit.Test;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static java.util.stream.IntStream.range;
import static net.openhft.chronicle.queue.DirectoryUtils.tempDir;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;

public class PretoucherTest {
private final File path = tempDir(PretoucherTest.class.getSimpleName());
private final AtomicLong clock = new AtomicLong(System.currentTimeMillis());
private final List<Integer> capturedCycles = new ArrayList<>();
private final CapturingChunkListener chunkListener = new CapturingChunkListener();

@Test
public void shouldHandleCycleRoll() throws Exception {
try (final SingleChronicleQueue queue = createQueue(path, clock::get)) {
final Pretoucher pretoucher = new Pretoucher(queue, chunkListener, capturedCycles::add);

range(0, 10).forEach(i -> {
try (final DocumentContext ctx = queue.acquireAppender().writingDocument()) {
ctx.wire().write().int32(i);
pretoucher.execute();
ctx.wire().write().bytes(new byte[1024]);
}
pretoucher.execute();
clock.addAndGet(TimeUnit.SECONDS.toMillis(5L));
});

assertThat(capturedCycles.size(), is(10));
assertThat(chunkListener.chunkMap.isEmpty(), is(false));
}
}

@After
public void deleteDir() throws Exception {
DirectoryUtils.deleteDir(path);
}

private static final class CapturingChunkListener implements NewChunkListener {
private final TreeMap<String, List<Integer>> chunkMap = new TreeMap<>();

@Override
public void onNewChunk(final String filename, final int chunk, final long delayMicros) {
chunkMap.computeIfAbsent(filename, f -> new ArrayList<>()).add(chunk);
}
}

private static SingleChronicleQueue createQueue(final File path, final TimeProvider timeProvider) {
return SingleChronicleQueueBuilder.
binary(path).
timeProvider(timeProvider).
rollCycle(RollCycles.TEST_SECONDLY).
testBlockSize().
wireType(WireType.BINARY).
build();
}
}

0 comments on commit 2f77273

Please sign in to comment.