Skip to content

Commit

Permalink
CHRON-96 moved net.openhft.chronicle.tcp.WithMappedTest.testReplicati…
Browse files Browse the repository at this point in the history
…onWithMapping into its own test
  • Loading branch information
Rob Austin committed Jan 14, 2015
1 parent dfd79dd commit 0365037
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 86 deletions.
Expand Up @@ -23,6 +23,7 @@
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.SelectableChannel; import java.nio.channels.SelectableChannel;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
Expand All @@ -49,10 +50,21 @@ protected synchronized String getTestName() {
return testName.getMethodName(); return testName.getMethodName();
} }


protected synchronized String getTmpDir() { static String getTmpDir() {
return System.getProperty("java.io.tmpdir"); return System.getProperty("java.io.tmpdir");
} }


synchronized String getVanillaTestPath(String suffix) {
final String path = getTmpDir() + "/" + "chronicle-" + testName
.getMethodName() + suffix;
final File f = new File(path);
if (f.exists()) {
f.delete();
}

return path;
}

// ************************************************************************* // *************************************************************************
// //
// ************************************************************************* // *************************************************************************
Expand Down Expand Up @@ -81,7 +93,7 @@ public void onError(SelectableChannel channel, IOException exception) {
} }


public int port() { public int port() {
if(port.get() == -1) { if (port.get() == -1) {
try { try {
this.latch.await(5, TimeUnit.SECONDS); this.latch.await(5, TimeUnit.SECONDS);
} catch (InterruptedException e) { } catch (InterruptedException e) {
Expand Down
Expand Up @@ -125,91 +125,7 @@ public void run() {
} }




@Test
public void testReplicationWithMapping() throws Exception {
final int RUNS = 100;

final String sourceBasePath = getVanillaTestPath("-source");
final String sinkBasePath = getVanillaTestPath("-sink");

final PortSupplier portSupplier = new PortSupplier();

final Chronicle source = ChronicleQueueBuilder.vanilla(sourceBasePath)
.source()
.bindAddress(0)
.connectionListener(portSupplier)
.build();

final int port = portSupplier.getAndCheckPort();
final Chronicle sink = ChronicleQueueBuilder.vanilla(sinkBasePath)
.sink()
.withMapping(new NoOpMappingFunction()) // this is sent to the source
.connectAddress("localhost", port)
.build();

try {

final Thread at = new Thread("th-appender") {
public void run() {
AffinityLock lock = AffinityLock.acquireLock();
try {
final ExcerptAppender appender = source.createAppender();
for (int i = 0; i < RUNS; i++) {
appender.startExcerpt();
long value = 1000000000 + i;
appender.append(value).append(' ');
appender.finish();
}

appender.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.release();
}
}
};

final Thread tt = new Thread("th-tailer") {
public void run() {
AffinityLock lock = AffinityLock.acquireLock();
try {
final ExcerptTailer tailer = sink.createTailer();
for (int i = 0; i < RUNS; i++) {
long value = 1000000000 + i;
assertTrue(tailer.nextIndex());
long val = tailer.parseLong();


assertEquals("i: " + i, value, val);
assertEquals("i: " + i, 0, tailer.remaining());
tailer.finish();
}

tailer.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.release();
}
}
};

at.start();
tt.start();

at.join();
tt.join();
} finally {
sink.close();
sink.clear();

source.close();
source.clear();

assertFalse(new File(sourceBasePath).exists());
assertFalse(new File(sinkBasePath).exists());
}
}




@Test @Test
Expand Down
111 changes: 111 additions & 0 deletions chronicle/src/test/java/net/openhft/chronicle/tcp/WithMappedTest.java
@@ -0,0 +1,111 @@
package net.openhft.chronicle.tcp;

import net.openhft.affinity.AffinityLock;
import net.openhft.chronicle.Chronicle;
import net.openhft.chronicle.ChronicleQueueBuilder;
import net.openhft.chronicle.ExcerptAppender;
import net.openhft.chronicle.ExcerptTailer;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

import java.io.File;

import static org.junit.Assert.*;

/**
* @author Rob Austin.
*/
public class WithMappedTest extends ChronicleTcpTestBase {

@Rule
public final TestName testName = new TestName();


@Test
public void testReplicationWithMapping() throws Exception {
final int RUNS = 100;

final String sourceBasePath = getVanillaTestPath("-source");
final String sinkBasePath = getVanillaTestPath("-sink");

final ChronicleTcpTestBase.PortSupplier portSupplier = new ChronicleTcpTestBase.PortSupplier();

final Chronicle source = ChronicleQueueBuilder.vanilla(sourceBasePath)
.source()
.bindAddress(0)
.connectionListener(portSupplier)
.build();

final int port = portSupplier.getAndCheckPort();
final Chronicle sink = ChronicleQueueBuilder.vanilla(sinkBasePath)
.sink()
.withMapping(new StatefulVanillaChronicleTest.NoOpMappingFunction()) // this is sent to the source
.connectAddress("localhost", port)
.build();

try {

final Thread at = new Thread("th-appender") {
public void run() {
AffinityLock lock = AffinityLock.acquireLock();
try {
final ExcerptAppender appender = source.createAppender();
for (int i = 0; i < RUNS; i++) {
appender.startExcerpt();
long value = 1000000000 + i;
appender.append(value).append(' ');
appender.finish();
}

appender.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.release();
}
}
};

final Thread tt = new Thread("th-tailer") {
public void run() {
AffinityLock lock = AffinityLock.acquireLock();
try {
final ExcerptTailer tailer = sink.createTailer();
for (int i = 0; i < RUNS; i++) {
long value = 1000000000 + i;
assertTrue(tailer.nextIndex());
long val = tailer.parseLong();

assertEquals("i: " + i, value, val);
assertEquals("i: " + i, 0, tailer.remaining());
tailer.finish();
}

tailer.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.release();
}
}
};

at.start();
tt.start();

at.join();
tt.join();
} finally {
sink.close();
sink.clear();

source.close();
source.clear();

assertFalse(new File(sourceBasePath).exists());
assertFalse(new File(sinkBasePath).exists());
}
}

}

0 comments on commit 0365037

Please sign in to comment.