From ec4eac89bdee5a88f256c6c37d5ab5363c13dcbf Mon Sep 17 00:00:00 2001 From: Steven Warwick Date: Thu, 5 Mar 2020 09:53:21 -0500 Subject: [PATCH 1/2] Removing any sort from the MDPOffHeapBuffer to improve performance. Issue #69 No longer any need for a BufferedMessageRouter as we only need to go forward in history past the expected seq number. Version change to 2.1.4 --- core/build.gradle | 2 +- mbp-only/build.gradle | 2 +- mbp-with-mbo/build.gradle | 2 +- .../cme/mdp3/channel/LowLevelMdpChannel.java | 6 +- .../mdp3/control/BufferedMessageRouter.java | 49 ------- .../mdp3/control/GapChannelController.java | 36 ++--- .../cme/mdp3/control/MDPOffHeapBuffer.java | 87 +++++------- .../cme/mdp3/test/control/BufferTest.java | 125 +++++++++--------- .../control/GapChannelControllerTest.java | 35 ++--- 9 files changed, 118 insertions(+), 226 deletions(-) delete mode 100644 mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/BufferedMessageRouter.java diff --git a/core/build.gradle b/core/build.gradle index b325a70..15b5850 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -1,5 +1,5 @@ group 'com.epam.cme' -version '2.1.3' +version '2.1.4' apply plugin: 'java' diff --git a/mbp-only/build.gradle b/mbp-only/build.gradle index b73afd6..7aba9ff 100644 --- a/mbp-only/build.gradle +++ b/mbp-only/build.gradle @@ -6,7 +6,7 @@ plugins { id "me.champeau.gradle.jmh" version "0.5.0-rc-1" } group = 'com.epam.cme' -version = '2.1.3' +version = '2.1.4' sourceCompatibility = 1.8 targetCompatibility = 1.8 diff --git a/mbp-with-mbo/build.gradle b/mbp-with-mbo/build.gradle index 1aff1e0..1f4877a 100644 --- a/mbp-with-mbo/build.gradle +++ b/mbp-with-mbo/build.gradle @@ -6,7 +6,7 @@ plugins { } group = 'com.epam.cme' -version = '2.1.3' +version = '2.1.4' sourceCompatibility = 1.8 targetCompatibility = 1.8 diff --git a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/channel/LowLevelMdpChannel.java b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/channel/LowLevelMdpChannel.java index d6ea69c..7f6dbd8 100644 --- a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/channel/LowLevelMdpChannel.java +++ b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/channel/LowLevelMdpChannel.java @@ -95,7 +95,7 @@ public class LowLevelMdpChannel implements MdpChannel { incrementalStatistics = outputStatisticsEveryXseconds > 0 ? new IncrementalStatistics(outputStatisticsEveryXseconds) : null; String channelId = channelCfg.getId(); instrumentManager = new MdpInstrumentManager(channelId, listeners); - Buffer buffer = new MDPOffHeapBuffer(incrQueueSize); + MDPOffHeapBuffer buffer = new MDPOffHeapBuffer(incrQueueSize); List> emptyBookConsumers = new ArrayList<>(); ChannelController target = new ChannelControllerRouter(channelId, instrumentManager, mdpMessageTypes, listeners, instrumentObserver, emptyBookConsumers, mboIncrementMessageTemplateIds, mboSnapshotMessageTemplateIds); @@ -110,15 +110,13 @@ public class LowLevelMdpChannel implements MdpChannel { mboCycleHandler = mbpCycleHandler; } recoveryManager = getRecoveryManager(recoveryFeedType); - ChannelController targetForBuffered = new BufferedMessageRouter(channelId, instrumentManager, mdpMessageTypes, - listeners, mboCycleHandler, instrumentObserver, emptyBookConsumers, mboIncrementMessageTemplateIds, mboSnapshotMessageTemplateIds); TCPMessageRequester tcpMessageRequester = null; ConnectionCfg connectionCfg = channelCfg.getConnectionCfg(FeedType.H, Feed.A); if (connectionCfg != null) { TCPChannel tcpChannel = new MdpTCPChannel(connectionCfg); tcpMessageRequester = new MdpTCPMessageRequester<>(channelId, listeners, mdpMessageTypes, tcpChannel, tcpUsername, tcpPassword); } - this.channelController = new GapChannelController(listeners, target, targetForBuffered, recoveryManager, buffer, gapThreshold, maxNumberOfTCPAttempts, + this.channelController = new GapChannelController(listeners, target, recoveryManager, buffer, gapThreshold, maxNumberOfTCPAttempts, channelId, mdpMessageTypes, mboCycleHandler, mbpCycleHandler, scheduledExecutorService, tcpMessageRequester, mboIncrementMessageTemplateIds, mboSnapshotMessageTemplateIds); emptyBookConsumers.add(channelController); if (scheduledExecutorService != null) initChannelStateThread(); diff --git a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/BufferedMessageRouter.java b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/BufferedMessageRouter.java deleted file mode 100644 index d93608a..0000000 --- a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/BufferedMessageRouter.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Copyright 2004-2016 EPAM Systems - * This file is part of Java Market Data Handler for CME Market Data (MDP 3.0). - * Java Market Data Handler for CME Market Data (MDP 3.0) is free software: you can redistribute it and/or modify it under the terms of the - * GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. - * Java Market Data Handler for CME Market Data (MDP 3.0) is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; - * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. - * See the GNU General Public License for more details. - * You should have received a copy of the GNU General Public License along with Java Market Data Handler for CME Market Data (MDP 3.0). - * If not, see . - */ - -package com.epam.cme.mdp3.control; - -import com.epam.cme.mdp3.ChannelListener; -import com.epam.cme.mdp3.MdpGroupEntry; -import com.epam.cme.mdp3.MdpMessage; -import com.epam.cme.mdp3.sbe.schema.MdpMessageTypes; - -import java.util.List; -import java.util.function.Consumer; - -public class BufferedMessageRouter extends ChannelControllerRouter { - private SnapshotCycleHandler cycleHandler; - - public BufferedMessageRouter(String channelId, InstrumentManager instrumentManager, MdpMessageTypes mdpMessageTypes, - List channelListeners, SnapshotCycleHandler cycleHandler, - InstrumentObserver instrumentObserver, List> emptyBookConsumers, - List mboIncrementMessageTemplateIds, List mboSnapshotMessageTemplateIds) { - super(channelId, instrumentManager, mdpMessageTypes, channelListeners, instrumentObserver, emptyBookConsumers, mboIncrementMessageTemplateIds, mboSnapshotMessageTemplateIds); - this.cycleHandler = cycleHandler; - } - - @Override - protected void routeMBOEntry(int securityId, MdpMessage mdpMessage, MdpGroupEntry orderIDEntry, MdpGroupEntry mdEntry, long msgSeqNum){ - long snapshotSequence = cycleHandler.getSnapshotSequence(securityId); - if(snapshotSequence < msgSeqNum) { - super.routeMBOEntry(securityId, mdpMessage, orderIDEntry, mdEntry, msgSeqNum); - } - } - - @Override - protected void routeMBPEntry(int securityId, MdpMessage mdpMessage, MdpGroupEntry mdEntry, long msgSeqNum){ - long snapshotSequence = cycleHandler.getSnapshotSequence(securityId); - if(snapshotSequence < msgSeqNum) { - super.routeMBPEntry(securityId, mdpMessage, mdEntry, msgSeqNum); - } - } -} diff --git a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/GapChannelController.java b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/GapChannelController.java index c442c79..7ea4cc8 100644 --- a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/GapChannelController.java +++ b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/GapChannelController.java @@ -32,10 +32,9 @@ public class GapChannelController implements MdpChannelController, Consumer buffer; + private final MDPOffHeapBuffer buffer; private final SnapshotRecoveryManager snapshotRecoveryManager; private final ChannelController target; - private final ChannelController targetForBuffered; private final String channelId; private final SnapshotCycleHandler mboCycleHandler; private final SnapshotCycleHandler mbpCycleHandler; @@ -54,9 +53,8 @@ public class GapChannelController implements MdpChannelController, Consumer mboIncrementMessageTemplateIds; private List mboSnapshotMessageTemplateIds; - public GapChannelController(List channelListeners, ChannelController target, - ChannelController targetForBuffered, SnapshotRecoveryManager snapshotRecoveryManager, - Buffer buffer, int gapThreshold, final int maxNumberOfTCPAttempts, String channelId, MdpMessageTypes mdpMessageTypes, + public GapChannelController(List channelListeners, ChannelController target, SnapshotRecoveryManager snapshotRecoveryManager, + MDPOffHeapBuffer buffer, int gapThreshold, final int maxNumberOfTCPAttempts, String channelId, MdpMessageTypes mdpMessageTypes, SnapshotCycleHandler mboCycleHandler, SnapshotCycleHandler mbpCycleHandler, ScheduledExecutorService executor, TCPMessageRequester tcpMessageRequester, List mboIncrementMessageTemplateIds, List mboSnapshotMessageTemplateIds) { @@ -70,7 +68,6 @@ public GapChannelController(List channelListeners, ChannelContr this.mdpMessageTypes = mdpMessageTypes; this.mboCycleHandler = mboCycleHandler; this.mbpCycleHandler = mbpCycleHandler; - this.targetForBuffered = targetForBuffered; this.executor = executor; if(tcpMessageRequester != null) { TCPPacketListener tcpPacketListener = new TCPPacketListenerImpl(); @@ -179,7 +176,7 @@ public void handleIncrementalPacket(MdpFeedContext feedContext, MdpPacket mdpPac } processMessagesFromBuffer(feedContext); } else if(pkgSequence > expectedSequence) { - buffer.add(mdpPacket); + buffer.add(pkgSequence, mdpPacket); if(pkgSequence > (expectedSequence + gapThreshold)) { if(log.isInfoEnabled()) { log.info("Past gap of {} expected {} current {}, lost count {}", gapThreshold, expectedSequence, pkgSequence, (pkgSequence - 1) - expectedSequence); @@ -208,7 +205,7 @@ public void handleIncrementalPacket(MdpFeedContext feedContext, MdpPacket mdpPac break; case INITIAL: case OUTOFSYNC: - buffer.add(mdpPacket); + buffer.add(pkgSequence, mdpPacket); packetsInBufferDuringInitialOrOutOfSync++; if(log.isTraceEnabled()) { log.trace("Feed {}:{} | handleIncrementalPacket: current state is '{}', so the packet with sequence '{}' has been put into buffer", @@ -243,6 +240,7 @@ public void accept(MdpMessage resetMessage) { lastProcessedSeqNum = 0; smallestSnapshotSequence = 0; highestSnapshotSequence = 0; + buffer.clear(); wasChannelResetInPrcdPacket = true; if(currentState != ChannelState.SYNC) { switchState(ChannelState.SYNC); @@ -263,25 +261,11 @@ private void switchState(ChannelState newState) { } private void processMessagesFromBuffer(MdpFeedContext feedContext){ - while (!buffer.isEmpty()) { - MdpPacket mdpPacket = buffer.remove(); - long pkgSequence = mdpPacket.getMsgSeqNum(); - long expectedSequence = lastProcessedSeqNum + 1; - if(pkgSequence == expectedSequence) { + for (long expectedSequence = lastProcessedSeqNum + 1; expectedSequence <= buffer.getLastMsgSeqNum(); expectedSequence++) { + MdpPacket mdpPacket = buffer.remove(expectedSequence); + if(mdpPacket != null) { target.handleIncrementalPacket(feedContext, mdpPacket); - lastProcessedSeqNum = pkgSequence; - } else if(pkgSequence < expectedSequence && pkgSequence <= highestSnapshotSequence){ - long expectedSmallestSequence = smallestSnapshotSequence + 1; - if(pkgSequence == expectedSmallestSequence){ - targetForBuffered.handleIncrementalPacket(feedContext, mdpPacket); - smallestSnapshotSequence = expectedSmallestSequence; - } if(pkgSequence > expectedSmallestSequence) { - buffer.add(mdpPacket); - break; - } - } else if(pkgSequence > expectedSequence){ - buffer.add(mdpPacket); - break; + lastProcessedSeqNum = mdpPacket.getMsgSeqNum(); } } } diff --git a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/MDPOffHeapBuffer.java b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/MDPOffHeapBuffer.java index 0b6ad0d..ffc3248 100644 --- a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/MDPOffHeapBuffer.java +++ b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/MDPOffHeapBuffer.java @@ -16,20 +16,14 @@ import com.epam.cme.mdp3.sbe.message.SbeConstants; import net.openhft.chronicle.bytes.NativeBytesStore; -import java.util.Arrays; - -import org.agrona.collections.LongHashSet; - import static com.epam.cme.mdp3.sbe.message.SbeConstants.MESSAGE_SEQ_NUM_OFFSET; -public class MDPOffHeapBuffer implements Buffer { +public class MDPOffHeapBuffer { private final static long UNDEFINED_VALUE = Integer.MAX_VALUE; private final MdpPacket[] data; private MdpPacket resultPacket = MdpPacket.allocate(); private final MdpPacket emptyPacket = MdpPacket.allocate(); - private boolean full; - private LongHashSet msgSeqNums = new LongHashSet(); - private int index = 0; + private long lastMsgSeqNum = 0; public MDPOffHeapBuffer(int capacity) { NativeBytesStore emptyStore = NativeBytesStore.nativeStoreWithFixedCapacity(SbeConstants.MDP_PACKET_MAX_SIZE); @@ -42,64 +36,43 @@ public MDPOffHeapBuffer(int capacity) { data[i] = mdpPacket; } } - - @Override - public synchronized void add(MdpPacket entity) { - if (msgSeqNums.contains(entity.getMsgSeqNum())) { - return; - } - msgSeqNums.add(entity.getMsgSeqNum()); - MdpPacket mdpPacket = data[index++]; - if (full) { - index = 0; // keep at 0 until no longer full because sorting will cause the older message (smallest seq num) to move to the first entry in the list - } else if (index >= data.length) { - index = 0; - full = true; - } - copy(entity, mdpPacket); - sort(); + + public boolean exist(final long msgSeqNum) { + final int pos = (int) msgSeqNum % this.data.length; + final MdpPacket packet = this.data[pos]; + return !isPacketEmpty(packet); } - /** - * It returns the entities in sorted order and removes them after. - * Entry was returned in previous call will be filled the data of next entry. Make sure you took data from it and don't use it anymore. - * @return T or null if buffer is empty. - */ - @Override - public synchronized MdpPacket remove() { - MdpPacket nextPackage = data[0]; - if(isPacketEmpty(nextPackage)){ + public MdpPacket remove(final long msgSeqNum) { + final int pos = (int) msgSeqNum % this.data.length; + MdpPacket nextPacket = this.data[pos]; + if(isPacketEmpty(nextPacket)){ return null; } - copy(nextPackage, resultPacket); - copy(emptyPacket, nextPackage); - System.arraycopy(data, 1, data, 0, data.length - 1); - data[data.length - 1] = nextPackage; - if (full) { - index = data.length - 1; - full = false; - } else { - index--; - } + copy(nextPacket, resultPacket); + copy(emptyPacket, nextPacket); - msgSeqNums.remove(resultPacket.getMsgSeqNum()); return resultPacket; } - - @Override - public boolean isEmpty() { - return !full && isPacketEmpty(data[0]); + + public void add(final long msgSeqNum, final MdpPacket packet) { + final int pos = (int) msgSeqNum % this.data.length; + MdpPacket emptyPacket = data[pos]; + copy(packet, emptyPacket); + this.lastMsgSeqNum = msgSeqNum > this.lastMsgSeqNum ? msgSeqNum : this.lastMsgSeqNum; } - - private void sort(){ - //Arrays.sort is not the best variant here because it allocates TimSort class and array into it every time. - Arrays.sort(data, 0, full ? data.length : index, (o1, o2) -> { - long sequence1 = o1.getMsgSeqNum(); - long sequence2 = o2.getMsgSeqNum(); - return Long.compare(sequence1, sequence2); - }); + + public long getLastMsgSeqNum() { + return this.lastMsgSeqNum; } - + + public void clear() { + for (int i = 0; i < data.length; i++) { + copy(emptyPacket, data[i]); + } + this.lastMsgSeqNum = 0; + } + private void copy(MdpPacket from, MdpPacket to){ to.buffer().copyFrom(from.buffer()); to.length(from.getPacketSize()); diff --git a/mbp-with-mbo/src/test/java/com/epam/cme/mdp3/test/control/BufferTest.java b/mbp-with-mbo/src/test/java/com/epam/cme/mdp3/test/control/BufferTest.java index 59c883a..5dd4e59 100644 --- a/mbp-with-mbo/src/test/java/com/epam/cme/mdp3/test/control/BufferTest.java +++ b/mbp-with-mbo/src/test/java/com/epam/cme/mdp3/test/control/BufferTest.java @@ -19,13 +19,14 @@ public void elementsMustBeInSequenceOrder(){ MdpPacket n3 = MdpPacket.instance(); n3.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(3)); MdpPacket n4 = MdpPacket.instance(); n4.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(4)); MdpPacket n5 = MdpPacket.instance(); n5.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(5)); - buffer.add(n4); - buffer.add(n1); - buffer.add(n5); - buffer.add(n2); - buffer.add(n3); + buffer.add(4, n4); + buffer.add(1, n1); + buffer.add(5, n5); + buffer.add(2, n2); + buffer.add(3, n3); + assertEquals(5, buffer.getLastMsgSeqNum()); for (int i = 1; i <= 5; i++) { - MdpPacket nextPacket = buffer.remove(); + MdpPacket nextPacket = buffer.remove(i); assertEquals(i, nextPacket.getMsgSeqNum()); } } @@ -36,14 +37,15 @@ public void elementsMustNotDuplicateSequence(){ MdpPacket n1 = MdpPacket.instance(); n1.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(1)); MdpPacket n2 = MdpPacket.instance(); n2.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(1)); MdpPacket n3 = MdpPacket.instance(); n3.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(2)); - buffer.add(n1); - buffer.add(n2); - buffer.add(n3); + buffer.add(1, n1); + buffer.add(1, n2); + buffer.add(2, n3); + assertEquals(2, buffer.getLastMsgSeqNum()); for (int i = 1; i <= 2; i++) { - MdpPacket nextPacket = buffer.remove(); + MdpPacket nextPacket = buffer.remove(i); assertEquals(i, nextPacket.getMsgSeqNum()); } - assertNull(buffer.remove()); + assertNull(buffer.remove(1)); } @Test @@ -51,13 +53,14 @@ public void bufferMustCopyDataFromObject(){ MDPOffHeapBuffer buffer = new MDPOffHeapBuffer(3); MdpPacket packet = MdpPacket.instance(); packet.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(1)); - buffer.add(packet); + buffer.add(1, packet); packet.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(2)); - buffer.add(packet); + buffer.add(2, packet); packet.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(3)); - buffer.add(packet); + buffer.add(3, packet); + assertEquals(3, buffer.getLastMsgSeqNum()); for (int i = 1; i <= 3; i++) { - MdpPacket nextPacket = buffer.remove(); + MdpPacket nextPacket = buffer.remove(i); assertEquals(i, nextPacket.getMsgSeqNum()); } } @@ -70,17 +73,16 @@ public void lowElementsMustBeRemovedIfBufferIsFull(){ MdpPacket n3 = MdpPacket.instance(); n3.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(3)); MdpPacket n4 = MdpPacket.instance(); n4.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(4)); MdpPacket n5 = MdpPacket.instance(); n5.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(5)); - buffer.add(n1); - buffer.add(n2); - buffer.add(n3); - buffer.add(n5); - buffer.add(n4); - for (int i = 3; i <= 5; i++) { - assertFalse(buffer.isEmpty()); - MdpPacket nextPacket = buffer.remove(); + buffer.add(1, n1); + buffer.add(2, n2); + buffer.add(3, n3); + buffer.add(5, n5); + buffer.add(4, n4); + assertEquals(5, buffer.getLastMsgSeqNum()); + for (int i = 3; i <= 5; i++) { + MdpPacket nextPacket = buffer.remove(i); assertEquals(i, nextPacket.getMsgSeqNum()); } - assertTrue(buffer.isEmpty()); } @Test @@ -91,39 +93,36 @@ public void outOfOrderAdditionAndRemoval(){ MdpPacket n3 = MdpPacket.instance(); n3.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(3)); MdpPacket n4 = MdpPacket.instance(); n4.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(4)); MdpPacket n5 = MdpPacket.instance(); n5.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(5)); - buffer.add(n3); - buffer.add(n1); - buffer.add(n5); - buffer.add(n2); - buffer.add(n4); + buffer.add(3, n3); + buffer.add(1, n1); + buffer.add(5, n5); + buffer.add(2, n2); + buffer.add(4, n4); + assertEquals(5, buffer.getLastMsgSeqNum()); - assertFalse(buffer.isEmpty()); - MdpPacket nextPacket = buffer.remove(); + MdpPacket nextPacket = buffer.remove(3); assertEquals(3, nextPacket.getMsgSeqNum()); - nextPacket = buffer.remove(); + nextPacket = buffer.remove(4); assertEquals(4, nextPacket.getMsgSeqNum()); - nextPacket = buffer.remove(); - assertEquals(5, nextPacket.getMsgSeqNum()); - - assertTrue(buffer.isEmpty()); - + nextPacket = buffer.remove(2); + assertEquals(2, nextPacket.getMsgSeqNum()); + MdpPacket n6 = MdpPacket.instance(); n6.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(6)); MdpPacket n7 = MdpPacket.instance(); n7.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(7)); MdpPacket n8 = MdpPacket.instance(); n8.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(8)); MdpPacket n9 = MdpPacket.instance(); n9.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(9)); - buffer.add(n7); - buffer.add(n8); - buffer.add(n6); - buffer.add(n9); + buffer.add(7, n7); + buffer.add(8, n8); + buffer.add(6, n6); + buffer.add(9, n9); + assertEquals(9, buffer.getLastMsgSeqNum()); - nextPacket = buffer.remove(); + nextPacket = buffer.remove(7); assertEquals(7, nextPacket.getMsgSeqNum()); - nextPacket = buffer.remove(); + nextPacket = buffer.remove(8); assertEquals(8, nextPacket.getMsgSeqNum()); - nextPacket = buffer.remove(); + nextPacket = buffer.remove(9); assertEquals(9, nextPacket.getMsgSeqNum()); - - assertTrue(buffer.isEmpty()); } @Test @@ -137,36 +136,36 @@ public void addAndRemoveHalfFull(){ MdpPacket n6 = MdpPacket.instance(); n6.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(6)); MdpPacket n7 = MdpPacket.instance(); n7.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(7)); MdpPacket n8 = MdpPacket.instance(); n8.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(8)); - buffer.add(n1); - buffer.add(n2); - buffer.add(n3); - buffer.add(n5); - buffer.add(n4); - - MdpPacket nextPacket = buffer.remove(); + buffer.add(1, n1); + buffer.add(2, n2); + buffer.add(3, n3); + buffer.add(5, n5); + buffer.add(4, n4); + assertEquals(5, buffer.getLastMsgSeqNum()); + MdpPacket nextPacket = buffer.remove(1); assertEquals(1, nextPacket.getMsgSeqNum()); - nextPacket = buffer.remove(); + nextPacket = buffer.remove(2); assertEquals(2, nextPacket.getMsgSeqNum()); - buffer.add(n7); - buffer.add(n6); - buffer.add(n8); - + buffer.add(7, n7); + buffer.add(6, n6); + buffer.add(8, n8); + assertEquals(8, buffer.getLastMsgSeqNum()); for (int x = 4; x <= 8; x++) { - nextPacket = buffer.remove(); + nextPacket = buffer.remove(x); assertEquals(x, nextPacket.getMsgSeqNum()); } - assertTrue(buffer.isEmpty()); } @Test public void methodRemoveShouldReturnNullIfBufferIsEmpty(){ MDPOffHeapBuffer buffer = new MDPOffHeapBuffer(3); MdpPacket n1 = MdpPacket.instance(); n1.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(1)); - buffer.add(n1); - MdpPacket nextPacket = buffer.remove(); + buffer.add(1, n1); + assertEquals(1, buffer.getLastMsgSeqNum()); + MdpPacket nextPacket = buffer.remove(1); assertEquals(1, nextPacket.getMsgSeqNum()); - assertNull(buffer.remove()); + assertNull(buffer.remove(1)); } } \ No newline at end of file diff --git a/mbp-with-mbo/src/test/java/com/epam/cme/mdp3/test/control/GapChannelControllerTest.java b/mbp-with-mbo/src/test/java/com/epam/cme/mdp3/test/control/GapChannelControllerTest.java index 7edc0a7..aba58d8 100644 --- a/mbp-with-mbo/src/test/java/com/epam/cme/mdp3/test/control/GapChannelControllerTest.java +++ b/mbp-with-mbo/src/test/java/com/epam/cme/mdp3/test/control/GapChannelControllerTest.java @@ -23,7 +23,7 @@ public class GapChannelControllerTest { private GapChannelController gapChannelController; private TestChannelController testChannelController; - private final int bufferCapacity = 10; + private final int bufferCapacity = 100; private final String testChannelId = "1"; private TestSnapshotRecoveryManager testRecoveryManager; private boolean snapshotRecoveryStarted; @@ -36,15 +36,13 @@ public void init() throws Exception { ClassLoader classLoader = getClass().getClassLoader(); MdpMessageTypes mdpMessageTypes = new MdpMessageTypes(classLoader.getResource(Constants.TEMPLATE_NAME).toURI()); testChannelController = new TestChannelController(); - Buffer buffer = new MDPOffHeapBuffer(bufferCapacity); + MDPOffHeapBuffer buffer = new MDPOffHeapBuffer(bufferCapacity); testRecoveryManager = new TestSnapshotRecoveryManager(); SnapshotCycleHandler mboSnapshotCycleHandler = new OffHeapSnapshotCycleHandler(); testChannelListener = new TestChannelListener(); List mboChannelListeners = Collections.singletonList(testChannelListener); instrumentManager = new MdpInstrumentManager("TEST", mboChannelListeners); - ChannelController targetForBuffered = new BufferedMessageRouter(testChannelId, instrumentManager, mdpMessageTypes, - mboChannelListeners, mboSnapshotCycleHandler, new TestInstrumentObserver(testChannelId), Collections.emptyList(), null, null); - gapChannelController = new GapChannelController(mboChannelListeners, testChannelController, targetForBuffered, + gapChannelController = new GapChannelController(mboChannelListeners, testChannelController, testRecoveryManager, buffer, 0, GapChannelController.MAX_NUMBER_OF_TCP_ATTEMPTS, testChannelId, mdpMessageTypes, mboSnapshotCycleHandler, mboSnapshotCycleHandler,null, null, null, null); } @@ -95,19 +93,14 @@ public void duplicateMessagesWhichWereTakenFromBufferAndHaveSeqNumLessThanHighSn sendSnapshotMessage(2, 2, 2, 1, 1, 2); sendSnapshotMessage(1, instrument, instrumentLastMsgSeqNumProcessed, 1, 1, 1);//next cycle - TestChannelListener.IncrementalRefreshEntity incrementMessage = testChannelListener.nextMBOIncrementMessage(); - assertNotNull(incrementMessage); - assertEquals(instrument, incrementMessage.getSecurityId()); - assertEquals(incrementSequence, incrementMessage.getMsgSeqNum()); - incrementMessage = testChannelListener.nextMBPIncrementMessage(); - assertNotNull(incrementMessage); - assertEquals(instrument, incrementMessage.getSecurityId()); - assertEquals(incrementSequence, incrementMessage.getMsgSeqNum()); - assertNull(testChannelListener.nextMBOIncrementMessage()); assertNull(testChannelListener.nextMBPIncrementMessage()); assertNull(testChannelController.nextIncrementalMessage()); + gapChannelController.handleIncrementalPacket(incrementContext, createPacketWithIncrement(++incrementSequence, new int[]{instrument}, new short[]{1}));//duplicate + Pair incr = testChannelController.nextIncrementalMessage(); + assertNotNull(incr); + assertEquals(incr.getValue().getMsgSeqNum(), incrementSequence); } @Test @@ -115,7 +108,6 @@ public void entriesFromIncrementShouldBeSentAccordingToSnapshotSequence() throws int instrument1 = 1, instrument1lastMsgSeqNumProcessed = 1, instrument1Sequence = 2; int instrument2 = 2, instrument2lastMsgSeqNumProcessed = 3, instrument2Sequence = 4; - byte ignored = 0; instrumentManager.registerSecurity(instrument1, ""); final MdpFeedContext incrementContext = new MdpFeedContext(Feed.A, FeedType.I); @@ -127,11 +119,6 @@ public void entriesFromIncrementShouldBeSentAccordingToSnapshotSequence() throws sendSnapshotMessage(3, instrument2, instrument2lastMsgSeqNumProcessed, 1, 1, 3); sendSnapshotMessage(1, instrument1, instrument1lastMsgSeqNumProcessed, 1, 1, 3);//next cycle - TestChannelListener.IncrementalRefreshEntity incrementMessage = testChannelListener.nextMBOIncrementMessage(); - assertNotNull(incrementMessage); - assertEquals(instrument1, incrementMessage.getSecurityId()); - assertEquals(instrument1Sequence, incrementMessage.getMsgSeqNum()); - Pair pair = testChannelController.nextIncrementalMessage(); assertNotNull(pair); assertEquals(instrument2Sequence, pair.getRight().getMsgSeqNum()); @@ -159,11 +146,11 @@ public void itMustHandleMessagesInSequenceOrder() throws Exception { ClassLoader classLoader = getClass().getClassLoader(); MdpMessageTypes mdpMessageTypes = new MdpMessageTypes(classLoader.getResource(Constants.TEMPLATE_NAME).toURI()); testChannelController = new TestChannelController(); - Buffer buffer = new MDPOffHeapBuffer(bufferCapacity); + MDPOffHeapBuffer buffer = new MDPOffHeapBuffer(bufferCapacity); testRecoveryManager = new TestSnapshotRecoveryManager(); int gapThreshold = 3; OffHeapSnapshotCycleHandler cycleHandler = new OffHeapSnapshotCycleHandler(); - gapChannelController = new GapChannelController(Collections.singletonList(testChannelListener), testChannelController, testChannelController, testRecoveryManager, buffer, gapThreshold, GapChannelController.MAX_NUMBER_OF_TCP_ATTEMPTS, testChannelId, mdpMessageTypes, cycleHandler, cycleHandler, null, null, null, null); + gapChannelController = new GapChannelController(Collections.singletonList(testChannelListener), testChannelController, testRecoveryManager, buffer, gapThreshold, GapChannelController.MAX_NUMBER_OF_TCP_ATTEMPTS, testChannelId, mdpMessageTypes, cycleHandler, cycleHandler, null, null, null, null); int lastMsgSeqNumProcessed = 0; @@ -191,14 +178,14 @@ public void itMustRecoverFromTCPChannelCorrectly() throws Exception { ClassLoader classLoader = getClass().getClassLoader(); MdpMessageTypes mdpMessageTypes = new MdpMessageTypes(classLoader.getResource(Constants.TEMPLATE_NAME).toURI()); testChannelController = new TestChannelController(); - Buffer buffer = new MDPOffHeapBuffer(bufferCapacity); + MDPOffHeapBuffer buffer = new MDPOffHeapBuffer(bufferCapacity); testRecoveryManager = new TestSnapshotRecoveryManager(); int gapThreshold = 3; int maxTCPAttempts = 1; // set to one to allow second attempt to test reset of number of TCP attempts after valid recovery OffHeapSnapshotCycleHandler cycleHandler = new OffHeapSnapshotCycleHandler(); ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); gapChannelController = new GapChannelController(Collections.singletonList(testChannelListener), - testChannelController, testChannelController, testRecoveryManager, buffer, gapThreshold, maxTCPAttempts, testChannelId, + testChannelController, testRecoveryManager, buffer, gapThreshold, maxTCPAttempts, testChannelId, mdpMessageTypes, cycleHandler, cycleHandler, executorService, new TestTCPMessageRequester(), null, null); From ca0cbd46ade501980e0626bde64bc2d97afe426e Mon Sep 17 00:00:00 2001 From: Steven Warwick Date: Thu, 5 Mar 2020 14:47:09 -0500 Subject: [PATCH 2/2] Removing Buffer interface which is no longer used. Removing mock MDPHeapBuffer which was not used. Adding an interface for the MDPOffHeapBuffer Updating the MDPOffHeapBuffer to include the interface and also to use an index method for converting a msgseqnum to a data index position. --- .../cme/mdp3/channel/LowLevelMdpChannel.java | 2 +- .../com/epam/cme/mdp3/control/Buffer.java | 31 ---------- .../mdp3/control/GapChannelController.java | 4 +- .../cme/mdp3/control/IMDPOffHeapBuffer.java | 12 ++++ .../epam/cme/mdp3/control/MDPHeapBuffer.java | 61 ------------------- .../cme/mdp3/control/MDPOffHeapBuffer.java | 20 +++--- .../cme/mdp3/test/control/BufferTest.java | 15 ++--- .../control/GapChannelControllerTest.java | 6 +- 8 files changed, 38 insertions(+), 113 deletions(-) delete mode 100644 mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/Buffer.java create mode 100644 mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/IMDPOffHeapBuffer.java delete mode 100644 mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/MDPHeapBuffer.java diff --git a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/channel/LowLevelMdpChannel.java b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/channel/LowLevelMdpChannel.java index 7f6dbd8..650330b 100644 --- a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/channel/LowLevelMdpChannel.java +++ b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/channel/LowLevelMdpChannel.java @@ -95,7 +95,7 @@ public class LowLevelMdpChannel implements MdpChannel { incrementalStatistics = outputStatisticsEveryXseconds > 0 ? new IncrementalStatistics(outputStatisticsEveryXseconds) : null; String channelId = channelCfg.getId(); instrumentManager = new MdpInstrumentManager(channelId, listeners); - MDPOffHeapBuffer buffer = new MDPOffHeapBuffer(incrQueueSize); + IMDPOffHeapBuffer buffer = new MDPOffHeapBuffer(incrQueueSize); List> emptyBookConsumers = new ArrayList<>(); ChannelController target = new ChannelControllerRouter(channelId, instrumentManager, mdpMessageTypes, listeners, instrumentObserver, emptyBookConsumers, mboIncrementMessageTemplateIds, mboSnapshotMessageTemplateIds); diff --git a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/Buffer.java b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/Buffer.java deleted file mode 100644 index c71750e..0000000 --- a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/Buffer.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright 2004-2016 EPAM Systems - * This file is part of Java Market Data Handler for CME Market Data (MDP 3.0). - * Java Market Data Handler for CME Market Data (MDP 3.0) is free software: you can redistribute it and/or modify it under the terms of the - * GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. - * Java Market Data Handler for CME Market Data (MDP 3.0) is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; - * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. - * See the GNU General Public License for more details. - * You should have received a copy of the GNU General Public License along with Java Market Data Handler for CME Market Data (MDP 3.0). - * If not, see . - */ - -package com.epam.cme.mdp3.control; - - -public interface Buffer { - - /** - * It copies the data from the entry. - * @param entity - */ - void add(T entity); - - /** - * It returns the entities in sorted order and removes them after. - * @return T or null if buffer is empty. - */ - T remove(); - - boolean isEmpty(); -} diff --git a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/GapChannelController.java b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/GapChannelController.java index 7ea4cc8..78d0318 100644 --- a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/GapChannelController.java +++ b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/GapChannelController.java @@ -32,7 +32,7 @@ public class GapChannelController implements MdpChannelController, Consumer mboSnapshotMessageTemplateIds; public GapChannelController(List channelListeners, ChannelController target, SnapshotRecoveryManager snapshotRecoveryManager, - MDPOffHeapBuffer buffer, int gapThreshold, final int maxNumberOfTCPAttempts, String channelId, MdpMessageTypes mdpMessageTypes, + IMDPOffHeapBuffer buffer, int gapThreshold, final int maxNumberOfTCPAttempts, String channelId, MdpMessageTypes mdpMessageTypes, SnapshotCycleHandler mboCycleHandler, SnapshotCycleHandler mbpCycleHandler, ScheduledExecutorService executor, TCPMessageRequester tcpMessageRequester, List mboIncrementMessageTemplateIds, List mboSnapshotMessageTemplateIds) { diff --git a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/IMDPOffHeapBuffer.java b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/IMDPOffHeapBuffer.java new file mode 100644 index 0000000..34739f9 --- /dev/null +++ b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/IMDPOffHeapBuffer.java @@ -0,0 +1,12 @@ +package com.epam.cme.mdp3.control; + +import com.epam.cme.mdp3.MdpPacket; + +public interface IMDPOffHeapBuffer { + boolean exist(final long msgSeqNum); + MdpPacket remove(final long msgSeqNum); + void add(final long msgSeqNum, final MdpPacket packet); + long getLastMsgSeqNum(); + void clear(); + void clear(final long msgSeqNum); +} diff --git a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/MDPHeapBuffer.java b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/MDPHeapBuffer.java deleted file mode 100644 index 6185a5a..0000000 --- a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/MDPHeapBuffer.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright 2004-2016 EPAM Systems - * This file is part of Java Market Data Handler for CME Market Data (MDP 3.0). - * Java Market Data Handler for CME Market Data (MDP 3.0) is free software: you can redistribute it and/or modify it under the terms of the - * GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. - * Java Market Data Handler for CME Market Data (MDP 3.0) is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; - * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. - * See the GNU General Public License for more details. - * You should have received a copy of the GNU General Public License along with Java Market Data Handler for CME Market Data (MDP 3.0). - * If not, see . - */ - -package com.epam.cme.mdp3.control; - - -import com.epam.cme.mdp3.MdpPacket; - -import java.util.Comparator; -import java.util.PriorityQueue; - -/** - * This is dirty implementation. It has been created as quick implementation of Buffer. - */ -@Deprecated -public class MDPHeapBuffer implements Buffer { - private PriorityQueue queue = new PriorityQueue<>(new MdpPacketComparator()); - private final int capacity; - - public MDPHeapBuffer(int capacity) { - this.capacity = capacity; - } - - @Override - public void add(MdpPacket entity) { - if(queue.size() == capacity){ - queue.poll(); - } - queue.add(entity.copy()); - } - - @Override - public MdpPacket remove() { - return queue.poll(); - } - - @Override - public boolean isEmpty() { - return queue.isEmpty(); - } - - private class MdpPacketComparator implements Comparator{ - - @Override - public int compare(MdpPacket o1, MdpPacket o2) { - long sequence1 = o1.getMsgSeqNum(); - long sequence2 = o2.getMsgSeqNum(); - return Long.compare(sequence1, sequence2); - } - } - -} diff --git a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/MDPOffHeapBuffer.java b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/MDPOffHeapBuffer.java index ffc3248..946ab8e 100644 --- a/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/MDPOffHeapBuffer.java +++ b/mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/MDPOffHeapBuffer.java @@ -18,7 +18,7 @@ import static com.epam.cme.mdp3.sbe.message.SbeConstants.MESSAGE_SEQ_NUM_OFFSET; -public class MDPOffHeapBuffer { +public class MDPOffHeapBuffer implements IMDPOffHeapBuffer { private final static long UNDEFINED_VALUE = Integer.MAX_VALUE; private final MdpPacket[] data; private MdpPacket resultPacket = MdpPacket.allocate(); @@ -38,14 +38,12 @@ public MDPOffHeapBuffer(int capacity) { } public boolean exist(final long msgSeqNum) { - final int pos = (int) msgSeqNum % this.data.length; - final MdpPacket packet = this.data[pos]; + final MdpPacket packet = this.data[index(msgSeqNum)]; return !isPacketEmpty(packet); } public MdpPacket remove(final long msgSeqNum) { - final int pos = (int) msgSeqNum % this.data.length; - MdpPacket nextPacket = this.data[pos]; + MdpPacket nextPacket = this.data[index(msgSeqNum)]; if(isPacketEmpty(nextPacket)){ return null; } @@ -56,9 +54,7 @@ public MdpPacket remove(final long msgSeqNum) { } public void add(final long msgSeqNum, final MdpPacket packet) { - final int pos = (int) msgSeqNum % this.data.length; - MdpPacket emptyPacket = data[pos]; - copy(packet, emptyPacket); + copy(packet, data[index(msgSeqNum)]); this.lastMsgSeqNum = msgSeqNum > this.lastMsgSeqNum ? msgSeqNum : this.lastMsgSeqNum; } @@ -73,6 +69,14 @@ public void clear() { this.lastMsgSeqNum = 0; } + public void clear(final long msgSeqNum) { + copy(emptyPacket, data[index(msgSeqNum)]); + } + + private int index(final long msgSeqNum) { + return (int) msgSeqNum % this.data.length; + } + private void copy(MdpPacket from, MdpPacket to){ to.buffer().copyFrom(from.buffer()); to.length(from.getPacketSize()); diff --git a/mbp-with-mbo/src/test/java/com/epam/cme/mdp3/test/control/BufferTest.java b/mbp-with-mbo/src/test/java/com/epam/cme/mdp3/test/control/BufferTest.java index 5dd4e59..38ead33 100644 --- a/mbp-with-mbo/src/test/java/com/epam/cme/mdp3/test/control/BufferTest.java +++ b/mbp-with-mbo/src/test/java/com/epam/cme/mdp3/test/control/BufferTest.java @@ -1,6 +1,7 @@ package com.epam.cme.mdp3.test.control; import com.epam.cme.mdp3.MdpPacket; +import com.epam.cme.mdp3.control.IMDPOffHeapBuffer; import com.epam.cme.mdp3.control.MDPOffHeapBuffer; import com.epam.cme.mdp3.test.ModelUtils; import org.junit.Test; @@ -13,7 +14,7 @@ public class BufferTest { @Test public void elementsMustBeInSequenceOrder(){ - MDPOffHeapBuffer buffer = new MDPOffHeapBuffer(5); + IMDPOffHeapBuffer buffer = new MDPOffHeapBuffer(5); MdpPacket n1 = MdpPacket.instance(); n1.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(1)); MdpPacket n2 = MdpPacket.instance(); n2.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(2)); MdpPacket n3 = MdpPacket.instance(); n3.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(3)); @@ -33,7 +34,7 @@ public void elementsMustBeInSequenceOrder(){ @Test public void elementsMustNotDuplicateSequence(){ - MDPOffHeapBuffer buffer = new MDPOffHeapBuffer(5); + IMDPOffHeapBuffer buffer = new MDPOffHeapBuffer(5); MdpPacket n1 = MdpPacket.instance(); n1.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(1)); MdpPacket n2 = MdpPacket.instance(); n2.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(1)); MdpPacket n3 = MdpPacket.instance(); n3.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(2)); @@ -50,7 +51,7 @@ public void elementsMustNotDuplicateSequence(){ @Test public void bufferMustCopyDataFromObject(){ - MDPOffHeapBuffer buffer = new MDPOffHeapBuffer(3); + IMDPOffHeapBuffer buffer = new MDPOffHeapBuffer(3); MdpPacket packet = MdpPacket.instance(); packet.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(1)); buffer.add(1, packet); @@ -67,7 +68,7 @@ public void bufferMustCopyDataFromObject(){ @Test public void lowElementsMustBeRemovedIfBufferIsFull(){ - MDPOffHeapBuffer buffer = new MDPOffHeapBuffer(3); + IMDPOffHeapBuffer buffer = new MDPOffHeapBuffer(3); MdpPacket n1 = MdpPacket.instance(); n1.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(1)); MdpPacket n2 = MdpPacket.instance(); n2.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(2)); MdpPacket n3 = MdpPacket.instance(); n3.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(3)); @@ -87,7 +88,7 @@ public void lowElementsMustBeRemovedIfBufferIsFull(){ @Test public void outOfOrderAdditionAndRemoval(){ - MDPOffHeapBuffer buffer = new MDPOffHeapBuffer(3); + IMDPOffHeapBuffer buffer = new MDPOffHeapBuffer(3); MdpPacket n1 = MdpPacket.instance(); n1.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(1)); MdpPacket n2 = MdpPacket.instance(); n2.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(2)); MdpPacket n3 = MdpPacket.instance(); n3.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(3)); @@ -127,7 +128,7 @@ public void outOfOrderAdditionAndRemoval(){ @Test public void addAndRemoveHalfFull(){ - MDPOffHeapBuffer buffer = new MDPOffHeapBuffer(5); + IMDPOffHeapBuffer buffer = new MDPOffHeapBuffer(5); MdpPacket n1 = MdpPacket.instance(); n1.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(1)); MdpPacket n2 = MdpPacket.instance(); n2.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(2)); MdpPacket n3 = MdpPacket.instance(); n3.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(3)); @@ -160,7 +161,7 @@ public void addAndRemoveHalfFull(){ @Test public void methodRemoveShouldReturnNullIfBufferIsEmpty(){ - MDPOffHeapBuffer buffer = new MDPOffHeapBuffer(3); + IMDPOffHeapBuffer buffer = new MDPOffHeapBuffer(3); MdpPacket n1 = MdpPacket.instance(); n1.wrapFromBuffer(ModelUtils.getMBOIncrementTestMessage(1)); buffer.add(1, n1); assertEquals(1, buffer.getLastMsgSeqNum()); diff --git a/mbp-with-mbo/src/test/java/com/epam/cme/mdp3/test/control/GapChannelControllerTest.java b/mbp-with-mbo/src/test/java/com/epam/cme/mdp3/test/control/GapChannelControllerTest.java index aba58d8..76e3ad8 100644 --- a/mbp-with-mbo/src/test/java/com/epam/cme/mdp3/test/control/GapChannelControllerTest.java +++ b/mbp-with-mbo/src/test/java/com/epam/cme/mdp3/test/control/GapChannelControllerTest.java @@ -36,7 +36,7 @@ public void init() throws Exception { ClassLoader classLoader = getClass().getClassLoader(); MdpMessageTypes mdpMessageTypes = new MdpMessageTypes(classLoader.getResource(Constants.TEMPLATE_NAME).toURI()); testChannelController = new TestChannelController(); - MDPOffHeapBuffer buffer = new MDPOffHeapBuffer(bufferCapacity); + IMDPOffHeapBuffer buffer = new MDPOffHeapBuffer(bufferCapacity); testRecoveryManager = new TestSnapshotRecoveryManager(); SnapshotCycleHandler mboSnapshotCycleHandler = new OffHeapSnapshotCycleHandler(); testChannelListener = new TestChannelListener(); @@ -146,7 +146,7 @@ public void itMustHandleMessagesInSequenceOrder() throws Exception { ClassLoader classLoader = getClass().getClassLoader(); MdpMessageTypes mdpMessageTypes = new MdpMessageTypes(classLoader.getResource(Constants.TEMPLATE_NAME).toURI()); testChannelController = new TestChannelController(); - MDPOffHeapBuffer buffer = new MDPOffHeapBuffer(bufferCapacity); + IMDPOffHeapBuffer buffer = new MDPOffHeapBuffer(bufferCapacity); testRecoveryManager = new TestSnapshotRecoveryManager(); int gapThreshold = 3; OffHeapSnapshotCycleHandler cycleHandler = new OffHeapSnapshotCycleHandler(); @@ -178,7 +178,7 @@ public void itMustRecoverFromTCPChannelCorrectly() throws Exception { ClassLoader classLoader = getClass().getClassLoader(); MdpMessageTypes mdpMessageTypes = new MdpMessageTypes(classLoader.getResource(Constants.TEMPLATE_NAME).toURI()); testChannelController = new TestChannelController(); - MDPOffHeapBuffer buffer = new MDPOffHeapBuffer(bufferCapacity); + IMDPOffHeapBuffer buffer = new MDPOffHeapBuffer(bufferCapacity); testRecoveryManager = new TestSnapshotRecoveryManager(); int gapThreshold = 3; int maxTCPAttempts = 1; // set to one to allow second attempt to test reset of number of TCP attempts after valid recovery