Skip to content

Commit

Permalink
CHRON-22 Add support for remote client access
Browse files Browse the repository at this point in the history
  • Loading branch information
lburgazzoli committed Jan 15, 2015
1 parent dc799a2 commit 7597d4a
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 95 deletions.
Expand Up @@ -27,6 +27,7 @@
import net.openhft.lang.model.constraints.NotNull; import net.openhft.lang.model.constraints.NotNull;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import sun.nio.ch.IOUtil;


import java.io.IOException; import java.io.IOException;
import java.io.StreamCorruptedException; import java.io.StreamCorruptedException;
Expand Down Expand Up @@ -136,21 +137,46 @@ protected AbstractStatelessExcerp() {
} }


private final class StatelessExcerpAppender extends AbstractStatelessExcerp implements ExcerptAppender { private final class StatelessExcerpAppender extends AbstractStatelessExcerp implements ExcerptAppender {
private ByteBuffer readBuffer; private final ByteBuffer readBuffer;
private final ByteBuffer commandBuffer;
private ByteBuffer writeBuffer; private ByteBuffer writeBuffer;
private long lastIndex; private long lastIndex;
private long actionType; private long actionType;


public StatelessExcerpAppender() { public StatelessExcerpAppender() {
super(); super();


int minSize = ChronicleTcp.nextPower2(builder.minBufferSize()); this.writeBuffer = null;
this.readBuffer = ChronicleTcp.createBufferOfSize(12);
this.commandBuffer = ChronicleTcp.createBufferOfSize(16);
this.startAddr = NO_PAGE;
this.positionAddr = NO_PAGE;
this.capacityAddr = NO_PAGE;
this.limitAddr = NO_PAGE;
this.finished = true;
this.lastIndex = -1;
this.actionType = builder.appendRequireAck() ? ChronicleTcp.ACTION_SUBMIT : ChronicleTcp.ACTION_SUBMIT_NOACK;

resizeWriteBuffer(builder.minBufferSize());
}

protected void resizeWriteBuffer(int size) {
if(writeBuffer != null) {
ChronicleTcp.clean(writeBuffer);
}

if(size > Integer.MAX_VALUE) {
throw new IllegalStateException("Excerpt size can't exceed " + Integer.MAX_VALUE);
}

int maxSize = Math.min(Integer.MAX_VALUE, size);
int roundedSize = ChronicleTcp.nextPower2(maxSize, builder.minBufferSize());


this.writeBuffer = ChronicleTcp.createBufferOfSize(16 + minSize); this.writeBuffer = ChronicleTcp.createBufferOfSize(roundedSize);
this.readBuffer = ChronicleTcp.createBufferOfSize(12); this.startAddr = ChronicleTcp.address(this.writeBuffer);
this.finished = true; this.positionAddr = this.startAddr;
this.lastIndex = -1; this.capacityAddr = this.startAddr + roundedSize;
this.actionType = builder.appendRequireAck() ? ChronicleTcp.ACTION_SUBMIT : ChronicleTcp.ACTION_SUBMIT_NOACK; this.limitAddr = this.startAddr + size;
} }


@Override @Override
Expand All @@ -159,32 +185,20 @@ public void startExcerpt() {
} }


@Override @Override
public void startExcerpt(long capacity) { public void startExcerpt(long excerptSize) {
if(!finished) { if(!finished) {
finish(); finish();
} }


if(capacity <= this.writeBuffer.capacity()) { if(excerptSize <= this.writeBuffer.capacity()) {
this.positionAddr = this.startAddr + 16; this.positionAddr = this.startAddr;
this.limitAddr = this.startAddr + 16 + capacity; this.limitAddr = this.startAddr + excerptSize;
} else { } else {
if(writeBuffer != null) { resizeWriteBuffer((int)excerptSize);
ChronicleTcp.clean(writeBuffer);
}

int minSize = ChronicleTcp.nextPower2(builder.minBufferSize());

this.writeBuffer = ChronicleTcp.createBufferOfSize(16 + minSize);
this.startAddr = ChronicleTcp.address(this.writeBuffer);
this.positionAddr = this.startAddr + 16;
this.capacityAddr = this.startAddr + 16 + minSize;
this.limitAddr = this.startAddr + 16 + capacity;
} }


// move limit and position at the expected size, buffer will be filled writeBuffer.limit((int)excerptSize);
// through NativeBytes methods writeBuffer.position((int)excerptSize);
writeBuffer.limit(16 + (int)capacity);
writeBuffer.position(16 + (int)capacity);


finished = false; finished = false;
} }
Expand All @@ -206,19 +220,15 @@ public void finish() {
openConnection(); openConnection();
} }


writeLong(0, this.actionType);
writeLong(8, position() - 16);

writeBuffer.flip();

