Skip to content

Commit

Permalink
Refactoring after a review of the code.
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-lawrey committed Apr 17, 2015
1 parent b2eaf2c commit e128cb1
Show file tree
Hide file tree
Showing 13 changed files with 277 additions and 440 deletions.
Expand Up @@ -25,13 +25,10 @@
import net.openhft.chronicle.queue.Excerpt; import net.openhft.chronicle.queue.Excerpt;
import net.openhft.chronicle.queue.ExcerptAppender; import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer; import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.wire.TextWire;
import net.openhft.chronicle.wire.ValueOut;
import net.openhft.chronicle.wire.WireKey; import net.openhft.chronicle.wire.WireKey;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;


import java.io.IOException; import java.io.IOException;
import java.util.function.Consumer;


/** /**
* Created by Rob Austin * Created by Rob Austin
Expand Down Expand Up @@ -68,7 +65,8 @@ public ExcerptTailer createTailer() throws IOException {
@NotNull @NotNull
@Override @Override
public ExcerptAppender createAppender() throws IOException { public ExcerptAppender createAppender() throws IOException {
return new ClientWiredExcerptAppenderStateless(this, hub, TextWire::new); throw new UnsupportedOperationException();
// return new ClientWiredExcerptAppenderStateless(this, hub, TextWire::new);
} }


@Override @Override
Expand All @@ -86,16 +84,15 @@ public long firstAvailableIndex() {
throw new UnsupportedOperationException("todo"); throw new UnsupportedOperationException("todo");
} }


public long lastWrittenIndex() {
return proxyReturnLong(EventId.lastWrittenIndex);
}


@Override @Override
public void close() throws IOException { public void close() throws IOException {
// todo add ref count // todo add ref count
} }


public long lastWrittenIndex() {
return proxyReturnLong(EventId.lastWrittenIndex);
}

enum EventId implements ParameterizeWireKey { enum EventId implements ParameterizeWireKey {
lastWrittenIndex, lastWrittenIndex,
createAppender, createAppender,
Expand Down
Expand Up @@ -29,7 +29,6 @@
import net.openhft.chronicle.queue.ChronicleQueue; import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ChronicleQueueBuilder; import net.openhft.chronicle.queue.ChronicleQueueBuilder;
import net.openhft.chronicle.queue.ExcerptAppender; import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.impl.ChronicleWire;
import net.openhft.chronicle.wire.*; import net.openhft.chronicle.wire.*;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
Expand All @@ -48,9 +47,14 @@
*/ */
public class QueueWireHandler implements WireHandler, Consumer<WireHandlers> { public class QueueWireHandler implements WireHandler, Consumer<WireHandlers> {


private static final Logger LOG = LoggerFactory.getLogger(QueueWireHandler.class);
public static final int SIZE_OF_SIZE = ClientWiredStatelessTcpConnectionHub.SIZE_OF_SIZE; public static final int SIZE_OF_SIZE = ClientWiredStatelessTcpConnectionHub.SIZE_OF_SIZE;

private static final Logger LOG = LoggerFactory.getLogger(QueueWireHandler.class);
final StringBuilder cspText = new StringBuilder();
final StringBuilder eventName = new StringBuilder();
// assume there is a handler for each connection.
long tid = -1;
long cid = -1;
ChronicleQueue queue = null;
private WireHandlers publishLater; private WireHandlers publishLater;
private Wire inWire; private Wire inWire;
private Wire outWire; private Wire outWire;
Expand All @@ -60,13 +64,6 @@ public class QueueWireHandler implements WireHandler, Consumer<WireHandlers> {
private AtomicInteger cidCounter = new AtomicInteger(); private AtomicInteger cidCounter = new AtomicInteger();
private Map<ChronicleQueue, ExcerptAppender> queueToAppender = new ConcurrentHashMap<>(); private Map<ChronicleQueue, ExcerptAppender> queueToAppender = new ConcurrentHashMap<>();


// assume there is a handler for each connection.
long tid = -1;
long cid = -1;
final StringBuilder cspText = new StringBuilder();
final StringBuilder eventName = new StringBuilder();
ChronicleQueue queue = null;

public QueueWireHandler() { public QueueWireHandler() {
} }


Expand Down Expand Up @@ -124,10 +121,13 @@ void onEvent() throws IOException {
}); });


outWire.writeDocument(false, wireOut -> { outWire.writeDocument(false, wireOut -> {
throw new UnsupportedOperationException();
/*
QueueAppenderResponse qar = new QueueAppenderResponse(); QueueAppenderResponse qar = new QueueAppenderResponse();
qar.setCid(cid); qar.setCid(cid);
qar.setCsp(cspText); qar.setCsp(cspText);
wireOut.write(reply).typedMarshallable(qar); wireOut.write(reply).typedMarshallable(qar);
*/
}); });
} else if (EventId.submit.contentEquals(eventName)) { } else if (EventId.submit.contentEquals(eventName)) {
ExcerptAppender appender = queueToAppender.get(queue); ExcerptAppender appender = queueToAppender.get(queue);
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Expand Up @@ -49,9 +49,9 @@ public Indexer(@NotNull final AbstractChronicle chronicle) {
newLongArrayValuesPool(Class<? extends Wire> wireType) { newLongArrayValuesPool(Class<? extends Wire> wireType) {


if (TextWire.class.isAssignableFrom(wireType)) if (TextWire.class.isAssignableFrom(wireType))
return withInitial(LongArrayTextReference::new); return withInitial(TextLongArrayReference::new);
if (BinaryWire.class.isAssignableFrom(wireType)) if (BinaryWire.class.isAssignableFrom(wireType))
return withInitial(LongArrayDirectReference::new); return withInitial(BinaryLongArrayReference::new);
else else
throw new IllegalStateException("todo, unsupported type=" + wireType); throw new IllegalStateException("todo, unsupported type=" + wireType);


Expand Down
Expand Up @@ -22,13 +22,13 @@
import net.openhft.chronicle.bytes.NativeBytes; import net.openhft.chronicle.bytes.NativeBytes;
import net.openhft.chronicle.queue.ChronicleQueue; import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender; import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.wire.BinaryWire;
import net.openhft.chronicle.wire.Wire; import net.openhft.chronicle.wire.Wire;
import net.openhft.chronicle.wire.WireOut; import net.openhft.chronicle.wire.WireOut;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;


import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function;


/** /**
* Created by peter.lawrey on 30/01/15. * Created by peter.lawrey on 30/01/15.
Expand All @@ -37,22 +37,20 @@ public class SingleAppender implements ExcerptAppender {


@NotNull @NotNull
private final DirectChronicleQueue chronicle; private final DirectChronicleQueue chronicle;
@Nullable
private final ChronicleWireOut wireOut;
private final Bytes buffer = NativeBytes.nativeBytes(); private final Bytes buffer = NativeBytes.nativeBytes();
private final Wire wire = new BinaryWire(buffer); private final Wire wire;


private long lastWrittenIndex = -1; private long lastWrittenIndex = -1;


public SingleAppender(ChronicleQueue chronicle) { public SingleAppender(ChronicleQueue chronicle, Function<Bytes, Wire> bytesToWire) {
this.chronicle = (DirectChronicleQueue) chronicle; this.chronicle = (DirectChronicleQueue) chronicle;
wireOut = new ChronicleWireOut(wire); wire = bytesToWire.apply(buffer);
} }


@Nullable @Nullable
@Override @Override
public WireOut wire() { public WireOut wire() {
return wireOut; return wire;
} }


@Override @Override
Expand Down

0 comments on commit e128cb1

Please sign in to comment.