Skip to content

Commit

Permalink
Merge branch '3.8' of https://github.com/JumpMind/symmetric-ds into 3.8
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Aug 16, 2016
2 parents 16eaa84 + 09ee35d commit 4a4cb50
Show file tree
Hide file tree
Showing 17 changed files with 557 additions and 66 deletions.
Expand Up @@ -20,6 +20,7 @@
*/
package org.jumpmind.symmetric.job;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -33,6 +34,9 @@
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

public class ReportStatusJob extends AbstractJob {

private static Map<String, Integer> lastBatchCountPerChannel =
Collections.synchronizedMap(new HashMap<String, Integer>());

protected ReportStatusJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) {
super("job.report.status", true, engine.getParameterService().is(ParameterConstants.HYBRID_PUSH_PULL_ENABLED),
Expand All @@ -47,21 +51,61 @@ void doJob(boolean force) throws Exception {
Node identity = engine.getNodeService().findIdentity();

if (remote.getNode().getNodeId().equals(identity.getNodeId())) {
log.debug("Skipping report status job because this node is the root node.");
log.debug("Skipping report status job because this node is the root node. identity={}, remote={}", identity, remote);
return;
}

Map<String, Integer> batchesToSendByChannel = engine.getOutgoingBatchService().
Map<String, Integer> batchCountPerChannel = engine.getOutgoingBatchService().
countOutgoingBatchesPendingByChannel(remote.getNode().getNodeId());

if (!batchesToSendByChannel.isEmpty()) {
log.debug("identity={} batchCountPerChannel='{}', lastBatchCountPerChannel='{}'",
identity, batchCountPerChannel, lastBatchCountPerChannel);

if (force || shouldSendStatus(batchCountPerChannel)) {
Map<String, String> requestParams = new HashMap<String, String>();

requestParams.put(WebConstants.BATCH_TO_SEND_COUNT, TransportUtils.toCSV(batchesToSendByChannel));
requestParams.put(WebConstants.BATCH_TO_SEND_COUNT, TransportUtils.toCSV(batchCountPerChannel));

engine.getTransportManager().sendStatusRequest(identity, requestParams);

updateLastSentStatus(batchCountPerChannel);
}
}


protected boolean shouldSendStatus(Map<String, Integer> batchCountPerChannel) {
if (batchCountPerChannel == null || batchCountPerChannel.isEmpty()) {
return false;
}

if (lastBatchCountPerChannel.equals(batchCountPerChannel)) {
return false;
}

if (engine.getParameterService().is(ParameterConstants.HYBRID_PUSH_PULL_BUFFER_STATUS_UPDATES)) {
for (String channelId : batchCountPerChannel.keySet()) {
Integer lastCount = lastBatchCountPerChannel.get(channelId);
if (lastCount == null) {
lastCount = Integer.valueOf(0);
}
Integer currentCount = batchCountPerChannel.get(channelId);

if (lastCount.equals(Integer.valueOf(0)) && !lastCount.equals(currentCount)) {
return true;
}
}

return false;
}

return true;
}

protected void updateLastSentStatus(Map<String, Integer> batchesToSendByChannel) {
lastBatchCountPerChannel.clear();
lastBatchCountPerChannel.putAll(batchesToSendByChannel);
}

@Override
public String getClusterLockName() {
return ClusterConstants.REPORT_STATUS;
Expand Down
Expand Up @@ -356,7 +356,9 @@ private ParameterConstants() {

public final static String HYBRID_PUSH_PULL_ENABLED = "hybrid.push.pull.enabled";

public final static String HYBRID_PUSH_PULL_TIMEOUT = "hybrid.push.pull.timeout";
public final static String HYBRID_PUSH_PULL_TIMEOUT = "hybrid.push.pull.timeout.ms";

public final static String HYBRID_PUSH_PULL_BUFFER_STATUS_UPDATES = "hybrid.push.pull.buffer.status.updates";

public final static String DBF_ROUTER_VALIDATE_HEADER = "dbf.router.validate.header";

Expand Down
Expand Up @@ -67,6 +67,8 @@ public String afterUpgrade(ISymmetricDialect symmetricDialect, String tablePrefi
if (isUpgradeTo38) {
engine.getSqlTemplate().update("update " + tablePrefix + "_" + TableConstants.SYM_SEQUENCE +
" set cache_size = 10 where sequence_name = ?", Constants.SEQUENCE_OUTGOING_BATCH);
engine.getSqlTemplate().update("update " + tablePrefix + "_" + TableConstants.SYM_CHANNEL +
" set max_batch_size = 10000 where reload_flag = 1 and max_batch_size = 10000");
}
return sb.toString();
}
Expand Down
@@ -0,0 +1,230 @@
package org.jumpmind.symmetric.model;

import java.io.Serializable;
import java.util.Date;

public class LoadSummary implements Serializable {

private static final long serialVersionUID = 1L;

private long loadId;
private String nodeId;
private boolean inError;
private int finishedBatchCount;
private int pendingBatchCount;
private long currentBatchId;
private long currentDataEventCount;
private String createBy;
private Date createTime;
private Date lastUpdateTime;
private String channelQueue;
private int tableCount;
private boolean isFullLoad;
private boolean isCreateFirst;
private boolean isDeleteFirst;
private boolean isRequestProcessed;
private boolean isConditional;
private boolean isCustomSql;
private long batchCount;
private String currentTableName;
private long dataCount;
private String processStatus;
private String processName;

public boolean isActive() {
return pendingBatchCount > 0;
}

public void setInError(boolean inError) {
this.inError = inError;
}

public boolean isInError() {
return inError;
}

public long getLoadId() {
return loadId;
}

public void setLoadId(long loadId) {
this.loadId = loadId;
}

public String getNodeId() {
return nodeId;
}

public void setNodeId(String nodeId) {
this.nodeId = nodeId;
}

public int getFinishedBatchCount() {
return finishedBatchCount;
}

public void setFinishedBatchCount(int finishedBatchCount) {
this.finishedBatchCount = finishedBatchCount;
}

public int getPendingBatchCount() {
return pendingBatchCount;
}

public void setPendingBatchCount(int pendingBatchCount) {
this.pendingBatchCount = pendingBatchCount;
}

public long getCurrentBatchId() {
return currentBatchId;
}

public void setCurrentBatchId(long currentBatchId) {
this.currentBatchId = currentBatchId;
}

public long getCurrentDataEventCount() {
return currentDataEventCount;
}

public void setCurrentDataEventCount(long currentDataEventCount) {
this.currentDataEventCount = currentDataEventCount;
}

public Date getCreateTime() {
return createTime;
}

public void setCreateTime(Date createTime) {
this.createTime = createTime;
}

public Date getLastUpdateTime() {
return lastUpdateTime;
}

public void setLastUpdateTime(Date lastUpdateTime) {
this.lastUpdateTime = lastUpdateTime;
}

public void setCreateBy(String createBy) {
this.createBy = createBy;
}

public String getCreateBy() {
return createBy;
}

public String getLoadNodeId() {
return String.format("%010d-%s", loadId, nodeId);
}

public String getChannelQueue() {
return channelQueue;
}

public void setChannelQueue(String channelQueue) {
this.channelQueue = channelQueue;
}

public int getTableCount() {
return tableCount;
}

public void setTableCount(int tableCount) {
this.tableCount = tableCount;
}

public boolean isFullLoad() {
return isFullLoad;
}

public void setFullLoad(boolean isFullLoad) {
this.isFullLoad = isFullLoad;
}

public boolean isCreateFirst() {
return isCreateFirst;
}

public void setCreateFirst(boolean isCreateFirst) {
this.isCreateFirst = isCreateFirst;
}

public boolean isDeleteFirst() {
return isDeleteFirst;
}

public void setDeleteFirst(boolean isDeleteFirst) {
this.isDeleteFirst = isDeleteFirst;
}

public boolean isRequestProcessed() {
return isRequestProcessed;
}

public void setRequestProcessed(boolean isRequestProcessed) {
this.isRequestProcessed = isRequestProcessed;
}

public boolean isConditional() {
return isConditional;
}

public void setConditional(boolean isConditional) {
this.isConditional = isConditional;
}

public boolean isCustomSql() {
return isCustomSql;
}

public void setCustomSql(boolean isCustomSql) {
this.isCustomSql = isCustomSql;
}

public long getBatchCount() {
return batchCount;
}

public void setBatchCount(long batchCount) {
this.batchCount = batchCount;
}

public String getCurrentTableName() {
return currentTableName;
}

public void setCurrentTableName(String currentTableName) {
this.currentTableName = currentTableName;
}

public long getDataCount() {
return dataCount;
}

public void setDataCount(long dataCount) {
this.dataCount = dataCount;
}

public String getProcessStatus() {
return processStatus;
}

public void setProcessStatus(String processStatus) {
this.processStatus = processStatus;
}

public String getProcessName() {
return processName;
}

public void setProcessName(String processName) {
this.processName = processName;
}




}

0 comments on commit 4a4cb50

Please sign in to comment.