Skip to content
Permalink
Browse files

0003883: Lost heartbeat when using more than two node groups

  • Loading branch information...
elong
elong committed Mar 4, 2019
1 parent 28fd1fa commit da503d43d3f643268753fc11fda17386b41d6265
@@ -87,8 +87,9 @@ public void heartbeat(Node me) {
}

log.debug("Updating my node info");
engine.getOutgoingBatchService().markHeartbeatAsSent();
engine.getNodeService().updateNodeHostForCurrentNode();
if (engine.getOutgoingBatchService().countOutgoingBatchesUnsentHeartbeat() == 0) {
engine.getNodeService().updateNodeHostForCurrentNode();
}
log.debug("Done updating my node info");

if (!engine.getNodeService().isRegistrationServer()) {
@@ -48,8 +48,6 @@

public void markAllConfigAsSentForNode(String nodeId);

public void markHeartbeatAsSent();

public void updateAbandonedRoutingBatches();

public OutgoingBatch findOutgoingBatch(long batchId, String nodeId);
@@ -98,7 +96,9 @@ public OutgoingBatches getOutgoingBatches(String nodeId, String channelThread, N
public int countOutgoingBatchesInError(String channelId);

public int countOutgoingBatchesUnsent(String channelId);


public int countOutgoingBatchesUnsentHeartbeat();

public Map<String, Integer> countOutgoingBatchesPendingByChannel(String nodeId);

public long countUnsentRowsByTargetNode(String nodeId);
@@ -20,9 +20,6 @@
*/
package org.jumpmind.symmetric.service.impl;

import static org.jumpmind.symmetric.common.TableConstants.SYM_NODE_HOST;
import static org.jumpmind.symmetric.common.TableConstants.getTableName;

import java.io.Serializable;
import java.sql.Types;
import java.util.ArrayList;
@@ -149,23 +146,6 @@ public void markAllConfigAsSentForNode(String nodeId) {
} while (updateCount > 0);
}


@Override
public void markHeartbeatAsSent() {
String sql = getSql("cancelChannelBatchesSelectSql");

List<Row> elgibleBatches = sqlTemplateDirty.query(sql, new Object[] { Constants.CHANNEL_HEARTBEAT, "OK", getTableName(getTablePrefix(), SYM_NODE_HOST) });

if (elgibleBatches != null) {
String updateSql = getSql("cancelChannelBatchSql");
for (Row elgibleBatch : elgibleBatches) {
String nodeId = elgibleBatch.getString("node_id");
long batchId = elgibleBatch.getLong("batch_id");
sqlTemplate.update(updateSql, nodeId, batchId);
}
}
}

public void copyOutgoingBatches(String channelId, long startBatchId, String fromNodeId, String toNodeId) {
log.info("Copying outgoing batches for channel '{}' from node '{}' to node '{}' starting at {}",
new Object[] { channelId, fromNodeId, toNodeId, startBatchId });
@@ -334,6 +314,11 @@ public int countOutgoingBatchesUnsent(String channelId) {
return sqlTemplateDirty.queryForInt(getSql("countOutgoingBatchesUnsentOnChannelSql"), channelId);
}

@Override
public int countOutgoingBatchesUnsentHeartbeat() {
return sqlTemplateDirty.queryForInt(getSql("countOutgoingBatchesUnsentHeartbeat"));
}

@Override
public Map<String, Integer> countOutgoingBatchesPendingByChannel(String nodeId) {
List<Row> rows = sqlTemplateDirty.query(getSql("countOutgoingBatchesByChannelSql"), new Object[] { nodeId });
@@ -43,12 +43,6 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform,
putSql("cancelLoadBatchesSql",
"update $(outgoing_batch) set ignore_count=1, status='OK', error_flag=0, last_update_time=current_timestamp where load_id=?");

putSql("cancelChannelBatchesSelectSql",
"select node_id, batch_id from $(outgoing_batch) where channel_id = ? and status <> ? and summary=? and sent_count=0");

putSql("cancelChannelBatchSql",
"update $(outgoing_batch) set ignore_count=1, status='OK', error_flag=0, last_update_time=current_timestamp where node_id = ? and batch_id = ?");

putSql("insertOutgoingBatchSql",
"insert into $(outgoing_batch) "
+ " (batch_id, node_id, channel_id, status, load_id, extract_job_flag, load_flag, common_flag, reload_row_count, other_row_count, "
@@ -148,6 +142,11 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform,
putSql("countOutgoingBatchesUnsentOnChannelSql",
"select count(*) from $(outgoing_batch) where status != 'OK' and channel_id=?");

putSql("countOutgoingBatchesUnsentHeartbeat",
"select count(*) from $(outgoing_batch) b inner join $(data_event) e on e.batch_id = b.batch_id " +
"inner join $(data) d on d.data_id = e.data_id " +
"where b.channel_id = 'heartbeat' and b.status != 'OK' and d.source_node_id is null");

putSql("selectOutgoingBatchSummaryPrefixSql",
"select b.status ");

@@ -33,6 +33,7 @@
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.db.sql.Row;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.common.TableConstants;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.io.data.transform.AdditiveColumnTransform;
import org.jumpmind.symmetric.io.data.transform.BinaryLeftColumnTransform;
@@ -245,6 +246,7 @@ public void clearCache() {

byTableName.add(transformTable);
}
addBuiltInTableTransforms(byByLinkByTransformPoint);
lastCacheTimeInMs = System.currentTimeMillis();
this.transformsCacheByNodeGroupLinkByTransformPoint = byByLinkByTransformPoint;
}
@@ -253,6 +255,33 @@ public void clearCache() {
return byByLinkByTransformPoint;
}

private void addBuiltInTableTransforms(Map<NodeGroupLink, Map<TransformPoint, List<TransformTableNodeGroupLink>>> byLinkByTransformPoint) {
List<NodeGroupLink> nodeGroupLinks = configurationService.getNodeGroupLinks(true);
for (NodeGroupLink nodeGroupLink : nodeGroupLinks) {
Map<TransformPoint, List<TransformTableNodeGroupLink>> byTransformPoint = byLinkByTransformPoint.get(nodeGroupLink);
if (byTransformPoint == null) {
byTransformPoint = new HashMap<TransformPoint, List<TransformTableNodeGroupLink>>();
byLinkByTransformPoint.put(nodeGroupLink, byTransformPoint);
}
List<TransformTableNodeGroupLink> transforms = byTransformPoint.get(TransformPoint.LOAD);
if (transforms == null) {
transforms = new ArrayList<TransformTableNodeGroupLink>();
byTransformPoint.put(TransformPoint.LOAD, transforms);
}
TransformColumn column = new TransformColumn("heartbeat_time", "heartbeat_time", false);
column.setTransformType("variable");
column.setTransformExpression("system_timestamp");
String tableName = TableConstants.getTableName(parameterService.getTablePrefix(), TableConstants.SYM_NODE_HOST);
TransformTableNodeGroupLink transform = new TransformTableNodeGroupLink();
transform.setSourceTableName(tableName);
transform.setTargetTableName(tableName);
transform.setTransformPoint(TransformPoint.LOAD);
transform.addTransformColumn(column);
transform.setNodeGroupLink(nodeGroupLink);
transforms.add(transform);
}
}

private List<TransformTableNodeGroupLink> getTransformTablesFromDB(boolean includeColumns, boolean replaceTokens) {
List<TransformTableNodeGroupLink> transforms = sqlTemplate.query(
getSql("selectTransformTable"), new TransformTableMapper());

0 comments on commit da503d4

Please sign in to comment.
You can’t perform that action at this time.