Skip to content

Commit

Permalink
0003795: More efficient routing for non-transactional channels
Browse files Browse the repository at this point in the history
  • Loading branch information
mmichalek committed Apr 4, 2019
1 parent 900ac13 commit 32ad700
Showing 1 changed file with 98 additions and 66 deletions.
Expand Up @@ -135,82 +135,23 @@ public void run() {
}

protected void execute() {

long maxPeekAheadSizeInBytes = (long)(Runtime.getRuntime().maxMemory() * percentOfHeapToUse);
ISymmetricDialect symmetricDialect = engine.getSymmetricDialect();
ISqlReadCursor<Data> cursor = null;
processInfo = engine.getStatisticManager().newProcessInfo(
new ProcessInfoKey(engine.getNodeService().findIdentityNodeId(), null,
ProcessType.ROUTER_READER));
processInfo.setCurrentChannelId(context.getChannel().getChannelId());
try {
int lastPeekAheadIndex = 0;
int dataCount = 0;
long maxDataToRoute = context.getChannel().getMaxDataToRoute();
List<Data> peekAheadQueue = new ArrayList<Data>(peekAheadCount);
boolean transactional = !context.getChannel().getBatchAlgorithm()
.equals(NonTransactionalBatchAlgorithm.NAME)
|| !symmetricDialect.supportsTransactionId();

processInfo.setStatus(ProcessStatus.QUERYING);
cursor = prepareCursor();
processInfo.setStatus(ProcessStatus.EXTRACTING);
boolean moreData = true;
while (dataCount < maxDataToRoute || (lastTransactionId != null && transactional)) {
if (moreData && (lastTransactionId != null || peekAheadQueue.size() == 0)) {
moreData = fillPeekAheadQueue(peekAheadQueue, peekAheadCount, cursor);
}

int dataWithSameTransactionIdCount = 0;

while (peekAheadQueue.size() > 0 && lastTransactionId == null &&
dataCount < maxDataToRoute) {
Data data = peekAheadQueue.remove(0);
copyToQueue(data);
dataCount++;
processInfo.incrementCurrentDataCount();
processInfo.setCurrentTableName(data.getTableName());
lastTransactionId = data.getTransactionId();
context.addTransaction(lastTransactionId);
dataWithSameTransactionIdCount++;
}

if (lastTransactionId != null && peekAheadQueue.size() > 0) {
Iterator<Data> datas = peekAheadQueue.iterator();
int index = 0;
while (datas.hasNext() && (dataCount < maxDataToRoute || transactional)) {
Data data = datas.next();
if (lastTransactionId.equals(data.getTransactionId())) {
dataWithSameTransactionIdCount++;
datas.remove();
copyToQueue(data);
dataCount++;
processInfo.incrementCurrentDataCount();
processInfo.setCurrentTableName(data.getTableName());
lastPeekAheadIndex = index;
} else {
context.addTransaction(data.getTransactionId());
index++;
}

}

if (dataWithSameTransactionIdCount == 0 || peekAheadQueue.size()-lastPeekAheadIndex > peekAheadCount) {
lastTransactionId = null;
lastPeekAheadIndex = 0;
}

}

if (!moreData && peekAheadQueue.size() == 0) {
// we've reached the end of the result set
break;
} else if (peekAheadSizeInBytes >= maxPeekAheadSizeInBytes) {
log.info("The peek ahead queue has reached its max size of {} bytes. Finishing reading the current transaction", peekAheadSizeInBytes);
finishTransactionMode = true;
peekAheadQueue.clear();
}
}

if (transactional) {
executeTransactional(cursor);
} else {
executeNonTransactional(cursor);
}

processInfo.setStatus(ProcessStatus.OK);
} catch (Throwable ex) {
processInfo.setStatus(ProcessStatus.ERROR);
Expand All @@ -233,6 +174,97 @@ && isNotBlank(ex.getMessage())
}

}

protected void executeTransactional(ISqlReadCursor<Data> cursor) throws Exception {
long maxPeekAheadSizeInBytes = (long)(Runtime.getRuntime().maxMemory() * percentOfHeapToUse);
int lastPeekAheadIndex = 0;
int dataCount = 0;
long maxDataToRoute = context.getChannel().getMaxDataToRoute();
List<Data> peekAheadQueue = new ArrayList<Data>(peekAheadCount);

processInfo.setStatus(ProcessStatus.QUERYING);
cursor = prepareCursor();
processInfo.setStatus(ProcessStatus.EXTRACTING);
boolean moreData = true;
while (dataCount < maxDataToRoute || (lastTransactionId != null)) {
if (moreData && (lastTransactionId != null || peekAheadQueue.size() == 0)) {
moreData = fillPeekAheadQueue(peekAheadQueue, peekAheadCount, cursor);
}

int dataWithSameTransactionIdCount = 0;

while (peekAheadQueue.size() > 0 && lastTransactionId == null &&
dataCount < maxDataToRoute) {
Data data = peekAheadQueue.remove(0);
copyToQueue(data);
dataCount++;
processInfo.incrementCurrentDataCount();
processInfo.setCurrentTableName(data.getTableName());
lastTransactionId = data.getTransactionId();
context.addTransaction(lastTransactionId);
dataWithSameTransactionIdCount++;
}

if (lastTransactionId != null && peekAheadQueue.size() > 0) {
Iterator<Data> datas = peekAheadQueue.iterator();
int index = 0;
while (datas.hasNext()) {
Data data = datas.next();
if (lastTransactionId.equals(data.getTransactionId())) {
dataWithSameTransactionIdCount++;
datas.remove();
copyToQueue(data);
dataCount++;
processInfo.incrementCurrentDataCount();
processInfo.setCurrentTableName(data.getTableName());
lastPeekAheadIndex = index;
} else {
context.addTransaction(data.getTransactionId());
index++;
}

}

if (dataWithSameTransactionIdCount == 0 || peekAheadQueue.size()-lastPeekAheadIndex > peekAheadCount) {
lastTransactionId = null;
lastPeekAheadIndex = 0;
}

}

if (!moreData && peekAheadQueue.size() == 0) {
// we've reached the end of the result set
break;
} else if (peekAheadSizeInBytes >= maxPeekAheadSizeInBytes) {
log.info("The peek ahead queue has reached its max size of {} bytes. Finishing reading the current transaction", peekAheadSizeInBytes);
finishTransactionMode = true;
peekAheadQueue.clear();
}
}
}


protected void executeNonTransactional(ISqlReadCursor<Data> cursor) throws Exception {
long maxDataToRoute = context.getChannel().getMaxDataToRoute();
List<Data> peekAheadQueue = new ArrayList<Data>(peekAheadCount);
int dataCount = 0;
while (dataCount < maxDataToRoute) {
fillPeekAheadQueue(peekAheadQueue, peekAheadCount, cursor);

if (peekAheadQueue.size() > 0) {
while (peekAheadQueue.size() > 0 && dataCount < maxDataToRoute) {
Data data = peekAheadQueue.remove(0);
copyToQueue(data);
dataCount++;
processInfo.incrementCurrentDataCount();
processInfo.setCurrentTableName(data.getTableName());
}

} else {
break;
}
}
}

protected boolean process(Data data) {
long dataId = data.getDataId();
Expand Down

0 comments on commit 32ad700

Please sign in to comment.