Skip to content

Commit

Permalink
0006055: Multi-threaded routing by channel
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Oct 26, 2023
1 parent 5d8f54f commit 2de9633
Show file tree
Hide file tree
Showing 11 changed files with 136 additions and 54 deletions.
Expand Up @@ -207,6 +207,9 @@ private ParameterConstants() {
public final static String ROUTING_GAPS_USE_TRANSACTION_VIEW = "routing.gaps.use.transaction.view";
public final static String ROUTING_GAPS_TRANSACTION_VIEW_CLOCK_SYNC_THRESHOLD_MS = "routing.gaps.transaction.view.clock.sync.threshold";
public final static String ROUTING_MAX_BATCH_SIZE_EXCEED_PERCENT = "routing.max.batch.size.exceed.percent";
public final static String ROUTING_USE_CHANNEL_THREADS = "routing.use.channel.threads";
public final static String ROUTING_THREAD_COUNT_PER_SERVER = "routing.thread.per.server.count";
public final static String ROUTING_LOCK_TIMEOUT_MS = "routing.lock.timeout.ms";
public final static String INCOMING_BATCH_SKIP_DUPLICATE_BATCHES_ENABLED = "incoming.batches.skip.duplicates";
@Deprecated
public final static String INCOMING_BATCH_DELETE_ON_LOAD = "incoming.batch.delete.on.load";
Expand Down
Expand Up @@ -156,7 +156,7 @@ public CsvData next() {
data.setNoBinaryOldData(requiresLobSelectedFromSource || dialectHasNoOldBinaryData);
outgoingBatch.incrementExtractRowCount();
outgoingBatch.incrementExtractRowCount(data.getDataEventType());
if (data.getDataEventType().equals(DataEventType.INSERT) || data.getDataEventType().equals(DataEventType.UPDATE)) {
if (!containsBigLob && (data.getDataEventType().equals(DataEventType.INSERT) || data.getDataEventType().equals(DataEventType.UPDATE))) {
int expectedCommaCount = triggerHistory.getParsedColumnNames().length;
int commaCount = StringUtils.countMatches(data.getRowData(), ",") + 1;
if (commaCount < expectedCommaCount) {
Expand Down
Expand Up @@ -27,7 +27,7 @@ public class NodeCommunication implements Serializable {
private static final long serialVersionUID = 1L;

public enum CommunicationType {
PULL, PUSH, FILE_PUSH, FILE_PULL, OFFLN_PULL, OFFLN_PUSH, EXTRACT, FILE_XTRCT, OFF_FSPULL, OFF_FSPUSH;
PULL, PUSH, FILE_PUSH, FILE_PULL, OFFLN_PULL, OFFLN_PUSH, EXTRACT, FILE_XTRCT, OFF_FSPULL, OFF_FSPUSH, ROUTE;

public static boolean isPullType(CommunicationType communicationType) {
return communicationType == PULL || communicationType == CommunicationType.FILE_PULL
Expand Down
Expand Up @@ -90,6 +90,10 @@ public long getDataProcessed() {
return dataProcessed;
}

public void setDataProcessed(long dataProcessed) {
this.dataProcessed = dataProcessed;
}

public long getBatchesProcessed() {
return batchesProcessed;
}
Expand Down
Expand Up @@ -101,8 +101,7 @@ public void waitForComplete(long timeout) {
"Timed out after %sms", timeout));
}
} catch (java.lang.InterruptedException e) {
throw new InterruptedException(String.format(
"Timed out after %sms", timeout));
throw new InterruptedException(e);
}
}
}
Expand Down
Expand Up @@ -169,6 +169,15 @@ public void commit() {
}
}

public void commitOnlyPrerouted() {
dataIds.addAll(uncommittedDataIds);
clearState();
}

public List<Long> getUncommittedDataIds() {
return uncommittedDataIds;
}

protected void clearState() {
this.usedDataRouters.clear();
this.timesByRouter.clear();
Expand Down
Expand Up @@ -540,7 +540,7 @@ public DataGap getLastDataGap() {
}

@Override
public void addDataIds(List<Long> dataIds) {
public synchronized void addDataIds(List<Long> dataIds) {
this.dataIds.addAll(dataIds);
}

Expand Down
Expand Up @@ -99,7 +99,7 @@ protected void execute() {
ISymmetricDialect symmetricDialect = engine.getSymmetricDialect();
IDataGapRouteCursor cursor = null;
processInfo = engine.getStatisticManager().newProcessInfo(
new ProcessInfoKey(engine.getNodeService().findIdentityNodeId(), null,
new ProcessInfoKey(engine.getNodeService().findIdentityNodeId(), context.getChannel().getChannelId(), null,
ProcessType.ROUTER_READER));
processInfo.setCurrentChannelId(context.getChannel().getChannelId());
try {
Expand Down
Expand Up @@ -412,6 +412,9 @@ protected ThreadPoolExecutor getExecutor(final CommunicationType communicationTy
case PUSH:
threadCountParameter = ParameterConstants.PUSH_THREAD_COUNT_PER_SERVER;
break;
case ROUTE:
threadCountParameter = ParameterConstants.ROUTING_THREAD_COUNT_PER_SERVER;
break;
case OFFLN_PULL:
threadCountParameter = ParameterConstants.OFFLINE_PULL_THREAD_COUNT_PER_SERVER;
break;
Expand Down Expand Up @@ -474,6 +477,9 @@ protected Date getLockTimeoutDate(CommunicationType communicationType) {
case PUSH:
parameter = ParameterConstants.PUSH_LOCK_TIMEOUT_MS;
break;
case ROUTE:
parameter = ParameterConstants.ROUTING_LOCK_TIMEOUT_MS;
break;
case OFFLN_PULL:
parameter = ParameterConstants.OFFLINE_PULL_LOCK_TIMEOUT_MS;
break;
Expand Down

0 comments on commit 2de9633

Please sign in to comment.