Skip to content

Commit

Permalink
Cleanup Read/Write Context
Browse files Browse the repository at this point in the history
  • Loading branch information
lburgazzoli committed Oct 30, 2015
1 parent 9aff28d commit eb69f0c
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 99 deletions.
@@ -0,0 +1,27 @@
/*
*
* Copyright (C) 2015 higherfrequencytrading.com
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package net.openhft.chronicle.queue.impl;

import net.openhft.chronicle.bytes.Bytes;

import java.io.IOException;

@FunctionalInterface
public interface BytesWriter {
long write(Bytes<?> bytes) throws IOException;
}
Expand Up @@ -88,17 +88,17 @@ public ChronicleQueue queue() {
public static class DelegatedAppender extends DefaultAppender<ChronicleQueue> {
private final Bytes<ByteBuffer> buffer;
private final Wire wire;
private final ThrowingAcceptor<Bytes, IOException> consumer;
private final BytesWriter writer;

public DelegatedAppender(
@NotNull ChronicleQueue queue,
@NotNull ThrowingAcceptor<Bytes, IOException> consumer) throws IOException {
@NotNull BytesWriter writer) throws IOException {

super(queue);

this.buffer = elasticByteBuffer();
this.wire = queue.wireType().apply(this.buffer);
this.consumer = consumer;
this.writer = writer;
}

public DelegatedAppender(
Expand All @@ -109,13 +109,16 @@ public DelegatedAppender(

this.buffer = elasticByteBuffer();
this.wire = queue.wireType().apply(this.buffer);
this.consumer = appender::writeBytes;
this.writer = appender::writeBytes;
}

@Override
public long writeDocument(@NotNull WriteMarshallable writer) throws IOException {
this.buffer.clear();
writer.writeMarshallable(this.wire);
writer.writeMarshallable(this.wire);this.buffer.readLimit(this.buffer.writePosition());
this.buffer.readPosition(0);
this.buffer.writePosition(this.buffer.readLimit());
this.buffer.writeLimit(this.buffer.readLimit());

return writeBytes(this.buffer);
}
Expand All @@ -124,20 +127,17 @@ public long writeDocument(@NotNull WriteMarshallable writer) throws IOException
public long writeBytes(@NotNull WriteBytesMarshallable marshallable) throws IOException {
this.buffer.clear();
marshallable.writeMarshallable(this.buffer);
this.buffer.readLimit(this.buffer.writePosition());
this.buffer.readPosition(0);
this.buffer.writePosition(this.buffer.readLimit());
this.buffer.writeLimit(this.buffer.readLimit());

return writeBytes(this.buffer);
}

@Override
public long writeBytes(@NotNull Bytes<?> bytes) throws IOException {
bytes.readLimit(bytes.writePosition());
bytes.readPosition(0);
bytes.writePosition(bytes.readLimit());
bytes.writeLimit(bytes.readLimit());

consumer.accept(bytes);

return WireConstants.NO_INDEX;
return writer.write(bytes);
}
}

Expand Down
Expand Up @@ -23,6 +23,7 @@
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.impl.DelegatedChronicleQueue;
import net.openhft.chronicle.queue.impl.Excerpts;
import net.openhft.chronicle.queue.impl.WireConstants;
import net.openhft.chronicle.queue.impl.ringbuffer.BytesRingBuffer;
import net.openhft.chronicle.threads.EventGroup;
import net.openhft.chronicle.threads.api.InvalidEventHandlerException;
Expand All @@ -47,29 +48,22 @@ public AsyncChronicleQueue(@NotNull ChronicleQueue queue, long capacity) throws
this.store = NativeBytesStore.nativeStoreWithFixedCapacity(capacity);
this.store.zeroOut(0, this.store.writeLimit());
this.buffer = new BytesRingBuffer(this.store);
this.appender = null;
this.storeAppender = super.createAppender();
this.storeAppender = queue.createAppender();
this.eventGroup = new EventGroup(true);
this.eventGroup.addHandler(this::handleEvent);
this.eventGroup.start();

this.appender = null;
}

@NotNull
@Override
public synchronized ExcerptAppender createAppender() throws IOException {
if(appender != null) {
//TODO: better error management
throw new IllegalStateException("Max 1 appender per queue");
}

return this.appender = new Excerpts.DelegatedAppender(this, bytes -> {
try {
this.buffer.offer(bytes);
} catch(InterruptedException e) {
//TODO: what to do ?
LOGGER.warn("", e);
}
});
return this.appender = new Excerpts.DelegatedAppender(this, this::offer);
}

@Override
Expand All @@ -82,9 +76,20 @@ public void close() throws IOException {
//
// *************************************************************************

private long offer(Bytes<?> bytes) throws IOException {
try {
this.buffer.offer(bytes);
} catch(InterruptedException e) {
//TODO: what to do ?
LOGGER.warn("", e);
}

return WireConstants.NO_INDEX;
}

private boolean handleEvent() throws InvalidEventHandlerException {
try {
return buffer.apply(this::appendBytes) > 0;
return buffer.apply(this::append) > 0;
} catch(InterruptedException e) {
//TODO: what to do
LOGGER.warn("", e);
Expand All @@ -93,7 +98,7 @@ private boolean handleEvent() throws InvalidEventHandlerException {
return false;
}

private void appendBytes(Bytes<?> bytes) {
private void append(Bytes<?> bytes) {
try {
storeAppender.writeBytes(bytes);
} catch(IOException e) {
Expand Down

0 comments on commit eb69f0c

Please sign in to comment.