try { try {
connection.writeAllOrEOF(writeBuffer); connection.writeAction(commandBuffer, actionType, position());
connection.writeAllOrEOF((ByteBuffer)writeBuffer.flip());


if(builder.appendRequireAck()) { if(builder.appendRequireAck()) {
connection.readUpTo(this.readBuffer, ChronicleTcp.HEADER_SIZE); connection.readUpTo(readBuffer, ChronicleTcp.HEADER_SIZE);


int recType = this.readBuffer.getInt(); int recType = readBuffer.getInt();
long recIndex = this.readBuffer.getLong(); long recIndex = readBuffer.getLong();


if (recType == ChronicleTcp.ACK_LEN) { if (recType == ChronicleTcp.ACK_LEN) {
this.lastIndex = recIndex; this.lastIndex = recIndex;
Expand Down Expand Up @@ -447,15 +457,6 @@ public boolean nextIndex() {
return true; return true;
} }


protected boolean advanceIndex() throws IOException {
if(nextIndex()) {
finish();
return true;
} else {
return false;
}
}

@Override @Override
public long findMatch(@NotNull ExcerptComparator comparator) { public long findMatch(@NotNull ExcerptComparator comparator) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
Expand All @@ -465,5 +466,14 @@ public long findMatch(@NotNull ExcerptComparator comparator) {
public void findRange(@NotNull long[] startEnd, @NotNull ExcerptComparator comparator) { public void findRange(@NotNull long[] startEnd, @NotNull ExcerptComparator comparator) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }

protected boolean advanceIndex() throws IOException {
if(nextIndex()) {
finish();
return true;
} else {
return false;
}
}
} }
} }
Expand Up @@ -95,10 +95,18 @@ public static int nextPower2(int size) {
return Maths.nextPower2(size, size); return Maths.nextPower2(size, size);
} }


public static int nextPower2(int size, int min) {
return Maths.nextPower2(size, min);
}

public static long nextPower2(long size) { public static long nextPower2(long size) {
return Maths.nextPower2(size, size); return Maths.nextPower2(size, size);
} }


public static long nextPower2(long size, long min) {
return Maths.nextPower2(size, min);
}

public static long address(@NotNull ByteBuffer buffer) { public static long address(@NotNull ByteBuffer buffer) {
return ((DirectBuffer) buffer).address(); return ((DirectBuffer) buffer).address();
} }
Expand Down
Expand Up @@ -25,6 +25,10 @@


import java.io.File; import java.io.File;


import static net.openhft.chronicle.ChronicleQueueBuilder.vanilla;
import static net.openhft.chronicle.ChronicleQueueBuilder.remoteAppender;
import static net.openhft.chronicle.ChronicleQueueBuilder.remoteTailer;

import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;


Expand All @@ -40,7 +44,7 @@ public void testVanillaStatelessExceptionOnCreateTailer() throws Exception {


@Test(expected = IllegalStateException.class) @Test(expected = IllegalStateException.class)
public void testVanillaStatelessExceptionOnCreatAppenderTwice() throws Exception { public void testVanillaStatelessExceptionOnCreatAppenderTwice() throws Exception {
Chronicle ch = ChronicleQueueBuilder.remoteAppender() Chronicle ch = remoteAppender()
.connectAddress("localhost", 9876) .connectAddress("localhost", 9876)
.build(); .build();


Expand All @@ -49,20 +53,18 @@ public void testVanillaStatelessExceptionOnCreatAppenderTwice() throws Exception
} }


@Test @Test
public void testVanillaStatelessAppender_001() throws Exception { public void testVanillaStatelessAppender() throws Exception {
final String basePathSource = getVanillaTestPath("-source"); final String basePathSource = getVanillaTestPath("-source");
final PortSupplier portSupplier = new PortSupplier(); final PortSupplier portSupplier = new PortSupplier();


final Chronicle source = ChronicleQueueBuilder.vanilla(basePathSource) final Chronicle source = vanilla(basePathSource)
.source() .source()
.bindAddress(0) .bindAddress(0)
.connectionListener(portSupplier) .connectionListener(portSupplier)
.build(); .build();


final int port = portSupplier.getAndCheckPort(); final Chronicle remoteAppender = remoteAppender()

.connectAddress("localhost", portSupplier.getAndCheckPort())
final Chronicle remoteAppender = ChronicleQueueBuilder.remoteAppender()
.connectAddress("localhost", port)
.appendRequireAck(false) .appendRequireAck(false)
.build(); .build();


Expand Down Expand Up @@ -104,23 +106,23 @@ public void testVanillaStatelessAppender_001() throws Exception {
} }


@Test @Test
public void testVanillaStatelessAppender_002() throws Exception { public void testVanillaStatelessAppenderAndTailer() throws Exception {
final String basePathSource = getVanillaTestPath("-source"); final String basePathSource = getVanillaTestPath("-source");
final PortSupplier portSupplier = new PortSupplier(); final PortSupplier portSupplier = new PortSupplier();


final Chronicle source = ChronicleQueueBuilder.vanilla(basePathSource) final Chronicle source = vanilla(basePathSource)
.source() .source()
.bindAddress(0) .bindAddress(0)
.connectionListener(portSupplier) .connectionListener(portSupplier)
.build(); .build();


final int port = portSupplier.getAndCheckPort(); final int port = portSupplier.getAndCheckPort();


final Chronicle remoteAppender = ChronicleQueueBuilder.remoteAppender() final Chronicle remoteAppender = remoteAppender()
.connectAddress("localhost", port) .connectAddress("localhost", port)
.appendRequireAck(false) .appendRequireAck(false)
.build(); .build();
final Chronicle remoteTailer = ChronicleQueueBuilder.remoteTailer() final Chronicle remoteTailer = remoteTailer()
.connectAddress("localhost", port) .connectAddress("localhost", port)
.build(); .build();


Expand Down Expand Up @@ -161,23 +163,23 @@ public void testVanillaStatelessAppender_002() throws Exception {
} }


@Test @Test
public void testVanillaStatelessAppender_003() throws Exception { public void testVanillaStatelessAppenderAndTailerMT() throws Exception {
final String basePathSource = getVanillaTestPath("-source"); final String basePathSource = getVanillaTestPath("-source");
final PortSupplier portSupplier = new PortSupplier(); final PortSupplier portSupplier = new PortSupplier();


final Chronicle source = ChronicleQueueBuilder.vanilla(basePathSource) final Chronicle source = vanilla(basePathSource)
.source() .source()
.bindAddress(0) .bindAddress(0)
.connectionListener(portSupplier) .connectionListener(portSupplier)
.build(); .build();


final int port = portSupplier.getAndCheckPort(); final int port = portSupplier.getAndCheckPort();


final Chronicle remoteAppender = ChronicleQueueBuilder.remoteAppender() final Chronicle remoteAppender = remoteAppender()
.connectAddress("localhost", port) .connectAddress("localhost", port)
.appendRequireAck(false) .appendRequireAck(false)
.build(); .build();
final Chronicle remoteTailer = ChronicleQueueBuilder.remoteTailer() final Chronicle remoteTailer = remoteTailer()
.connectAddress("localhost", port) .connectAddress("localhost", port)
.build(); .build();


Expand Down Expand Up @@ -249,20 +251,18 @@ public void run() {
} }


@Test @Test
public void testVanillaStatelessAppender_004() throws Exception { public void testVanillaStatelessAppenderIndices() throws Exception {
final String basePathSource = getVanillaTestPath("-source"); final String basePathSource = getVanillaTestPath("-source");
final PortSupplier portSupplier = new PortSupplier(); final PortSupplier portSupplier = new PortSupplier();


final Chronicle source = ChronicleQueueBuilder.vanilla(basePathSource) final Chronicle source = vanilla(basePathSource)
.source() .source()
.bindAddress(0) .bindAddress(0)
.connectionListener(portSupplier) .connectionListener(portSupplier)
.build(); .build();


final int port = portSupplier.getAndCheckPort(); final Chronicle remoteAppender = remoteAppender()

.connectAddress("localhost", portSupplier.getAndCheckPort())
final Chronicle remoteAppender = ChronicleQueueBuilder.remoteAppender()
.connectAddress("localhost", port)
.appendRequireAck(true) .appendRequireAck(true)
.build(); .build();


Expand Down Expand Up @@ -297,4 +297,43 @@ public void testVanillaStatelessAppender_004() throws Exception {
assertFalse(new File(basePathSource).exists()); assertFalse(new File(basePathSource).exists());
} }
} }

@Test( expected = IllegalStateException.class)
public void testVanillaStatelessAppenderExceptionOnDisconnect() throws Exception {
final String basePathSource = getVanillaTestPath("-source");
final PortSupplier portSupplier = new PortSupplier();

final Chronicle source = vanilla(basePathSource)
.source()
.bindAddress(0)
.connectionListener(portSupplier)
.build();

final Chronicle remoteAppender = remoteAppender()
.connectAddress("localhost", portSupplier.getAndCheckPort())
.appendRequireAck(true)
.build();

final int items = 100;
final ExcerptAppender appender = remoteAppender.createAppender();

try {
source.clear();

for (long i = 1; i <= items; i++) {
appender.startExcerpt(16);
appender.writeLong(i);
appender.writeLong(i + 1);
appender.finish();

if(i == 10) {
source.close();
}
}
} finally {
source.clear();

assertFalse(new File(basePathSource).exists());
}
}
} }

0 comments on commit 7597d4a

Please sign in to comment.