Skip to content

Commit

Permalink
Fix a bug in writingDocument where it was failing to roll.
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-lawrey committed Apr 2, 2016
1 parent 88c9966 commit b14c4bc
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 96 deletions.
Expand Up @@ -37,7 +37,7 @@
public enum ChronicleReader { public enum ChronicleReader {
; ;


public static void main(String... args) throws IOException, InterruptedException { public static void main(String... args) throws IOException {
if (args.length < 1) { if (args.length < 1) {
System.err.println("Usage: java " + ChronicleReader.class.getName() + " {chronicle-base-path} {regex} [from-index]"); System.err.println("Usage: java " + ChronicleReader.class.getName() + " {chronicle-base-path} {regex} [from-index]");
System.exit(-1); System.exit(-1);
Expand All @@ -49,7 +49,7 @@ public static void main(String... args) throws IOException, InterruptedException
tailFileFrom(basePath, regex, index, false); tailFileFrom(basePath, regex, index, false);
} }


public static void tailFileFrom(String basePath, String regex, long index, boolean stopAtEnd) throws IOException { public static void tailFileFrom(String basePath, String regex, long index, boolean stopAtEnd) {
ChronicleQueue ic = SingleChronicleQueueBuilder.binary(new File(basePath)).build(); ChronicleQueue ic = SingleChronicleQueueBuilder.binary(new File(basePath)).build();
ExcerptTailer tailer = ic.createTailer(); ExcerptTailer tailer = ic.createTailer();
if (index > 0) { if (index > 0) {
Expand Down
Expand Up @@ -110,7 +110,7 @@ default <T> MethodWriterBuilder<T> methodWriterBuilder(Class<T> tClass) {
/** /**
* Write a Map as a marshallable * Write a Map as a marshallable
*/ */
default void writeMap(Map<String, Object> map) { default void writeMap(Map<String, ?> map) {
QueueInternal.writeMap(this, map); QueueInternal.writeMap(this, map);
} }
} }
4 changes: 2 additions & 2 deletions src/main/java/net/openhft/chronicle/queue/QueueInternal.java
Expand Up @@ -47,10 +47,10 @@ static Map<String, Object> readMap(ExcerptTailer tailer) {


} }


static void writeMap(ExcerptAppender appender, Map<String, Object> map) { static void writeMap(ExcerptAppender appender, Map<String, ?> map) {
try (DocumentContext context = appender.writingDocument()) { try (DocumentContext context = appender.writingDocument()) {
Wire wire = context.wire(); Wire wire = context.wire();
for (Map.Entry<String, Object> entry : map.entrySet()) { for (Map.Entry<String, ?> entry : map.entrySet()) {
wire.writeEventName(entry.getKey()).object(entry.getValue()); wire.writeEventName(entry.getKey()).object(entry.getValue());
} }
} }
Expand Down
Expand Up @@ -128,7 +128,12 @@ public Wire wire() {


@Override @Override
public DocumentContext writingDocument() { public DocumentContext writingDocument() {
assert checkAppendingThread();
try { try {
int cycle = queue.cycle();
if (this.cycle != cycle)
rollCycleTo(cycle);

position = wire.writeHeader(queue.timeoutMS, TimeUnit.MILLISECONDS); position = wire.writeHeader(queue.timeoutMS, TimeUnit.MILLISECONDS);
metaData = false; metaData = false;
} catch (TimeoutException e) { } catch (TimeoutException e) {
Expand Down Expand Up @@ -157,6 +162,8 @@ public void close() {
store.writePosition(position); store.writePosition(position);
} catch (StreamCorruptedException e) { } catch (StreamCorruptedException e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} finally {
assert resetAppendingThread();
} }
} }


Expand Down Expand Up @@ -296,9 +303,13 @@ private <T> void append2(int length, WireWriter<T> wireWriter, T writer) throws


private boolean checkAppendingThread() { private boolean checkAppendingThread() {
Thread appendingThread = this.appendingThread; Thread appendingThread = this.appendingThread;
if (appendingThread != null) Thread currentThread = Thread.currentThread();
throw new IllegalStateException("Attempting to use Appneder in " + Thread.currentThread() + " while used by " + appendingThread); if (appendingThread != null) {
this.appendingThread = Thread.currentThread(); if (appendingThread == currentThread)
throw new IllegalStateException("Nested blocks of writingDocument() not supported");
throw new IllegalStateException("Attempting to use Appender in " + currentThread + " while used by " + appendingThread);
}
this.appendingThread = currentThread;
return true; return true;
} }


Expand Down Expand Up @@ -350,20 +361,12 @@ public String toString() {


@Override @Override
public boolean readDocument(@NotNull final ReadMarshallable marshaller) { public boolean readDocument(@NotNull final ReadMarshallable marshaller) {
try { return read(marshaller, ReadMarshallable::readMarshallable);
return read(marshaller, ReadMarshallable::readMarshallable, queue.timeoutMS);
} catch (TimeoutException e) {
return false;
}
} }


@Override @Override
public boolean readBytes(@NotNull final Bytes using) { public boolean readBytes(@NotNull final Bytes using) {
try { return read(using, (t, w) -> t.write(w.bytes()));
return read(using, (t, w) -> t.write(w.bytes()), queue.timeoutMS);
} catch (TimeoutException e) {
return false;
}
} }


@Override @Override
Expand All @@ -384,12 +387,9 @@ public String readText() {


@Nullable @Nullable
public boolean readText(StringBuilder sb) { public boolean readText(StringBuilder sb) {
try { if (read(sb, (t, w) ->
if (read(sb, (t, w) -> w.bytes().parseUtf8(sb, (int) w.bytes().readRemaining())))
w.bytes().parseUtf8(sb, (int) w.bytes().readRemaining()), queue.timeoutMS)) return true;
return true;
} catch (TimeoutException e) {
}
sb.setLength(0); sb.setLength(0);
sb.append("No message"); sb.append("No message");
return false; return false;
Expand Down Expand Up @@ -454,11 +454,7 @@ private boolean next(boolean includeMetaData) throws TimeoutException {


@Override @Override
public boolean readBytes(@NotNull final ReadBytesMarshallable using) { public boolean readBytes(@NotNull final ReadBytesMarshallable using) {
try { return read(using, (t, w) -> t.readMarshallable(w.bytes()));
return read(using, (t, w) -> t.readMarshallable(w.bytes()), queue.timeoutMS);
} catch (TimeoutException e) {
return false;
}
} }


/** /**
Expand Down Expand Up @@ -555,7 +551,7 @@ public RollingChronicleQueue queue() {
return queue; return queue;
} }


private <T> boolean read(@NotNull final T t, @NotNull final BiConsumer<T, Wire> c, long timeoutMS) throws TimeoutException { private <T> boolean read(@NotNull final T t, @NotNull final BiConsumer<T, Wire> c) {
if (this.store == null) { if (this.store == null) {
toStart(); toStart();
if (this.store == null) return false; if (this.store == null) return false;
Expand Down

0 comments on commit b14c4bc

Please sign in to comment.