Skip to content

Commit

Permalink
0004174: Initial load in background contention with routing
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Dec 2, 2019
1 parent 1e182bf commit f05a3e9
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 38 deletions.
Expand Up @@ -40,7 +40,7 @@ public class TableReloadRequest {
protected Date createTime = new Date();
protected Date lastUpdateTime = new Date();
protected String lastUpdateBy;
protected int loadId;
protected long loadId;
protected boolean processed;

public TableReloadRequest(TableReloadRequestKey key) {
Expand Down Expand Up @@ -170,11 +170,11 @@ public String getIdentifier() {
return getTriggerId() + getRouterId();
}

public int getLoadId() {
public long getLoadId() {
return loadId;
}

public void setLoadId(int loadId) {
public void setLoadId(long loadId) {
this.loadId = loadId;
}

Expand Down
Expand Up @@ -26,6 +26,8 @@
public interface ISequenceService {

public long nextVal(String name);

public long nextRange(String name, long size);

public long currVal(String name);

Expand Down
Expand Up @@ -331,6 +331,7 @@ public TableReloadRequest mapRow(Row rs) {
request.setChannelId(rs.getString("channel_id"));
request.setTriggerId(rs.getString("trigger_id"));
request.setRouterId(rs.getString("router_id"));
request.setLoadId(rs.getLong("load_id"));
request.setCreateTime(rs.getDateTime("create_time"));
request.setLastUpdateBy(rs.getString("last_update_by"));
request.setLastUpdateTime(rs.getDateTime("last_update_time"));
Expand Down Expand Up @@ -371,7 +372,7 @@ public List<TableReloadRequest> getTableReloadRequestByLoadId() {
public List<TableReloadRequest> collapseTableReloadRequestsByLoadId(List<TableReloadRequest> requests) {
List<TableReloadRequest> collapsedRequests = new ArrayList<TableReloadRequest>();

int previousLoadId = -1;
long previousLoadId = -1;

TableReloadRequest summary = null;
for (TableReloadRequest request : requests) {
Expand Down Expand Up @@ -668,7 +669,42 @@ public TableReloadStatus mapRow(Row rs) {
return request;
}
}


protected long insertRequestedOutgoingBatches(ISqlTransaction transaction, Node targetNode,
TriggerRouter triggerRouter, TriggerHistory triggerHistory,
String overrideInitialLoadSelect, long loadId, String createBy,
String channelId, long rowsPerBatch, long batchCount) {

long startBatchId = engine.getSequenceService().nextRange(Constants.SEQUENCE_OUTGOING_BATCH, batchCount);
String tableName = triggerHistory.getSourceTableName().toLowerCase();

for (int i = 0; i < batchCount; i++) {
long batchId = startBatchId + i;
OutgoingBatch batch = new OutgoingBatch(targetNode.getNodeId(), channelId, Status.RQ);
batch.setBatchId(batchId);
batch.setLoadId(loadId);
batch.setCreateBy(createBy);
batch.setLoadFlag(true);
batch.incrementRowCount(DataEventType.RELOAD);
batch.setDataRowCount(rowsPerBatch);
batch.incrementTableCount(tableName);
batch.setExtractJobFlag(true);
engine.getOutgoingBatchService().insertOutgoingBatch(transaction, batch);

if (i == 0) {
Data data = new Data(triggerHistory.getSourceTableName(), DataEventType.RELOAD,
overrideInitialLoadSelect != null ? overrideInitialLoadSelect : triggerRouter.getInitialLoadSelect(), null,
triggerHistory, channelId, null, null);
data.setNodeList(targetNode.getNodeId());
data.setPreRouted(true);
long dataId = insertData(transaction, data);
insertDataEvent(transaction, new DataEvent(dataId, batchId));
}
}

return startBatchId;
}

/**
* @return If isLoad then return the inserted batch id otherwise return the
* data id
Expand Down Expand Up @@ -1449,24 +1485,14 @@ private Map<Integer, ExtractRequest> insertLoadBatchesForReload(Node targetNode,

// calculate the number of batches needed for table.
long numberOfBatches = 1;
long lastBatchSize = channel.getMaxBatchSize();

if (rowCount > 0) {
numberOfBatches = (rowCount * transformMultiplier / channel.getMaxBatchSize()) + 1;
lastBatchSize = rowCount % numberOfBatches;
numberOfBatches = (long) Math.ceil((rowCount * transformMultiplier) / (channel.getMaxBatchSize() * 1f));
}

long startBatchId = -1;
long endBatchId = -1;
for (int i = 0; i < numberOfBatches; i++) {
long batchSize = i == numberOfBatches - 1 ? lastBatchSize : channel.getMaxBatchSize();
// needs to grab the start and end batch id
endBatchId = insertReloadEvent(transaction, targetNode, triggerRouter, triggerHistory, selectSql, true,
loadId, createBy, Status.RQ, null, batchSize);
if (startBatchId == -1) {
startBatchId = endBatchId;
}
}
long startBatchId = insertRequestedOutgoingBatches(transaction, targetNode, triggerRouter, triggerHistory, selectSql,
loadId, createBy, reloadChannel, channel.getMaxBatchSize(), numberOfBatches);
long endBatchId = startBatchId + numberOfBatches - 1;

firstBatchId = firstBatchId == 0 ? startBatchId : firstBatchId;

Expand Down
Expand Up @@ -39,7 +39,7 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map<String, String> replace

putSql("selectTableReloadRequestToProcess", "select target_node_id, create_table, delete_first, reload_select, before_custom_sql, "
+ " reload_time, channel_id, create_time, last_update_by, "
+ " last_update_time, trigger_id, router_id "
+ " last_update_time, trigger_id, router_id, load_id "
+ " from $(table_reload_request) "
+ " where source_node_id=? and processed = 0 "
+ " order by create_time, target_node_id");
Expand Down
Expand Up @@ -89,7 +89,7 @@ public synchronized long nextVal(String name) {
if (!parameterService.is(ParameterConstants.CLUSTER_LOCKING_ENABLED) && getSequenceDefinition(name).getCacheSize() > 0) {
return nextValFromCache(null, name);
}
return nextValFromDatabase(name);
return nextValFromDatabase(name, 1);
}

public synchronized long nextVal(ISqlTransaction transaction, final String name) {
Expand All @@ -104,40 +104,41 @@ public void transactionRolledBack() {
if (!parameterService.is(ParameterConstants.CLUSTER_LOCKING_ENABLED) && getSequenceDefinition(transaction, name).getCacheSize() > 0) {
return nextValFromCache(transaction, name);
}
return nextValFromDatabase(transaction, name);
return nextValFromDatabase(transaction, name, 1);
}

protected long nextValFromCache(ISqlTransaction transaction, String name) {
CachedRange range = sequenceCache.get(name);
if (range != null) {
long currentValue = range.getCurrentValue();
if (currentValue < range.getEndValue()) {
range.setCurrentValue(++currentValue);
currentValue += range.getIncrementBy();
range.setCurrentValue(currentValue);
return currentValue;
} else {
sequenceCache.remove(name);
}
}
return nextValFromDatabase(transaction, name);
return nextValFromDatabase(transaction, name, 1);
}

protected long nextValFromDatabase(final String name) {
protected long nextValFromDatabase(final String name, long size) {
return new DoTransaction<Long>() {
public Long execute(ISqlTransaction transaction) {
return nextValFromDatabase(transaction, name);
return nextValFromDatabase(transaction, name, size);
}
}.execute();
}

protected long nextValFromDatabase(ISqlTransaction transaction, String name) {
protected long nextValFromDatabase(ISqlTransaction transaction, String name, long size) {
if (transaction == null) {
return nextValFromDatabase(name);
return nextValFromDatabase(name, size);
} else {
long sequenceTimeoutInMs = parameterService.getLong(
ParameterConstants.SEQUENCE_TIMEOUT_MS, 5000);
long ts = System.currentTimeMillis();
do {
long nextVal = tryToGetNextVal(transaction, name);
long nextVal = tryToGetNextVal(transaction, name, size);
if (nextVal > 0) {
return nextVal;
}
Expand All @@ -149,32 +150,32 @@ protected long nextValFromDatabase(ISqlTransaction transaction, String name) {
}
}

protected long tryToGetNextVal(ISqlTransaction transaction, String name) {
protected long tryToGetNextVal(ISqlTransaction transaction, String name, long size) {
long currVal = currVal(transaction, name);
Sequence sequence = getSequenceDefinition(transaction, name);
long nextVal = currVal + sequence.getIncrementBy();
long nextVal = currVal + (sequence.getIncrementBy() * size);
if (nextVal > sequence.getMaxValue()) {
if (sequence.isCycle()) {
nextVal = sequence.getMinValue();
nextVal = sequence.getMinValue() + ((sequence.getIncrementBy() * size) - sequence.getIncrementBy());
} else {
throw new IllegalStateException(String.format(
"The sequence named %s has reached it's max value. "
+ "No more numbers can be handled out.", name));
+ "No more numbers can be handed out.", name));
}
} else if (nextVal < sequence.getMinValue()) {
if (sequence.isCycle()) {
nextVal = sequence.getMaxValue();
nextVal = sequence.getMaxValue() + ((sequence.getIncrementBy() * size) - sequence.getIncrementBy());
} else {
throw new IllegalStateException(String.format(
"The sequence named %s has reached it's min value. "
+ "No more numbers can be handled out.", name));
+ "No more numbers can be handed out.", name));
}
}

CachedRange range = null;
if (!parameterService.is(ParameterConstants.CLUSTER_LOCKING_ENABLED) && sequence.getCacheSize() > 0) {
long endVal = nextVal + (sequence.getIncrementBy() * (sequence.getCacheSize() - 1));
range = new CachedRange(nextVal, endVal);
range = new CachedRange(nextVal, endVal, sequence.getIncrementBy());
nextVal = endVal;
}

Expand All @@ -189,6 +190,58 @@ protected long tryToGetNextVal(ISqlTransaction transaction, String name) {
return nextVal;
}

/**
* Obtain a contiguous range of sequence numbers. The initial load extract in background needs an uninterrupted range
* of batch numbers. As a bonus, it's more efficient to request the entire range in one call.
*
* @param name Sequence name to use
* @param size Number of sequence numbers to obtain
* @return Starting sequence number for the entire range that was obtained
*/
public synchronized long nextRange(String name, long size) {
Sequence sequence = getSequenceDefinition(name);
if (size <= 0) {
throw new IllegalStateException("Size of range must be a positive integer");
}
if (sequence.getIncrementBy() <= 0) {
throw new IllegalStateException("Increment-by must be a positive integer");
}
long startingValue = 0;
long rangeNeeded = size * sequence.getIncrementBy();

if (!parameterService.is(ParameterConstants.CLUSTER_LOCKING_ENABLED) && sequence.getCacheSize() > 0) {
CachedRange range = sequenceCache.get(name);
if (range != null) {
long currentValue = range.getCurrentValue();
long endValue = range.getEndValue();
long rangeAvailable = endValue - currentValue;
long rangeEndValue = currentValue + rangeNeeded;

if (currentValue < endValue && rangeEndValue <= sequence.getMaxValue()) {
startingValue = currentValue + sequence.getIncrementBy();
if (rangeNeeded <= rangeAvailable) {
range.setCurrentValue(currentValue + rangeNeeded);
rangeNeeded = 0;
} else {
rangeNeeded -= rangeAvailable;
size = rangeNeeded / sequence.getIncrementBy();
sequenceCache.remove(name);
}
} else {
sequenceCache.remove(name);
}
}
}

if (rangeNeeded > 0) {
long databaseStartingValue = nextValFromDatabase(name, size) - (rangeNeeded - sequence.getIncrementBy());
if (startingValue == 0) {
startingValue = databaseStartingValue;
}
}
return startingValue;
}

protected Sequence getSequenceDefinition(final String name) {
Sequence sequence = sequenceDefinitionCache.get(name);
if (sequence != null) {
Expand Down Expand Up @@ -269,10 +322,12 @@ protected Map<String, Sequence> getAll() {
static class CachedRange {
long currentValue;
long endValue;
int incrementBy;

public CachedRange(long currentValue, long endValue) {
public CachedRange(long currentValue, long endValue, int incrementBy) {
this.currentValue = currentValue;
this.endValue = endValue;
this.incrementBy = incrementBy;
}

public long getCurrentValue() {
Expand All @@ -285,7 +340,11 @@ public void setCurrentValue(long currentValue) {

public long getEndValue() {
return endValue;
}
}

public int getIncrementBy() {
return incrementBy;
}
}

abstract class DoTransaction<T> {
Expand Down

0 comments on commit f05a3e9

Please sign in to comment.