Skip to content

Commit

Permalink
Add a reader for Single Queues
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-lawrey committed Feb 10, 2016
1 parent 22192ce commit ac7bef8
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 11 deletions.
70 changes: 70 additions & 0 deletions src/main/java/net/openhft/chronicle/queue/ChronicleReader.java
@@ -0,0 +1,70 @@
/*
* Copyright 2014 Higher Frequency Trading
*
* http://www.higherfrequencytrading.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package net.openhft.chronicle.queue;

import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.wire.DocumentContext;

import java.io.File;
import java.io.IOException;

/**
* Display records in a Chronicle in a text form.
*
* @author peter.lawrey
*/
public enum ChronicleReader {
;

public static void main(String... args) throws IOException, InterruptedException {
if (args.length < 1) {
System.err.println("Usage: java " + ChronicleReader.class.getName() + " {chronicle-base-path} [from-index]");
System.exit(-1);
}

String basePath = args[0];
ChronicleQueue ic = SingleChronicleQueueBuilder.binary(new File(basePath)).build();
ExcerptTailer tailer = ic.createTailer();
if (args.length > 1) {
long index = Long.parseLong(args[1]);
while (!tailer.moveToIndex(index))
Thread.sleep(50);
}

//noinspection InfiniteLoopStatement
while (true) {
try (DocumentContext dc = tailer.readingDocument()) {
if (!dc.isPresent()) {
Thread.sleep(50);
continue;
}
System.out.print(Long.toHexString(dc.index()) + ": ");
Bytes<?> bytes = dc.wire().bytes();
byte b0 = bytes.readByte(0);
if (b0 < 0) {
System.out.println(bytes.toHexString());
} else {
System.out.println(bytes.toString());
}
}
System.out.println();
}
}
}
Expand Up @@ -35,12 +35,15 @@

public class SingleChronicleQueueExcerpts {

private static final Logger LOG = LoggerFactory.getLogger(SingleChronicleQueueExcerpts.class);
private static final String ROLL_STRING = "roll";
private static final int ROLL_KEY = BytesUtil.asInt(ROLL_STRING);
private static final int SPB_HEADER_SIZE = 4;
@FunctionalInterface
public interface BytesConsumer {
boolean accept(Bytes<?> bytes)
throws InterruptedException;
}

@FunctionalInterface
public interface WireWriter<T> {
long writeOrAdvanceIfNotEmpty(
Expand All @@ -50,12 +53,6 @@ long writeOrAdvanceIfNotEmpty(
}


private static final Logger LOG = LoggerFactory.getLogger(SingleChronicleQueueExcerpts.class);
private static final String ROLL_STRING = "roll";
private static final int ROLL_KEY = BytesUtil.asInt(ROLL_STRING);
private static final int SPB_HEADER_SIZE = 4;


// *************************************************************************
//
// APPENDERS
Expand Down Expand Up @@ -278,6 +275,11 @@ public Wire wire() {
return wire;
}

@Override
public long index() {
throw new UnsupportedOperationException("todo");
}

@Override
public void close() {
storeAppender.index++;
Expand All @@ -299,8 +301,13 @@ private static class TailerDocumentContext implements DocumentContext {
this.wire = wire;
}

public void start() {
dc.start();
public void start(boolean next, long index) {
dc.start(next, index);
}

@Override
public long index() {
return dc.index();
}

@Override
Expand Down Expand Up @@ -381,8 +388,7 @@ public boolean readBytes(@NotNull final Bytes using) {

@Override
public DocumentContext readingDocument() {
next();
dc.start();
dc.start(next(), index);
return dc;
}

Expand Down

0 comments on commit ac7bef8

Please sign in to comment.