Skip to content

Commit

Permalink
started to added delta wire support to chronicle queue
Browse files Browse the repository at this point in the history
  • Loading branch information
Rob Austin committed Aug 17, 2016
1 parent c8dcf94 commit 4351478
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 20 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -47,7 +47,7 @@
<dependency>
<groupId>net.openhft</groupId>
<artifactId>chronicle-bom</artifactId>
<version>1.13.20</version>
<version>1.13.21-SNAPSHOT</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand Down
Expand Up @@ -25,7 +25,6 @@
import org.jetbrains.annotations.Nullable;

import java.io.File;
import java.util.TimeZone;
import java.util.function.Consumer;

/**
Expand Down Expand Up @@ -107,4 +106,6 @@ static SingleChronicleQueueBuilder singleText(String basePath) {
B storeFileListener(StoreFileListener storeFileListener);

StoreFileListener storeFileListener();


}
Expand Up @@ -85,10 +85,4 @@ default void pretouch() {

boolean padToCacheAlign();

/**
* Write an EOF marker on the current cycle if it is about to roll.
* It would do this any way if a new message wis written, but this doesn't create a new cycle or add a message.
*/
void writeEndOfCycleIfRequired();

}
Expand Up @@ -237,12 +237,7 @@ private void resetPosition() throws UnrecoverableTimeoutException {
return;
}


final long headerNumber = store.sequenceForPosition(this, position, true);
// Thread.yield();
// long headerNumber2 = store.sequenceForPosition(this, position);
// assert headerNumber == headerNumber2;
// System.err.println("==== " + Thread.currentThread().getName()+" pos: "+position+" hdr: "+headerNumber);
wire.headerNumber(queue.rollCycle().toIndex(cycle, headerNumber + 1) - 1);
assert lazyIndexing || wire.headerNumber() != -1 || checkIndex(wire.headerNumber(), position);

Expand Down Expand Up @@ -515,7 +510,17 @@ public SingleChronicleQueue queue() {
return queue;
}

private <T> void append(int length, WireWriter<T> wireWriter, T writer) throws UnrecoverableTimeoutException {
/**
* overwritten in delta wire
*
* @param wire
* @param index
*/
void beforeAppend(Wire wire, long index) {
}

private <T> void append(int length, WireWriter<T> wireWriter, T writer) throws
UnrecoverableTimeoutException {

assert checkAppendingThread();
try {
Expand All @@ -525,6 +530,8 @@ private <T> void append(int length, WireWriter<T> wireWriter, T writer) throws U

try {
position(store.writeHeader(wire, length, timeoutMS()));
assert ((AbstractWire) wire).isInsideHeader();
beforeAppend(wire, wire.headerNumber() + 1);
wireWriter.write(writer, wire);
wire.updateHeader(length, position, false);
lastIndex(wire.headerNumber());
Expand Down Expand Up @@ -555,16 +562,20 @@ private void rollCycleTo(int cycle) throws UnrecoverableTimeoutException {

}

@Override
/**
* Write an EOF marker on the current cycle if it is about to roll. It would do this any way
* if a new message wis written, but this doesn't create a new cycle or add a message.
*/
public void writeEndOfCycleIfRequired() {
if (wire != null && queue.cycle() != cycle)
store.writeEOF(wire, timeoutMS());
}

private <T> void append2(int length, WireWriter<T> wireWriter, T writer) throws UnrecoverableTimeoutException, EOFException, StreamCorruptedException {
<T> void append2(int length, WireWriter<T> wireWriter, T writer) throws
UnrecoverableTimeoutException, EOFException, StreamCorruptedException {
setCycle(Math.max(queue.cycle(), cycle + 1), true);
position(store.writeHeader(wire, length, timeoutMS()));

beforeAppend(wire, wire.headerNumber() + 1);
wireWriter.write(writer, wire);
wire.updateHeader(length, position, false);
}
Expand Down
@@ -1,9 +1,9 @@
package net.openhft.chronicle.queue;
package net.openhft.chronicle.queue.impl.single;

import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.time.SetTimeProvider;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.impl.RollingChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.threads.NamedThreadFactory;
import net.openhft.chronicle.wire.DocumentContext;
import org.junit.After;
Expand Down Expand Up @@ -67,7 +67,8 @@ public void test() throws ExecutionException, InterruptedException {
});

timeProvider.currentTimeMillis(2000);
wqueue.acquireAppender().writeEndOfCycleIfRequired();
((SingleChronicleQueueExcerpts.StoreAppender) wqueue.acquireAppender())
.writeEndOfCycleIfRequired();
Jvm.pause(200);
wqueue.acquireAppender().writeText("hello world");
f.get();
Expand Down

0 comments on commit 4351478

Please sign in to comment.