Merge branch '3.8' of https://github.com/JumpMind/symmetric-ds into 3.8
erilong committed Aug 2, 2016
2 parents 20497ce + b6e4657 commit 14cccbe
Showing 1 changed file with 124 additions and 69 deletions.
@@ -355,9 +355,9 @@ public void insertReloadEvents(Node targetNode, boolean reverse, List<TableReloa
|| (reloadRequests.size() == 1 && reloadRequests.get(0).isFullLoadRequest());

if (!reverse) {
log.info("Queueing up " + (isFullLoad ? "an initial" : "a" + " load to ") + targetNode.getNodeId());
log.info("Queueing up " + (isFullLoad ? "an initial" : "a" + " load to node ") + targetNode.getNodeId());
} else {
log.info("Queueing up a reverse " + (isFullLoad ? "initial" : "") + " load to " + targetNode.getNodeId());
log.info("Queueing up a reverse " + (isFullLoad ? "initial" : "") + " load to node " + targetNode.getNodeId());
}

/*
@@ -412,21 +412,25 @@ public void insertReloadEvents(Node targetNode, boolean reverse, List<TableReloa

insertCreateSchemaScriptPriorToReload(targetNode, nodeIdRecord, loadId,
createBy, transactional, transaction);

}
Map<String, TableReloadRequest> mapReloadRequests = convertReloadListToMap(reloadRequests);
if (isFullLoad || (reloadRequests != null && reloadRequests.size() > 0)) {
insertSqlEventsPriorToReload(targetNode, nodeIdRecord, loadId, createBy,
transactional, transaction, reverse);
transactional, transaction, reverse,
triggerHistories, triggerRoutersByHistoryId,
mapReloadRequests, isFullLoad);
}

insertCreateBatchesForReload(targetNode, loadId, createBy,
triggerHistories, triggerRoutersByHistoryId, transactional,
transaction, convertReloadListToMap(reloadRequests));
transaction, mapReloadRequests);

insertDeleteBatchesForReload(targetNode, loadId, createBy,
triggerHistories, triggerRoutersByHistoryId, transactional,
transaction, convertReloadListToMap(reloadRequests));
transaction, mapReloadRequests);

insertLoadBatchesForReload(targetNode, loadId, createBy, triggerHistories,
triggerRoutersByHistoryId, transactional, transaction, reloadRequests);
triggerRoutersByHistoryId, transactional, transaction, mapReloadRequests);

if (isFullLoad) {
String afterSql = parameterService
@@ -464,7 +468,7 @@ public void insertReloadEvents(Node targetNode, boolean reverse, List<TableReloa
request.getTargetNodeId(), request.getSourceNodeId(), request.getTriggerId(),
request.getRouterId(), request.getCreateTime());
}
log.info("Table reload request for load id " + loadId + " have been processed.");
log.info("Table reload request(s) for load id " + loadId + " have been processed.");
}

transaction.commit();
@@ -569,36 +573,71 @@ private String formatCommandForScript(String command) {
}

private void insertSqlEventsPriorToReload(Node targetNode, String nodeIdRecord, long loadId,
String createBy, boolean transactional, ISqlTransaction transaction, boolean reverse) {
if (!Constants.DEPLOYMENT_TYPE_REST.equals(targetNode.getDeploymentType())) {
/*
* Insert node security so the client doing the initial load knows
* that an initial load is currently happening
*/
insertNodeSecurityUpdate(transaction, nodeIdRecord, targetNode.getNodeId(), true,
loadId, createBy);

/*
* Mark incoming batches as OK at the target node because we marked
* outgoing batches as OK at the source
*/
insertSqlEvent(
transaction,
targetNode,
String.format(
"update %s_incoming_batch set status='OK', error_flag=0 where node_id='%s' and status != 'OK'",
tablePrefix, engine.getNodeService().findIdentityNodeId()), true,
loadId, createBy);
}

String beforeSql = parameterService.getString(reverse ? ParameterConstants.INITIAL_LOAD_REVERSE_BEFORE_SQL
: ParameterConstants.INITIAL_LOAD_BEFORE_SQL);
if (isNotBlank(beforeSql)) {
insertSqlEvent(
transaction,
targetNode,
beforeSql, true,
loadId, createBy);
String createBy, boolean transactional, ISqlTransaction transaction, boolean reverse,
List<TriggerHistory> triggerHistories,
Map<Integer, List<TriggerRouter>> triggerRoutersByHistoryId,
Map<String, TableReloadRequest> reloadRequests, boolean isFullLoad) {

if (!Constants.DEPLOYMENT_TYPE_REST.equals(targetNode.getDeploymentType())) {
/*
* Insert node security so the client doing the initial load knows
* that an initial load is currently happening
*/
insertNodeSecurityUpdate(transaction, nodeIdRecord, targetNode.getNodeId(), true,
loadId, createBy);

if (isFullLoad) {
/*
* Mark incoming batches as OK at the target node because we marked
* outgoing batches as OK at the source
*/
insertSqlEvent(
transaction,
targetNode,
String.format(
"update %s_incoming_batch set status='OK', error_flag=0 where node_id='%s' and status != 'OK'",
tablePrefix, engine.getNodeService().findIdentityNodeId()), true,
loadId, createBy);
}
}

if (!isFullLoad && reloadRequests != null && reloadRequests.size() > 0) {
String beforeSql = "";
int beforeSqlSent = 0;

for (TriggerHistory triggerHistory : triggerHistories) {
List<TriggerRouter> triggerRouters = triggerRoutersByHistoryId.get(triggerHistory
.getTriggerHistoryId());
for (TriggerRouter triggerRouter : triggerRouters) {
TableReloadRequest currentRequest = reloadRequests.get(triggerRouter.getTriggerId() + triggerRouter.getRouterId());
beforeSql = currentRequest.getBeforeCustomSql();

if (isNotBlank(beforeSql)) {
String tableName = triggerRouter.qualifiedTargetTableName(triggerHistory);
String formattedBeforeSql = String.format(beforeSql, tableName) + ";";

insertSqlEvent(
transaction,
targetNode,
formattedBeforeSql, true,
loadId, createBy);
beforeSqlSent++;
}
}
}
log.info("Before sending load {} to target node {} the before sql [{}] was sent for {} tables", new Object[] {
loadId, targetNode, beforeSql, beforeSqlSent });

} else {
String beforeSql = parameterService.getString(reverse ? ParameterConstants.INITIAL_LOAD_REVERSE_BEFORE_SQL
: ParameterConstants.INITIAL_LOAD_BEFORE_SQL);
if (isNotBlank(beforeSql)) {
insertSqlEvent(
transaction,
targetNode,
beforeSql, true,
loadId, createBy);
}
}
}
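
The hunk above treats a request's before custom SQL as a format string that is expanded once per target table. A minimal standalone sketch of that expansion, with hypothetical values standing in for TableReloadRequest.getBeforeCustomSql() and TriggerRouter.qualifiedTargetTableName(...):

// Hedged sketch of the per-table before-SQL expansion shown in the hunk above;
// the SQL and table name here are hypothetical examples.
public class BeforeSqlSketch {
    public static void main(String[] args) {
        String beforeSql = "truncate table %s";        // may carry a %s placeholder for the table
        String tableName = "store.sale_transaction";   // hypothetical qualified target table
        String formattedBeforeSql = String.format(beforeSql, tableName) + ";";
        System.out.println(formattedBeforeSql);        // truncate table store.sale_transaction;
    }
}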

@@ -608,6 +647,7 @@ private void insertCreateBatchesForReload(Node targetNode, long loadId, String c
ISqlTransaction transaction, Map<String, TableReloadRequest> reloadRequests) {

if (reloadRequests != null && reloadRequests.size() > 0) {
int createEventsSent = 0;
for (TriggerHistory triggerHistory : triggerHistories) {
List<TriggerRouter> triggerRouters = triggerRoutersByHistoryId.get(triggerHistory
.getTriggerHistoryId());
@@ -626,12 +666,15 @@ private void insertCreateBatchesForReload(Node targetNode, long loadId, String c
targetNode)) {
insertCreateEvent(transaction, targetNode, triggerHistory, triggerRouter.getRouter().getRouterId(), true,
loadId, createBy);
createEventsSent++;
if (!transactional) {
transaction.commit();
}
}
}
}
log.info("Before sending load {} to target node {} create table events were sent for {} tables", new Object[] {
loadId, targetNode, createEventsSent });
}
else {
if (parameterService.is(ParameterConstants.INITIAL_LOAD_CREATE_SCHEMA_BEFORE_RELOAD)) {
@@ -660,6 +703,7 @@ private void insertDeleteBatchesForReload(Node targetNode, long loadId, String c
ISqlTransaction transaction, Map<String, TableReloadRequest> reloadRequests) {

if (reloadRequests != null && reloadRequests.size() > 0) {
int deleteEventsSent = 0;
for (TriggerHistory triggerHistory : triggerHistories) {
List<TriggerRouter> triggerRouters = triggerRoutersByHistoryId.get(triggerHistory
.getTriggerHistoryId());
@@ -678,12 +722,15 @@ private void insertDeleteBatchesForReload(Node targetNode, long loadId, String c
targetNode)) {
insertPurgeEvent(transaction, targetNode, triggerRouter, triggerHistory,
true, currentRequest.getBeforeCustomSql(), loadId, createBy);
deleteEventsSent++;
if (!transactional) {
transaction.commit();
}
}
}
}
log.info("Before sending load {} to target node {} delete data events were sent for {} tables", new Object[] {
loadId, targetNode, deleteEventsSent });
}
else {
if (parameterService.is(ParameterConstants.INITIAL_LOAD_DELETE_BEFORE_RELOAD)) {
@@ -716,7 +763,7 @@ private void insertDeleteBatchesForReload(Node targetNode, long loadId, String c
private void insertLoadBatchesForReload(Node targetNode, long loadId, String createBy,
List<TriggerHistory> triggerHistories,
Map<Integer, List<TriggerRouter>> triggerRoutersByHistoryId, boolean transactional,
ISqlTransaction transaction, List<TableReloadRequest> reloadRequests) {
ISqlTransaction transaction, Map<String, TableReloadRequest> reloadRequests) {
Map<String, Channel> channels = engine.getConfigurationService().getChannels(false);
DatabaseInfo dbInfo = platform.getDatabaseInfo();
String quote = dbInfo.getDelimiterToken();
@@ -728,47 +775,55 @@ private void insertLoadBatchesForReload(Node targetNode, long loadId, String cre
for (TriggerRouter triggerRouter : triggerRouters) {
if (triggerRouter.getInitialLoadOrder() >= 0
&& engine.getGroupletService().isTargetEnabled(triggerRouter, targetNode)) {

TableReloadRequest reloadRequest = reloadRequests.get(triggerRouter.getTriggerId() + triggerRouter.getRouterId());
String selectSql = reloadRequest != null ? reloadRequest.getReloadSelect() : null;
if (StringUtils.isBlank(selectSql)) {
selectSql = StringUtils.isBlank(triggerRouter.getInitialLoadSelect())
? Constants.ALWAYS_TRUE_CONDITION
: triggerRouter.getInitialLoadSelect();
}

if (parameterService.is(ParameterConstants.INITIAL_LOAD_USE_EXTRACT_JOB)) {
Trigger trigger = triggerRouter.getTrigger();
String reloadChannel = getReloadChannelIdForTrigger(trigger, channels);
Channel channel = channels.get(reloadChannel);
// calculate the number of batches needed for table.
int numberOfBatches = triggerRouter.getInitialLoadBatchCount();
if (numberOfBatches <= 0) {
Table table = platform.getTableFromCache(
triggerHistory.getSourceCatalogName(), triggerHistory.getSourceSchemaName(),
triggerHistory.getSourceTableName(), false);
String sql = String.format("select count(*) from %s where %s", table
.getQualifiedTableName(quote, catalogSeparator, schemaSeparator),
StringUtils.isBlank(triggerRouter.getInitialLoadSelect()) ? Constants.ALWAYS_TRUE_CONDITION
: triggerRouter.getInitialLoadSelect());
sql = FormatUtils.replace("groupId", targetNode.getNodeGroupId(), sql);
sql = FormatUtils
.replace("externalId", targetNode.getExternalId(), sql);
sql = FormatUtils.replace("nodeId", targetNode.getNodeId(), sql);
int rowCount = sqlTemplate.queryForInt(sql);
int transformMultiplier = 0;
for (TransformService.TransformTableNodeGroupLink transform : engine.getTransformService().getTransformTables(false)) {
if (triggerRouter.getRouter().getNodeGroupLink().equals(transform.getNodeGroupLink()) &&
transform.getSourceTableName().equals(table.getName())) {
transformMultiplier++;
}
}
if (transformMultiplier == 0) { transformMultiplier = 1; }

if (rowCount > 0) {
numberOfBatches = (rowCount * transformMultiplier / channel.getMaxBatchSize()) + 1;
} else {
numberOfBatches = 1;
}

Table table = platform.getTableFromCache(
triggerHistory.getSourceCatalogName(), triggerHistory.getSourceSchemaName(),
triggerHistory.getSourceTableName(), false);


String sql = String.format("select count(*) from %s where %s", table
.getQualifiedTableName(quote, catalogSeparator, schemaSeparator), selectSql);
sql = FormatUtils.replace("groupId", targetNode.getNodeGroupId(), sql);
sql = FormatUtils
.replace("externalId", targetNode.getExternalId(), sql);
sql = FormatUtils.replace("nodeId", targetNode.getNodeId(), sql);
int rowCount = sqlTemplate.queryForInt(sql);
int transformMultiplier = 0;
for (TransformService.TransformTableNodeGroupLink transform : engine.getTransformService().getTransformTables(false)) {
if (triggerRouter.getRouter().getNodeGroupLink().equals(transform.getNodeGroupLink()) &&
transform.getSourceTableName().equals(table.getName())) {
transformMultiplier++;
}
}

if (transformMultiplier == 0) { transformMultiplier = 1; }

if (rowCount > 0) {
numberOfBatches = (rowCount * transformMultiplier / channel.getMaxBatchSize()) + 1;
} else {
numberOfBatches = 1;
}

long startBatchId = -1;
long endBatchId = -1;
for (int i = 0; i < numberOfBatches; i++) {
// needs to grab the start and end batch id
endBatchId = insertReloadEvent(transaction, targetNode, triggerRouter,
triggerHistory, null, true, loadId, createBy, Status.RQ);
triggerHistory, selectSql, true, loadId, createBy, Status.RQ);
if (startBatchId == -1) {
startBatchId = endBatchId;
}
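
When the extract job is enabled, the hunk above sizes the reload by estimating how many batches the table will need from a row count, the number of matching transforms, and the channel's maximum batch size. A small sketch of that arithmetic with hypothetical numbers:

// Hedged sketch of the batch-count estimate in the hunk above; all values are hypothetical.
public class BatchCountSketch {
    public static void main(String[] args) {
        int rowCount = 250_000;       // result of the select count(*) query built from selectSql
        int transformMultiplier = 2;  // transforms matching this table and group link, floored at 1
        int maxBatchSize = 10_000;    // channel.getMaxBatchSize()
        int numberOfBatches = rowCount > 0
                ? (rowCount * transformMultiplier / maxBatchSize) + 1
                : 1;
        System.out.println(numberOfBatches); // 51 for these numbers
    }
}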
Expand All @@ -778,7 +833,7 @@ private void insertLoadBatchesForReload(Node targetNode, long loadId, String cre
targetNode.getNodeId(), channel.getQueue(), triggerRouter, startBatchId, endBatchId);
} else {
insertReloadEvent(transaction, targetNode, triggerRouter, triggerHistory,
null, true, loadId, createBy, Status.NE);
selectSql, true, loadId, createBy, Status.NE);
}

if (!transactional) {

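Several hunks look reload requests up with reloadRequests.get(triggerRouter.getTriggerId() + triggerRouter.getRouterId()), which implies that convertReloadListToMap keys each request by trigger id concatenated with router id. A minimal sketch of that indexing, using a hypothetical stand-in for the SymmetricDS TableReloadRequest model class:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hedged sketch of the keying convention implied by the lookups in the diff.
// ReloadRequest is a stand-in, not the real SymmetricDS TableReloadRequest class.
public class ReloadRequestIndex {
    static class ReloadRequest {
        String triggerId;
        String routerId;
        String reloadSelect;     // optional per-table initial load select
        String beforeCustomSql;  // optional per-table before SQL
    }

    static Map<String, ReloadRequest> index(List<ReloadRequest> requests) {
        Map<String, ReloadRequest> byKey = new HashMap<>();
        for (ReloadRequest request : requests) {
            // key = triggerId + routerId, matching the get(...) calls in the diff
            byKey.put(request.triggerId + request.routerId, request);
        }
        return byKey;
    }
}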