Skip to content

Commit

Permalink
0003252: When inserting extract in background reload batches record the
Browse files Browse the repository at this point in the history
data_row_count during the insert of the RQ batch
  • Loading branch information
chenson42 committed Sep 15, 2017
1 parent 7756061 commit 4753d42
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 66 deletions.
Expand Up @@ -78,7 +78,7 @@ public interface IDataService {
public boolean insertReloadEvent(TableReloadRequest request, boolean deleteAtClient);

public long insertReloadEvent(ISqlTransaction transaction, Node targetNode,
TriggerRouter triggerRouter, TriggerHistory triggerHistory, String overrideInitialLoadSelect, boolean isLoad, long loadId, String createBy, Status status);
TriggerRouter triggerRouter, TriggerHistory triggerHistory, String overrideInitialLoadSelect, boolean isLoad, long loadId, String createBy, Status status, long estimatedBatchRowCount);

public void sendScript(String nodeId, String script, boolean isLoad);

Expand All @@ -99,7 +99,7 @@ public boolean sendSchema(String nodeId, String catalogName, String schemaName,
public void insertDataAndDataEventAndOutgoingBatch(Data data, String channelId, List<Node> nodes, String routerId, boolean isLoad, long loadId, String createBy);

public long insertDataAndDataEventAndOutgoingBatch(ISqlTransaction transaction, Data data,
String nodeId, String routerId, boolean isLoad, long loadId, String createBy, Status status);
String nodeId, String routerId, boolean isLoad, long loadId, String createBy, Status status, long estimatedBatchRowCount);

public long insertDataAndDataEventAndOutgoingBatch(Data data, String nodeId, String routerId, boolean isLoad, long loadId, String createBy);

Expand Down
Expand Up @@ -154,7 +154,7 @@ public boolean insertReloadEvent(TableReloadRequest request, boolean deleteAtCli

insertReloadEvent(transaction, targetNode, triggerRouter,
triggerHistory, request.getReloadSelect(), false, -1, null,
Status.NE);
Status.NE, null, -1);

