Skip to content

Commit

Permalink
CHRON-96 fixed the WithMappedTest
Browse files Browse the repository at this point in the history
  • Loading branch information
Rob Austin committed Jan 14, 2015
1 parent bf38e9a commit 33711aa
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 39 deletions.
Expand Up @@ -20,4 +20,11 @@ public Attached withMapping(MappingFunction mappingFunction) {
this.mappingFunction = mappingFunction;
return this;
}

@Override
public String toString() {
return "Attached{" +
"mappingFunction=" + (mappingFunction == null ? "null" : mappingFunction.getClass()
) + '}';
}
}
36 changes: 15 additions & 21 deletions chronicle/src/main/java/net/openhft/chronicle/tcp/SourceTcp.java
Expand Up @@ -314,6 +314,8 @@ protected boolean onRead(final SelectionKey key) throws IOException {
Object attachment = key.attachment();
if (attachment instanceof MappingProvider) {
MappingFunction mappingFunction = readUpTo(size).readObject(MappingFunction.class);
System.out.println(mappingFunction.getClass() + " key=" + key);

((MappingProvider) attachment).withMapping(mappingFunction);
} else
throw new IllegalStateException("Expecting attached to implement MappingFunction.class");
Expand Down Expand Up @@ -358,7 +360,9 @@ private ByteBufferBytes readUpTo(int size) throws IOException {

protected boolean onWrite(final SelectionKey key) throws IOException {
final long now = System.currentTimeMillis();
if (running.get() && !write(key.attachment())) {
Object attachment = key.attachment();

if (running.get() && !write(attachment)) {
if (lastHeartbeat <= now) {
sendSizeAndIndex(ChronicleTcp.IN_SYNC_LEN, 0L);
}
Expand Down Expand Up @@ -445,10 +449,6 @@ protected boolean write(Object attached) throws IOException {
final long size = tailer.capacity();
long remaining = size + ChronicleTcp.HEADER_SIZE;


if (tailer instanceof MappingProvider)
((MappingProvider) tailer).withMapping();

writeBuffer.clear();
writeBuffer.putInt((int) size);
writeBuffer.putLong(tailer.index());
Expand Down Expand Up @@ -560,16 +560,7 @@ protected boolean write(Object attached) throws IOException {
}

pauseReset();
Bytes bytes;
if (attached instanceof MappingProvider) {
bytes = applyMapping(tailer, (MappingProvider) attached);

} else

{
bytes = tailer;
}

Bytes bytes = applyMapping(tailer, attached);

final long size = bytes.capacity();
long remaining = size + ChronicleTcp.HEADER_SIZE;
Expand Down Expand Up @@ -598,7 +589,7 @@ protected boolean write(Object attached) throws IOException {
for (int count = builder.maxExcerptsPerMessage(); (count > 0) && tailer.nextIndex(); ) {
if (!tailer.wasPadding()) {

bytes = applyMapping(tailer, (MappingProvider) attached);
bytes = applyMapping(tailer, attached);

if (hasRoomForExcerpt(writeBuffer, bytes)) {
// if there is free space, copy another one.
Expand All @@ -621,9 +612,7 @@ protected boolean write(Object attached) throws IOException {
connection.writeAll(writeBuffer);
}

if (writeBuffer.remaining() > 0)

{
if (writeBuffer.remaining() > 0) {
throw new EOFException("Failed to send index=" + tailer.index());
}

Expand All @@ -635,12 +624,17 @@ protected boolean write(Object attached) throws IOException {
* applies a mapping if the mapping is not set to {@code}null{code}
*
* @param source
* @param mappingProvider the key attachment
* @param attached the key attachment
* @return returns the tailer or the mapped bytes
* @see
*/
private Bytes applyMapping(final ExcerptTailer source, MappingProvider mappingProvider) {
private Bytes applyMapping(final ExcerptTailer source, Object attached) {

if (!(attached instanceof MappingProvider)) {
return tailer;
}

final MappingProvider mappingProvider = (MappingProvider) attached;
final MappingFunction mappingFunction = mappingProvider.withMapping();

if (mappingFunction == null)
Expand Down
Expand Up @@ -7,7 +7,6 @@
import net.openhft.lang.io.serialization.BytesMarshallable;
import net.openhft.lang.model.constraints.NotNull;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
Expand Down Expand Up @@ -262,13 +261,13 @@ private Collection<MarketData> loadMarketData() throws IOException, ParseExcepti

}

@Ignore("looks like there is a bug the MappingFunction is appearing to be applied to all the sinks, " +
"not just the sink that sent the mapping.")

@Test
public void testReplicationWithPriceMarketDataFilter() throws Throwable {

final String sourceBasePath = getVanillaTestPath("-source");
final String sinkBasePath = getVanillaTestPath("-sink");
final String sinkHighLowBasePath = getVanillaTestPath("-sink-highlow");
final String sinkCloseBasePath = getVanillaTestPath("-sink-close");

final ChronicleTcpTestBase.PortSupplier portSupplier = new ChronicleTcpTestBase.PortSupplier();

Expand All @@ -280,11 +279,7 @@ public void testReplicationWithPriceMarketDataFilter() throws Throwable {

final int port = portSupplier.getAndCheckPort();

final Chronicle closeSink = ChronicleQueueBuilder.vanilla(sinkBasePath)
.sink()
.withMapping(Close.fromMarketData()) // this is sent to the source
.connectAddress("localhost", port)
.build();

try {


Expand All @@ -299,6 +294,8 @@ public void testReplicationWithPriceMarketDataFilter() throws Throwable {

Callable<Void> appenderCallable = new Callable<Void>() {
public Void call() throws Exception {


AffinityLock lock = AffinityLock.acquireLock();
try {
final ExcerptAppender appender = source.createAppender();
Expand All @@ -322,7 +319,7 @@ public Void call() throws Exception {

public Void call() throws Exception {

final Chronicle highLowSink = ChronicleQueueBuilder.vanilla(sinkBasePath)
final Chronicle highLowSink = ChronicleQueueBuilder.vanilla(sinkHighLowBasePath)
.sink()
.withMapping(HighLow.fromMarketData()) // this is sent to the source
.connectAddress("localhost", port)
Expand All @@ -342,7 +339,6 @@ public Void call() throws Exception {
Assert.assertTrue(actual.date > DATE_FORMAT.parse("2014-01-01").getTime());
Assert.assertTrue(actual.date < DATE_FORMAT.parse("2016-01-01").getTime());


Assert.assertTrue(actual.high > 5000);
Assert.assertTrue(actual.high < 8000);

Expand All @@ -360,6 +356,7 @@ public Void call() throws Exception {

} finally {
lock.release();
highLowSink.clear();
}
return null;
}
Expand All @@ -371,6 +368,12 @@ public Void call() throws Exception {

public Void call() throws Exception {

final Chronicle closeSink = ChronicleQueueBuilder.vanilla(sinkCloseBasePath)
.sink()
.withMapping(Close.fromMarketData()) // this is sent to the source
.connectAddress("localhost", port)
.build();

AffinityLock lock = AffinityLock.acquireLock();
try (final ExcerptTailer tailer = closeSink.createTailer()) {

Expand Down Expand Up @@ -406,13 +409,16 @@ public Void call() throws Exception {

} finally {
lock.release();
closeSink.clear();
}

return null;
}


};


try {

ThreadFactory appenderFactory = new ThreadFactory() {
Expand All @@ -428,7 +434,6 @@ public Thread newThread(Runnable r) {
appenderFuture.get(20, TimeUnit.SECONDS);



ThreadFactory closeFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Expand Down Expand Up @@ -458,16 +463,13 @@ public Thread newThread(Runnable r) {


} finally {


closeSink.close();
closeSink.clear();

source.close();
source.clear();

// check cleanup
assertFalse(new File(sourceBasePath).exists());
assertFalse(new File(sinkBasePath).exists());
assertFalse(new File(sinkCloseBasePath).exists());
assertFalse(new File(sinkHighLowBasePath).exists());
}
}

Expand Down

0 comments on commit 33711aa

Please sign in to comment.