Skip to content
This repository was archived by the owner on Jun 7, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,18 +156,20 @@ public boolean isReady()
*/
public void catchUp()
{
long lBaseSeconds = (long)iterator.getBaseSeconds() << 32;
logger.debug("BaseSeconds = {} and lBaseSeconds = {}", Codec.getStringWindowId(baseSeconds), Codec.getStringWindowId(lBaseSeconds));
if (lBaseSeconds > baseSeconds) {
baseSeconds = lBaseSeconds;
}
logger.debug("Set the base seconds to {}", Codec.getStringWindowId(baseSeconds));
int intervalMillis;

int skippedPayloadTuples = 0;

caughtup = false;
if (isReady()) {
logger.debug("catching up {}->{}", upstream, group);

long lBaseSeconds = (long)iterator.getBaseSeconds() << 32;
logger.debug("BaseSeconds = {} and lBaseSeconds = {}", Codec.getStringWindowId(baseSeconds), Codec.getStringWindowId(lBaseSeconds));
if (lBaseSeconds > baseSeconds) {
baseSeconds = lBaseSeconds;
}
logger.debug("Set the base seconds to {}", Codec.getStringWindowId(baseSeconds));
int intervalMillis;

int skippedPayloadTuples = 0;

try {
/*
* fast forward to catch up with the windowId without consuming
Expand Down Expand Up @@ -337,8 +339,8 @@ public void boot(EventLoop eventloop)
{
for (PhysicalNode pn : physicalNodes) {
eventloop.disconnect(pn.getClient());
physicalNodes.clear();
}
physicalNodes.clear();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,14 +219,15 @@ private void handleResetRequest(ResetRequestTuple request, final AbstractLengthP
* @param connection
* @return
*/
public LogicalNode handleSubscriberRequest(SubscribeRequestTuple request, AbstractLengthPrependerClient connection)
public LogicalNode handleSubscriberRequest(SubscribeRequestTuple request,
final AbstractLengthPrependerClient connection)
{
String identifier = request.getIdentifier();
String type = request.getStreamType();
String upstream_identifier = request.getUpstreamIdentifier();

// Check if there is a logical node of this type, if not create it.
LogicalNode ln;
final LogicalNode ln;
if (subscriberGroups.containsKey(type)) {
//logger.debug("adding to exiting group = {}", subscriberGroups.get(type));
/*
Expand All @@ -238,15 +239,23 @@ public LogicalNode handleSubscriberRequest(SubscribeRequestTuple request, Abstra
}

ln = subscriberGroups.get(type);
ln.boot(eventloop);
ln.addConnection(connection);
serverHelperExecutor.submit(new Runnable()
{
@Override
public void run()
{
ln.boot(eventloop);
ln.addConnection(connection);
ln.catchUp();
}
});
} else {
/*
* if there is already a datalist registered for the type in which this client is interested,
* then get a iterator on the data items of that data list. If the datalist is not registered,
* then create one and register it. Hopefully this one would be used by future upstream nodes.
*/
DataList dl;
final DataList dl;
if (publisherBuffers.containsKey(upstream_identifier)) {
dl = publisherBuffers.get(upstream_identifier);
//logger.debug("old list = {}", dl);
Expand All @@ -270,8 +279,16 @@ public LogicalNode handleSubscriberRequest(SubscribeRequestTuple request, Abstra
}

subscriberGroups.put(type, ln);
ln.addConnection(connection);
dl.addDataListener(ln);
serverHelperExecutor.submit(new Runnable()
{
@Override
public void run()
{
ln.addConnection(connection);
ln.catchUp();
dl.addDataListener(ln);
}
});
}

return ln;
Expand Down Expand Up @@ -460,16 +477,7 @@ public int readSize()
key.interestOps(SelectionKey.OP_WRITE | SelectionKey.OP_READ);
subscriber.registered(key);

final LogicalNode logicalNode = handleSubscriberRequest(subscriberRequest, subscriber);
serverHelperExecutor.submit(new Runnable()
{
@Override
public void run()
{
logicalNode.catchUp();
}

});
handleSubscriberRequest(subscriberRequest, subscriber);
break;

case PURGE_REQUEST:
Expand Down