Skip to content

Commit

Permalink
Add readMap/writeMap and ExcerptHistory support.
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-lawrey committed Mar 27, 2016
1 parent 8723b61 commit f82da9a
Show file tree
Hide file tree
Showing 9 changed files with 388 additions and 7 deletions.
14 changes: 14 additions & 0 deletions src/main/java/net/openhft/chronicle/queue/ExcerptAppender.java
Expand Up @@ -24,6 +24,7 @@

import java.io.StreamCorruptedException;
import java.lang.reflect.Proxy;
import java.util.Map;

/**
* The component that facilitates sequentially writing data to a {@link ChronicleQueue}.
Expand Down Expand Up @@ -79,11 +80,24 @@ default void writeBytes(long index, Bytes<?> bytes) throws StreamCorruptedExcept
*/
int cycle();

/**
* Proxy an interface so each message called is written to a file for replay.
*
* @param tClass primary interface
* @param additional any additional interfaces
* @return a proxy which implements the primary interface (additional interfaces have to be cast)
*/
default <T> T methodWriter(Class<T> tClass, Class... additional) {
Class[] interfaces = ObjectUtils.addAll(tClass, additional);

//noinspection unchecked
return (T) Proxy.newProxyInstance(tClass.getClassLoader(), interfaces, new MethodWriterInvocationHandler(this));
}

/**
* Write a Map as a marshallable
*/
default void writeMap(Map<String, Object> map) {
QueueInternal.writeMap(this, map);
}
}
57 changes: 57 additions & 0 deletions src/main/java/net/openhft/chronicle/queue/ExcerptHistory.java
@@ -0,0 +1,57 @@
/*
*
* * Copyright (C) 2016 higherfrequencytrading.com
* *
* * This program is free software: you can redistribute it and/or modify
* * it under the terms of the GNU Lesser General Public License as published by
* * the Free Software Foundation, either version 3 of the License.
* *
* * This program is distributed in the hope that it will be useful,
* * but WITHOUT ANY WARRANTY; without even the implied warranty of
* * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* * GNU Lesser General Public License for more details.
* *
* * You should have received a copy of the GNU Lesser General Public License
* * along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

package net.openhft.chronicle.queue;


import net.openhft.chronicle.wire.Marshallable;

/**
* Created by peter on 27/03/16.
*/
public interface ExcerptHistory extends Marshallable {
/**
* Get the ExcerptHistory to update it or read it.
*
* @return the ExcerptHistory for the current Excerpt.
*/
static ExcerptHistory get() {
return VanillaExcerptHistory.getThreadLocal();
}

/**
* You only need to call this if you wish to override it's behaviour.
*
* @param md to change to the default implementation for this thread.
*/
static void set(ExcerptHistory md) {
VanillaExcerptHistory.setThreadLocal(md);
}

int timings();

long timing(int n);

int sources();

int sourceId(int n);

long sourceIndex(int n);

void reset();
}
14 changes: 12 additions & 2 deletions src/main/java/net/openhft/chronicle/queue/ExcerptTailer.java
Expand Up @@ -21,6 +21,7 @@
import net.openhft.chronicle.wire.ReadMarshallable;
import org.jetbrains.annotations.NotNull;

import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
Expand Down Expand Up @@ -128,10 +129,19 @@ public interface ExcerptTailer extends ExcerptCommon {
/**
* Reads messages from this tails as methods. It returns a BooleanSupplier which returns
*
* @param objects
* @return
* @param objects which implement the methods serialized to the file.
* @return a reader which will read one Excerpt at a time
*/
default MethodReader methodReader(Object... objects) {
return new MethodReader(this, objects);
}

/**
* Read a Map&gt;String, Object&gt; from the content.
*
* @return the Map, or null if no message is waiting.
*/
default Map<String, Object> readMap() {
return QueueInternal.readMap(this);
}
}
8 changes: 8 additions & 0 deletions src/main/java/net/openhft/chronicle/queue/MethodReader.java
Expand Up @@ -70,6 +70,12 @@ public MethodReader(ExcerptTailer tailer, Object... objects) {
});
}
}

