Skip to content

Commit

Permalink
Merge pull request #70 from swarwick/master
Browse files Browse the repository at this point in the history
Issue #69 MBP with MBO Performance
  • Loading branch information
iamolever committed Mar 13, 2020
2 parents 19ce4f5 + ca0cbd4 commit c72a738
Show file tree
Hide file tree
Showing 12 changed files with 142 additions and 325 deletions.
2 changes: 1 addition & 1 deletion core/build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
group 'com.epam.cme'
version '2.1.3'
version '2.1.4'

apply plugin: 'java'

Expand Down
2 changes: 1 addition & 1 deletion mbp-only/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
id "me.champeau.gradle.jmh" version "0.5.0-rc-1"
}
group = 'com.epam.cme'
version = '2.1.3'
version = '2.1.4'
sourceCompatibility = 1.8
targetCompatibility = 1.8

Expand Down
2 changes: 1 addition & 1 deletion mbp-with-mbo/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
}

group = 'com.epam.cme'
version = '2.1.3'
version = '2.1.4'
sourceCompatibility = 1.8
targetCompatibility = 1.8

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public class LowLevelMdpChannel implements MdpChannel {
incrementalStatistics = outputStatisticsEveryXseconds > 0 ? new IncrementalStatistics(outputStatisticsEveryXseconds) : null;
String channelId = channelCfg.getId();
instrumentManager = new MdpInstrumentManager(channelId, listeners);
Buffer<MdpPacket> buffer = new MDPOffHeapBuffer(incrQueueSize);
IMDPOffHeapBuffer buffer = new MDPOffHeapBuffer(incrQueueSize);
List<Consumer<MdpMessage>> emptyBookConsumers = new ArrayList<>();
ChannelController target = new ChannelControllerRouter(channelId, instrumentManager, mdpMessageTypes, listeners,
instrumentObserver, emptyBookConsumers, mboIncrementMessageTemplateIds, mboSnapshotMessageTemplateIds);
Expand All @@ -110,15 +110,13 @@ public class LowLevelMdpChannel implements MdpChannel {
mboCycleHandler = mbpCycleHandler;
}
recoveryManager = getRecoveryManager(recoveryFeedType);
ChannelController targetForBuffered = new BufferedMessageRouter(channelId, instrumentManager, mdpMessageTypes,
listeners, mboCycleHandler, instrumentObserver, emptyBookConsumers, mboIncrementMessageTemplateIds, mboSnapshotMessageTemplateIds);
TCPMessageRequester tcpMessageRequester = null;
ConnectionCfg connectionCfg = channelCfg.getConnectionCfg(FeedType.H, Feed.A);
if (connectionCfg != null) {
TCPChannel tcpChannel = new MdpTCPChannel(connectionCfg);
tcpMessageRequester = new MdpTCPMessageRequester<>(channelId, listeners, mdpMessageTypes, tcpChannel, tcpUsername, tcpPassword);
}
this.channelController = new GapChannelController(listeners, target, targetForBuffered, recoveryManager, buffer, gapThreshold, maxNumberOfTCPAttempts,
this.channelController = new GapChannelController(listeners, target, recoveryManager, buffer, gapThreshold, maxNumberOfTCPAttempts,
channelId, mdpMessageTypes, mboCycleHandler, mbpCycleHandler, scheduledExecutorService, tcpMessageRequester, mboIncrementMessageTemplateIds, mboSnapshotMessageTemplateIds);
emptyBookConsumers.add(channelController);
if (scheduledExecutorService != null) initChannelStateThread();
Expand Down
31 changes: 0 additions & 31 deletions mbp-with-mbo/src/main/java/com/epam/cme/mdp3/control/Buffer.java

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,9 @@ public class GapChannelController implements MdpChannelController, Consumer<MdpM
private final Lock lock = new ReentrantLock();
private final int gapThreshold;
private final int maxNumberOfTCPAttempts;
private final Buffer<MdpPacket> buffer;
private final IMDPOffHeapBuffer buffer;
private final SnapshotRecoveryManager snapshotRecoveryManager;
private final ChannelController target;
private final ChannelController targetForBuffered;
private final String channelId;
private final SnapshotCycleHandler mboCycleHandler;
private final SnapshotCycleHandler mbpCycleHandler;
Expand All @@ -54,9 +53,8 @@ public class GapChannelController implements MdpChannelController, Consumer<MdpM
private List<Integer> mboIncrementMessageTemplateIds;
private List<Integer> mboSnapshotMessageTemplateIds;

public GapChannelController(List<ChannelListener> channelListeners, ChannelController target,
ChannelController targetForBuffered, SnapshotRecoveryManager snapshotRecoveryManager,
Buffer<MdpPacket> buffer, int gapThreshold, final int maxNumberOfTCPAttempts, String channelId, MdpMessageTypes mdpMessageTypes,
public GapChannelController(List<ChannelListener> channelListeners, ChannelController target, SnapshotRecoveryManager snapshotRecoveryManager,
IMDPOffHeapBuffer buffer, int gapThreshold, final int maxNumberOfTCPAttempts, String channelId, MdpMessageTypes mdpMessageTypes,
SnapshotCycleHandler mboCycleHandler, SnapshotCycleHandler mbpCycleHandler,
ScheduledExecutorService executor, TCPMessageRequester tcpMessageRequester,
List<Integer> mboIncrementMessageTemplateIds, List<Integer> mboSnapshotMessageTemplateIds) {
Expand All @@ -70,7 +68,6 @@ public GapChannelController(List<ChannelListener> channelListeners, ChannelContr
this.mdpMessageTypes = mdpMessageTypes;
this.mboCycleHandler = mboCycleHandler;
this.mbpCycleHandler = mbpCycleHandler;
this.targetForBuffered = targetForBuffered;
this.executor = executor;
if(tcpMessageRequester != null) {
TCPPacketListener tcpPacketListener = new TCPPacketListenerImpl();
Expand Down Expand Up @@ -179,7 +176,7 @@ public void handleIncrementalPacket(MdpFeedContext feedContext, MdpPacket mdpPac
}
processMessagesFromBuffer(feedContext);
} else if(pkgSequence > expectedSequence) {
buffer.add(mdpPacket);
buffer.add(pkgSequence, mdpPacket);
if(pkgSequence > (expectedSequence + gapThreshold)) {
if(log.isInfoEnabled()) {
log.info("Past gap of {} expected {} current {}, lost count {}", gapThreshold, expectedSequence, pkgSequence, (pkgSequence - 1) - expectedSequence);
Expand Down Expand Up @@ -208,7 +205,7 @@ public void handleIncrementalPacket(MdpFeedContext feedContext, MdpPacket mdpPac
break;
case INITIAL:
case OUTOFSYNC:
buffer.add(mdpPacket);
buffer.add(pkgSequence, mdpPacket);
packetsInBufferDuringInitialOrOutOfSync++;
if(log.isTraceEnabled()) {
log.trace("Feed {}:{} | handleIncrementalPacket: current state is '{}', so the packet with sequence '{}' has been put into buffer",
Expand Down Expand Up @@ -243,6 +240,7 @@ public void accept(MdpMessage resetMessage) {
lastProcessedSeqNum = 0;
smallestSnapshotSequence = 0;
highestSnapshotSequence = 0;
buffer.clear();
wasChannelResetInPrcdPacket = true;
if(currentState != ChannelState.SYNC) {
switchState(ChannelState.SYNC);
Expand All @@ -263,25 +261,11 @@ private void switchState(ChannelState newState) {
}

private void processMessagesFromBuffer(MdpFeedContext feedContext){
while (!buffer.isEmpty()) {
MdpPacket mdpPacket = buffer.remove();
long pkgSequence = mdpPacket.getMsgSeqNum();
long expectedSequence = lastProcessedSeqNum + 1;
if(pkgSequence == expectedSequence) {
for (long expectedSequence = lastProcessedSeqNum + 1; expectedSequence <= buffer.getLastMsgSeqNum(); expectedSequence++) {
MdpPacket mdpPacket = buffer.remove(expectedSequence);
if(mdpPacket != null) {
target.handleIncrementalPacket(feedContext, mdpPacket);
lastProcessedSeqNum = pkgSequence;
} else if(pkgSequence < expectedSequence && pkgSequence <= highestSnapshotSequence){
long expectedSmallestSequence = smallestSnapshotSequence + 1;
if(pkgSequence == expectedSmallestSequence){
targetForBuffered.handleIncrementalPacket(feedContext, mdpPacket);
smallestSnapshotSequence = expectedSmallestSequence;
} if(pkgSequence > expectedSmallestSequence) {
buffer.add(mdpPacket);
break;
}
} else if(pkgSequence > expectedSequence){
buffer.add(mdpPacket);
break;
lastProcessedSeqNum = mdpPacket.getMsgSeqNum();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.epam.cme.mdp3.control;

import com.epam.cme.mdp3.MdpPacket;

public interface IMDPOffHeapBuffer {
boolean exist(final long msgSeqNum);
MdpPacket remove(final long msgSeqNum);
void add(final long msgSeqNum, final MdpPacket packet);
long getLastMsgSeqNum();
void clear();
void clear(final long msgSeqNum);
}

This file was deleted.

Loading

0 comments on commit c72a738

Please sign in to comment.