if (!targetNode.requires13Compatiblity() && deleteAtClient) {
insertSqlEvent(
Expand Down Expand Up @@ -291,21 +291,22 @@ public TableReloadRequest mapRow(Row rs) {
public long insertReloadEvent(ISqlTransaction transaction, Node targetNode,
TriggerRouter triggerRouter, TriggerHistory triggerHistory,
String overrideInitialLoadSelect, boolean isLoad, long loadId, String createBy,
Status status) {
Status status, long estimatedBatchRowCount) {
String channelId = getReloadChannelIdForTrigger(triggerRouter.getTrigger(), engine
.getConfigurationService().getChannels(false));
return insertReloadEvent(transaction, targetNode, triggerRouter, triggerHistory,
overrideInitialLoadSelect, isLoad, loadId, createBy, status, channelId);
overrideInitialLoadSelect, isLoad, loadId, createBy, status, channelId, estimatedBatchRowCount);
}

/**
* @param estimatedBatchRowCount TODO
* @return If isLoad then return the inserted batch id otherwise return the
* data id
*/
public long insertReloadEvent(ISqlTransaction transaction, Node targetNode,
TriggerRouter triggerRouter, TriggerHistory triggerHistory,
String overrideInitialLoadSelect, boolean isLoad, long loadId, String createBy,
Status status, String channelId) {
Status status, String channelId, long estimatedBatchRowCount) {
if (triggerHistory == null) {
triggerHistory = lookupTriggerHistory(triggerRouter.getTrigger());
}
Expand All @@ -320,7 +321,7 @@ public long insertReloadEvent(ISqlTransaction transaction, Node targetNode,
if (isLoad) {
return insertDataAndDataEventAndOutgoingBatch(transaction, data,
targetNode.getNodeId(), triggerRouter.getRouter().getRouterId(), isLoad,
loadId, createBy, status);
loadId, createBy, status, null, estimatedBatchRowCount);
} else {
return insertData(transaction, data);
}
Expand Down Expand Up @@ -443,15 +444,15 @@ public void insertReloadEvents(Node targetNode, boolean reverse, List<TableReloa
String symNodeSecurityReloadChannel = null;
int totalTableCount = 0;
try {
for (List<TriggerRouter> triggerRouterList : triggerRoutersByHistoryId.values()) {
if (triggerRouterList.size() > 0) {
TriggerRouter tr = triggerRouterList.get(0);
symNodeSecurityReloadChannel = tr.getTrigger().getReloadChannelId();
}
totalTableCount += triggerRouterList.size();
}
for (List<TriggerRouter> triggerRouterList : triggerRoutersByHistoryId.values()) {
if (triggerRouterList.size() > 0) {
TriggerRouter tr = triggerRouterList.get(0);
symNodeSecurityReloadChannel = tr.getTrigger().getReloadChannelId();
}
totalTableCount += triggerRouterList.size();
}
} catch (Exception e) {
}
catch (Exception e) { }

processInfo.setDataCount(totalTableCount);

Expand Down Expand Up @@ -906,15 +907,25 @@ private void insertLoadBatchesForReload(Node targetNode, long loadId, String cre

processInfo.setCurrentTableName(table.getName());

int numberOfBatches = getNumberOfReloadBatches(table, triggerRouter,
channel, targetNode, selectSql);
long rowCount = getDataCountForReload(table, targetNode, selectSql);
long transformMultiplier = getTransformMultiplier(table, triggerRouter);

// calculate the number of batches needed for table.
long numberOfBatches = 1;
long lastBatchSize = channel.getMaxBatchSize();

if (rowCount > 0) {
numberOfBatches = (rowCount * transformMultiplier / channel.getMaxBatchSize()) + 1;
lastBatchSize = rowCount % numberOfBatches;
}

long startBatchId = -1;
long endBatchId = -1;
for (int i = 0; i < numberOfBatches; i++) {
long batchSize = i == numberOfBatches-1 ? lastBatchSize : channel.getMaxBatchSize();
// needs to grab the start and end batch id
endBatchId = insertReloadEvent(transaction, targetNode, triggerRouter,
triggerHistory, selectSql, true, loadId, createBy, Status.RQ);
triggerHistory, selectSql, true, loadId, createBy, Status.RQ, null, batchSize);
if (startBatchId == -1) {
startBatchId = endBatchId;
}
Expand All @@ -924,7 +935,7 @@ private void insertLoadBatchesForReload(Node targetNode, long loadId, String cre
targetNode.getNodeId(), channel.getQueue(), triggerRouter, startBatchId, endBatchId);
} else {
insertReloadEvent(transaction, targetNode, triggerRouter, triggerHistory,
selectSql, true, loadId, createBy, Status.NE);
selectSql, true, loadId, createBy, Status.NE, null, -1);
}

if (!transactional) {
Expand All @@ -936,20 +947,6 @@ private void insertLoadBatchesForReload(Node targetNode, long loadId, String cre
}
}

protected int getNumberOfReloadBatches(Table table, TriggerRouter triggerRouter, Channel channel, Node targetNode, String selectSql) {
int rowCount = getDataCountForReload(table, targetNode, selectSql);
int transformMultiplier = getTransformMultiplier(table, triggerRouter);

// calculate the number of batches needed for table.
int numberOfBatches = 1;

if (rowCount > 0) {
numberOfBatches = (rowCount * transformMultiplier / channel.getMaxBatchSize()) + 1;
}

return numberOfBatches;
}

protected int getDataCountForReload(Table table, Node targetNode, String selectSql) {
DatabaseInfo dbInfo = platform.getDatabaseInfo();
String quote = dbInfo.getDelimiterToken();
Expand Down Expand Up @@ -1010,7 +1007,7 @@ private void insertFileSyncBatchForReload(Node targetNode, long loadId, String c
insertReloadEvent(transaction, targetNode, fileSyncSnapshotTriggerRouter,
fileSyncSnapshotHistory,
"reload_channel_id='" + channel.getChannelId() + "'", true, loadId,
createBy, Status.NE, channel.getChannelId());
createBy, Status.NE, channel.getChannelId(), -1);
if (!transactional) {
transaction.commit();
}
Expand Down Expand Up @@ -1091,7 +1088,7 @@ protected void createPurgeEvent(ISqlTransaction transaction, String sql, Node ta
CsvUtils.escapeCsvData(sql), null, triggerHistory, channelId, null, null);
if (isLoad) {
insertDataAndDataEventAndOutgoingBatch(transaction, data, targetNode.getNodeId(),
triggerRouter.getRouter().getRouterId(), isLoad, loadId, createBy, Status.NE);
triggerRouter.getRouter().getRouterId(), isLoad, loadId, createBy, Status.NE, null, -1);
} else {
data.setNodeList(targetNode.getNodeId());
insertData(transaction, data);
Expand Down Expand Up @@ -1140,7 +1137,7 @@ protected void insertSqlEvent(ISqlTransaction transaction, TriggerHistory histor
data.setNodeList(targetNode.getNodeId());
if (isLoad) {
insertDataAndDataEventAndOutgoingBatch(transaction, data, targetNode.getNodeId(),
Constants.UNKNOWN_ROUTER_ID, isLoad, loadId, createBy, Status.NE);
Constants.UNKNOWN_ROUTER_ID, isLoad, loadId, createBy, Status.NE, null, -1);
} else {
insertData(transaction, data);
}
Expand All @@ -1160,7 +1157,7 @@ public void insertScriptEvent(ISqlTransaction transaction, String channelId,
data.setNodeList(targetNode.getNodeId());
if (isLoad) {
insertDataAndDataEventAndOutgoingBatch(transaction, data, targetNode.getNodeId(),
Constants.UNKNOWN_ROUTER_ID, isLoad, loadId, createBy, Status.NE);
Constants.UNKNOWN_ROUTER_ID, isLoad, loadId, createBy, Status.NE, null, -1);
} else {
insertData(transaction, data);
}
Expand Down Expand Up @@ -1208,7 +1205,7 @@ public void insertCreateEvent(ISqlTransaction transaction, Node targetNode,
try {
if (isLoad) {
insertDataAndDataEventAndOutgoingBatch(transaction, data, targetNode.getNodeId(),
routerId, isLoad, loadId, createBy, Status.NE);
routerId, isLoad, loadId, createBy, Status.NE, null, -1);
} else {
insertData(transaction, data);
}
Expand Down Expand Up @@ -1311,7 +1308,7 @@ public void insertDataAndDataEventAndOutgoingBatch(Data data, String channelId,
long dataId = insertData(transaction, data);
for (Node node : nodes) {
insertDataEventAndOutgoingBatch(transaction, dataId, channelId, node.getNodeId(),
data.getDataEventType(), routerId, isLoad, loadId, createBy, Status.NE, data.getTableName());
data.getDataEventType(), routerId, isLoad, loadId, createBy, Status.NE, data.getTableName(), -1);
}
transaction.commit();
} catch (Error ex) {
Expand Down Expand Up @@ -1339,7 +1336,7 @@ public long insertDataAndDataEventAndOutgoingBatch(Data data, String nodeId, Str
try {
transaction = sqlTemplate.startSqlTransaction();
batchId = insertDataAndDataEventAndOutgoingBatch(transaction, data, nodeId, routerId,
isLoad, loadId, createBy, Status.NE);
isLoad, loadId, createBy, Status.NE, null, -1);
transaction.commit();
return batchId;
} catch (Error ex) {
Expand All @@ -1358,48 +1355,49 @@ public long insertDataAndDataEventAndOutgoingBatch(Data data, String nodeId, Str
}

/**
* @param estimatedBatchRowCount TODO
* @return The inserted batch id
*/
public long insertDataAndDataEventAndOutgoingBatch(ISqlTransaction transaction, Data data,
String nodeId, String routerId, boolean isLoad, long loadId, String createBy,
Status status, String overrideChannelId) {
public long insertDataAndDataEventAndOutgoingBatch(ISqlTransaction transaction, Data data, String nodeId, String routerId, boolean isLoad,
long loadId, String createBy, Status status, String overrideChannelId, long estimatedBatchRowCount) {
long dataId = insertData(transaction, data);
String channelId = null;
if (isLoad) {
if (overrideChannelId != null) {
channelId = overrideChannelId;
}
else {
TriggerHistory history = data.getTriggerHistory();
if (history != null && channelId == null) {
Trigger trigger = engine.getTriggerRouterService().getTriggerById(
history.getTriggerId());
channelId = getReloadChannelIdForTrigger(trigger, engine.getConfigurationService()
.getChannels(false));
}
}
if (overrideChannelId != null) {
channelId = overrideChannelId;
} else {
TriggerHistory history = data.getTriggerHistory();
if (history != null && channelId == null) {
Trigger trigger = engine.getTriggerRouterService().getTriggerById(history.getTriggerId());
channelId = getReloadChannelIdForTrigger(trigger, engine.getConfigurationService().getChannels(false));
}
}
} else {
channelId = data.getChannelId();
}
return insertDataEventAndOutgoingBatch(transaction, dataId, channelId, nodeId,
data.getDataEventType(), routerId, isLoad, loadId, createBy, status, data.getTableName());
return insertDataEventAndOutgoingBatch(transaction, dataId, channelId, nodeId, data.getDataEventType(), routerId, isLoad, loadId,
createBy, status, data.getTableName(), estimatedBatchRowCount);
}

public long insertDataAndDataEventAndOutgoingBatch(ISqlTransaction transaction, Data data,
String nodeId, String routerId, boolean isLoad, long loadId, String createBy,
Status status) {
return insertDataAndDataEventAndOutgoingBatch(transaction, data, nodeId, routerId, isLoad, loadId, createBy, status, null);
Status status, long estimatedBatchRowCount) {
return insertDataAndDataEventAndOutgoingBatch(transaction, data, nodeId, routerId, isLoad, loadId, createBy, status, null, estimatedBatchRowCount);
}

protected long insertDataEventAndOutgoingBatch(ISqlTransaction transaction, long dataId,
String channelId, String nodeId, DataEventType eventType, String routerId,
boolean isLoad, long loadId, String createBy, Status status, String tableName) {
boolean isLoad, long loadId, String createBy, Status status, String tableName, long estimatedBatchRowCount) {
OutgoingBatch outgoingBatch = new OutgoingBatch(nodeId, channelId, status);
outgoingBatch.setLoadId(loadId);
outgoingBatch.setCreateBy(createBy);
outgoingBatch.setLoadFlag(isLoad);
outgoingBatch.incrementRowCount(eventType);
outgoingBatch.incrementDataRowCount();
if (estimatedBatchRowCount > 0) {
outgoingBatch.setDataRowCount(estimatedBatchRowCount);
} else {
outgoingBatch.incrementDataRowCount();
}
if (tableName != null) {
outgoingBatch.incrementTableCount(tableName.toLowerCase());
}
Expand Down Expand Up @@ -1432,7 +1430,7 @@ private void insertNodeSecurityUpdate(ISqlTransaction transaction, String nodeId
" t.node_id = '" + nodeIdRecord + "'");
if (data != null) {
insertDataAndDataEventAndOutgoingBatch(transaction, data, targetNodeId,
Constants.UNKNOWN_ROUTER_ID, isLoad, loadId, createBy, Status.NE, channelId);
Constants.UNKNOWN_ROUTER_ID, isLoad, loadId, createBy, Status.NE, channelId, -1);
}
}

Expand Down Expand Up @@ -1559,7 +1557,7 @@ public String reloadTable(String nodeId, String catalogName, String schemaName,
for (TriggerRouter triggerRouter : triggerRouters) {
eventCount++;
insertReloadEvent(transaction, targetNode, triggerRouter, triggerHistory,
overrideInitialLoadSelect, false, -1, "reloadTable", Status.NE);
overrideInitialLoadSelect, false, -1, "reloadTable", Status.NE, null, -1);
}
}
}
Expand Down
Expand Up @@ -268,7 +268,7 @@ public void insertOutgoingBatch(ISqlTransaction transaction, OutgoingBatch outgo
outgoingBatch.isLoadFlag() ? 1 : 0, outgoingBatch.isCommonFlag() ? 1 : 0, outgoingBatch.getReloadRowCount(),
outgoingBatch.getOtherRowCount(), outgoingBatch.getDataUpdateRowCount(), outgoingBatch.getDataInsertRowCount(),
outgoingBatch.getDataDeleteRowCount(), outgoingBatch.getLastUpdatedHostName(), outgoingBatch.getCreateBy(),
outgoingBatch.getSummary());
outgoingBatch.getSummary(), outgoingBatch.getDataRowCount());
outgoingBatch.setBatchId(batchId);
}

Expand Down
Expand Up @@ -51,8 +51,8 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform,
putSql("insertOutgoingBatchSql",
"insert into $(outgoing_batch) "
+ " (batch_id, node_id, channel_id, status, load_id, extract_job_flag, load_flag, common_flag, reload_row_count, other_row_count, "
+ " data_update_row_count, data_insert_row_count, data_delete_row_count, last_update_hostname, last_update_time, create_time, create_by, summary) "
+ " values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, current_timestamp, current_timestamp, ?, ?) ");
+ " data_update_row_count, data_insert_row_count, data_delete_row_count, last_update_hostname, last_update_time, create_time, create_by, summary, data_row_count) "
+ " values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, current_timestamp, current_timestamp, ?, ?, ?) ");

putSql("updateOutgoingBatchSql",
"update $(outgoing_batch) set status=?, load_id=?, extract_job_flag=?, load_flag=?, error_flag=?, "
Expand Down

0 comments on commit 4753d42

Please sign in to comment.