if (wireParser.lookup("history") == null) {
wireParser.register(() -> "history", (s, v, $) -> {
v.marshallable(ExcerptHistory.get());
});
}
}

static void logMessage(CharSequence s, ValueIn v) {
Expand All @@ -94,7 +100,9 @@ static void logMessage(CharSequence s, ValueIn v) {
* @return true if there was a message, or false if not.
*/
public boolean readOne() {
ExcerptHistory excerptHistory = ExcerptHistory.get();
for (; ; ) {
excerptHistory.reset();
try (DocumentContext context = tailer.readingDocument()) {
if (context.isMetaData())
continue;
Expand Down
Expand Up @@ -3,6 +3,7 @@
import net.openhft.chronicle.core.util.ObjectUtils;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.ValueOut;
import net.openhft.chronicle.wire.Wire;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
Expand All @@ -27,7 +28,9 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
return method.invoke(this, args);
}
try (DocumentContext context = appender.writingDocument()) {
ValueOut valueOut = context.wire()
Wire wire = context.wire();

ValueOut valueOut = wire
.writeEventName(method.getName());
Class[] parameterTypes = parameterMap.get(method);
if (parameterTypes == null)
Expand Down
56 changes: 56 additions & 0 deletions src/main/java/net/openhft/chronicle/queue/NoExcerptHistory.java
@@ -0,0 +1,56 @@
/*
*
* * Copyright (C) 2016 higherfrequencytrading.com
* *
* * This program is free software: you can redistribute it and/or modify
* * it under the terms of the GNU Lesser General Public License as published by
* * the Free Software Foundation, either version 3 of the License.
* *
* * This program is distributed in the hope that it will be useful,
* * but WITHOUT ANY WARRANTY; without even the implied warranty of
* * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* * GNU Lesser General Public License for more details.
* *
* * You should have received a copy of the GNU Lesser General Public License
* * along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

package net.openhft.chronicle.queue;

/**
* Created by peter on 27/03/16.
*/
public enum NoExcerptHistory implements ExcerptHistory {
INSTANCE;

@Override
public int timings() {
return 0;
}

@Override
public long timing(int n) {
return -1;
}

@Override
public int sources() {
return 0;
}

@Override
public int sourceId(int n) {
return -1;
}

@Override
public long sourceIndex(int n) {
return -1;
}

@Override
public void reset() {

}
}
60 changes: 60 additions & 0 deletions src/main/java/net/openhft/chronicle/queue/QueueInternal.java
@@ -0,0 +1,60 @@
/*
*
* * Copyright (C) 2016 higherfrequencytrading.com
* *
* * This program is free software: you can redistribute it and/or modify
* * it under the terms of the GNU Lesser General Public License as published by
* * the Free Software Foundation, either version 3 of the License.
* *
* * This program is distributed in the hope that it will be useful,
* * but WITHOUT ANY WARRANTY; without even the implied warranty of
* * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* * GNU Lesser General Public License for more details.
* *
* * You should have received a copy of the GNU Lesser General Public License
* * along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

package net.openhft.chronicle.queue;

import net.openhft.chronicle.core.pool.StringInterner;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.Wire;
import net.openhft.chronicle.wire.Wires;

import java.util.LinkedHashMap;
import java.util.Map;

/**
* Created by peter on 27/03/16.
*/
enum QueueInternal {
;
static final StringInterner INTERNER = new StringInterner(128);

static Map<String, Object> readMap(ExcerptTailer tailer) {
try (DocumentContext context = tailer.readingDocument()) {
if (!context.isData())
return null;
LinkedHashMap<String, Object> map = new LinkedHashMap<>();
StringBuilder sb = Wires.acquireStringBuilder();
Wire wire = context.wire();
while (wire.hasMore()) {
Object object = wire.readEventName(sb).object();
map.put(INTERNER.intern(sb), object);
}
return map;
}

}

static void writeMap(ExcerptAppender appender, Map<String, Object> map) {
try (DocumentContext context = appender.writingDocument()) {
Wire wire = context.wire();
for (Map.Entry<String, Object> entry : map.entrySet()) {
wire.writeEventName(entry.getKey()).object(entry.getValue());
}
}
}
}

0 comments on commit f82da9a

Please sign in to comment.