Skip to content

Commit

Permalink
0000855: Create a new heartbeat channel for sym_node_host. Deprecate …
Browse files Browse the repository at this point in the history
…heartbeat_time on sym_node
  • Loading branch information
chenson42 committed Nov 9, 2012
1 parent 82ef960 commit 7d18011
Show file tree
Hide file tree
Showing 22 changed files with 206 additions and 145 deletions.
@@ -1,3 +1,23 @@
/*
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU Lesser General Public License (the
* "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jumpmind.symmetric;

import java.io.File;
Expand Down
Expand Up @@ -62,7 +62,9 @@ private Constants() {

public static final String SYMMETRIC_ENGINE = "symmetricEngine";

public static final String MBEAN_SERVER = "mbeanserver";
public static final String MBEAN_SERVER = "mbeanserver";

public static final String CHANNEL_HEARTBEAT = "heartbeat";

public static final String CHANNEL_CONFIG = "config";

Expand Down
Expand Up @@ -76,6 +76,7 @@ private ParameterConstants() {
public final static String AUTO_RELOAD_REVERSE_ENABLED = "auto.reload.reverse";
public final static String AUTO_INSERT_REG_SVR_IF_NOT_FOUND = "auto.insert.registration.svr.if.not.found";
public final static String AUTO_SYNC_CONFIGURATION = "auto.sync.configuration";
public final static String AUTO_SYNC_CONFIGURATION_ON_INCOMING_AT_REG_SVR = "auto.sync.configuration.on.incoming.at.registration.server";
public final static String AUTO_CONFIGURE_DATABASE = "auto.config.database";
public final static String AUTO_SYNC_TRIGGERS = "auto.sync.triggers";
public final static String AUTO_REFRESH_AFTER_CONFIG_CHANGED = "auto.refresh.after.config.changes.detected";
Expand Down
Expand Up @@ -46,7 +46,7 @@ public class TableConstants {
public static final String SYM_DATA_EVENT = "data_event";
public static final String SYM_TRANSFORM_TABLE = "transform_table";
public static final String SYM_LOAD_FILTER = "load_filter";
public static final String SYM_TRANSFORM_COLUMN = "transform_column";
public static final String SYM_TRANSFORM_COLUMN = "transform_column";
public static final String SYM_TRIGGER_ROUTER = "trigger_router";
public static final String SYM_CHANNEL = "channel";
public static final String SYM_NODE_SECURITY = "node_security";
Expand All @@ -62,7 +62,7 @@ public class TableConstants {
public static final String SYM_NODE_GROUP_CHANNEL_WINDOW = "node_group_channel_window";
public static final String SYM_NODE_HOST_CHANNEL_STATS = "node_host_channel_stats";
public static final String SYM_INCOMING_ERROR = "incoming_error";
public static final String SYM_SEQUENCE = "sequence";
public static final String SYM_SEQUENCE = "sequence";

private static List<String> tablesWithPrefix;

Expand Down
Expand Up @@ -55,7 +55,7 @@ public DefaultOfflineServerListener(IStatisticManager statisticManager,
* outgoing batches.
*/
public void clientNodeOffline(Node node) {
log.warn("Node {} is offline. Last heartbeat was {}, timezone {}. Syncing will be disabled and node security deleted.", new Object[] {node.getNodeId(), node.getHeartbeatTime(), node.getTimezoneOffset()});
log.warn("The '{}' node is offline. Syncing will be disabled and node security deleted", new Object[] {node.getNodeId()});
statisticManager.incrementNodesDisabled(1);
node.setSyncEnabled(false);
nodeService.save(node);
Expand Down
Expand Up @@ -22,46 +22,85 @@

import java.util.Set;

import org.apache.commons.lang.StringUtils;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.Version;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.ext.IHeartbeatListener;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.service.IDataService;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IParameterService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PushHeartbeatListener implements IHeartbeatListener {

private IDataService dataService;
private INodeService nodeService;
private ISymmetricDialect symmetricDialect;
private IParameterService parameterService;
protected final Logger log = LoggerFactory.getLogger(getClass());

public PushHeartbeatListener(IParameterService parameterService, IDataService dataService,
INodeService nodeService, ISymmetricDialect symmetricDialect) {
this.parameterService = parameterService;
this.dataService = dataService;
this.nodeService = nodeService;
this.symmetricDialect = symmetricDialect;
private ISymmetricEngine engine;

public PushHeartbeatListener(ISymmetricEngine engine) {
this.engine = engine;
}

public void heartbeat(Node me, Set<Node> children) {
IParameterService parameterService = engine.getParameterService();
if (parameterService.is(ParameterConstants.HEARTBEAT_ENABLED)) {
// don't send new heart beat events if we haven't sent
// the last ones ...
if (!nodeService.isRegistrationServer()) {
ISymmetricDialect symmetricDialect = engine.getSymmetricDialect();
int outgoingErrorCount = engine.getOutgoingBatchService().countOutgoingBatchesInError();
int outgoingUnsentCount = 0;
if (outgoingErrorCount > 0) {
// only set the unsent count if there are batches in error
outgoingUnsentCount = engine.getOutgoingBatchService().countOutgoingBatchesUnsent();
}
if (!parameterService.getExternalId().equals(me.getExternalId())
|| !parameterService.getNodeGroupId().equals(me.getNodeGroupId())
|| (parameterService.getSyncUrl() != null && !parameterService.getSyncUrl().equals(me.getSyncUrl()))
|| !parameterService.getString(ParameterConstants.SCHEMA_VERSION, "").equals(me.getSchemaVersion())
|| (engine.getDeploymentType() != null && !engine.getDeploymentType().equals(me.getDeploymentType()))
|| !Version.version().equals(me.getSymmetricVersion())
|| !symmetricDialect.getName().equals(me.getDatabaseType())
|| !symmetricDialect.getVersion().equals(me.getDatabaseVersion())
|| me.getBatchInErrorCount() != outgoingErrorCount
|| me.getBatchToSendCount() != outgoingUnsentCount) {
log.info("Some attribute(s) of node changed. Recording changes");
me.setDeploymentType(engine.getDeploymentType());
me.setSymmetricVersion(Version.version());
me.setDatabaseType(symmetricDialect.getName());
me.setDatabaseVersion(symmetricDialect.getVersion());
me.setBatchInErrorCount(outgoingErrorCount);
me.setBatchToSendCount(outgoingUnsentCount);
me.setSchemaVersion(parameterService.getString(ParameterConstants.SCHEMA_VERSION));
if (parameterService.is(ParameterConstants.AUTO_UPDATE_NODE_VALUES)) {
log.info("Updating my node configuration info according to the symmetric properties");
me.setExternalId(parameterService.getExternalId());
me.setNodeGroupId(parameterService.getNodeGroupId());
if (!StringUtils.isBlank(parameterService.getSyncUrl())) {
me.setSyncUrl(parameterService.getSyncUrl());
}
}

engine.getNodeService().save(me);
}

log.debug("Updating my node info");
engine.getNodeService().updateNodeHostForCurrentNode();
log.debug("Done updating my node info");

if (!engine.getNodeService().isRegistrationServer()) {
if (!symmetricDialect.getPlatform().getDatabaseInfo().isTriggersSupported()) {
dataService.insertHeartbeatEvent(me, false);
engine.getDataService().insertHeartbeatEvent(me, false);
for (Node node : children) {
dataService.insertHeartbeatEvent(node, false);
engine.getDataService().insertHeartbeatEvent(node, false);
}
}
}
}
}

public long getTimeBetweenHeartbeatsInSeconds() {
return parameterService.getLong(ParameterConstants.HEARTBEAT_SYNC_ON_PUSH_PERIOD_SEC, 600);
return engine.getParameterService().getLong(
ParameterConstants.HEARTBEAT_SYNC_ON_PUSH_PERIOD_SEC, 600);
}

}
Expand Up @@ -22,7 +22,6 @@
package org.jumpmind.symmetric.model;

import java.io.Serializable;
import java.util.Date;
import java.util.Properties;

import org.apache.commons.lang.StringUtils;
Expand Down Expand Up @@ -68,10 +67,6 @@ public class Node implements Serializable {

private boolean syncEnabled = true;

private String timezoneOffset;

private Date heartbeatTime = new Date();

private String createdAtNodeId;

private int batchToSendCount;
Expand Down Expand Up @@ -197,22 +192,6 @@ public String toString() {
return nodeGroupId + ":" + externalId + ":" + (nodeId == null ? "?" : nodeId);
}

public Date getHeartbeatTime() {
return heartbeatTime;
}

public void setHeartbeatTime(Date heartbeatTime) {
this.heartbeatTime = heartbeatTime;
}

public String getTimezoneOffset() {
return timezoneOffset;
}

public void setTimezoneOffset(String timezoneOffset) {
this.timezoneOffset = timezoneOffset;
}

public String getCreatedAtNodeId() {
return createdAtNodeId;
}
Expand Down
Expand Up @@ -177,44 +177,7 @@ public List<OutgoingBatch> getBatchesForChannels(Set<String> channelIds) {
}
}
return batchList;
}

public List<OutgoingBatch> getBatchesForChannelWindows(Node targetNode, NodeChannel channel,
List<NodeGroupChannelWindow> windows) {
List<OutgoingBatch> keeping = new ArrayList<OutgoingBatch>();

if (batches != null && batches.size() > 0) {
if (inTimeWindow(windows, targetNode.getTimezoneOffset())) {
int maxBatchesToSend = channel.getMaxBatchToSend();
for (OutgoingBatch outgoingBatch : batches) {
if (channel.getChannelId().equals(outgoingBatch.getChannelId()) && maxBatchesToSend > 0) {
keeping.add(outgoingBatch);
maxBatchesToSend--;
}
}
}
}
return keeping;
}

/**
* If {@link NodeGroupChannelWindow}s are defined for this channel, then
* check to see if the time (according to the offset passed in) is within on
* of the configured windows.
*/
public boolean inTimeWindow(List<NodeGroupChannelWindow> windows, String timezoneOffset) {
if (windows != null && windows.size() > 0) {
for (NodeGroupChannelWindow window : windows) {
if (window.inTimeWindow(timezoneOffset)) {
return true;
}
}
return false;
} else {
return true;
}

}
}

/**
* Removes all batches that are not associated with an 'activeChannel'.
Expand Down
Expand Up @@ -111,7 +111,7 @@ public void ignoreNodeChannelForExternalId(boolean ignore, String channelId,
public boolean isExternalIdRegistered(String nodeGroupId, String externalId);

public void save(Node node);

public void updateNodeHostForCurrentNode();

public void insertNodeIdentity(String nodeId);
Expand Down
Expand Up @@ -62,6 +62,8 @@ public interface IOutgoingBatchService {

public int countOutgoingBatchesInError();

public int countOutgoingBatchesUnsent();

public List<OutgoingBatchSummary> findOutgoingBatchSummary(OutgoingBatch.Status ... statuses);

public int countOutgoingBatches(List<String> nodeIds, List<String> channels,
Expand Down
Expand Up @@ -65,6 +65,7 @@ public ConfigurationService(IParameterService parameterService, ISymmetricDialec
super(parameterService, dialect);
this.nodeService = nodeService;
this.defaultChannels = new ArrayList<Channel>();
this.defaultChannels.add(new Channel(Constants.CHANNEL_HEARTBEAT, 2, 100, 100, true, 0));
this.defaultChannels.add(new Channel(Constants.CHANNEL_CONFIG, 0, 100, 100, true, 0));
this.defaultChannels.add(new Channel(Constants.CHANNEL_RELOAD, 1, 1, 1, true, 0));
this.defaultChannels.add(new Channel(Constants.CHANNEL_DEFAULT, 99999, 1000, 100, true, 0));
Expand Down
Expand Up @@ -765,6 +765,7 @@ protected void enableSyncTriggers(DataContext context) {

public void batchInError(DataContext context, Exception ex) {
try {
ex.printStackTrace();
Batch batch = context.getBatch();
if (context.getWriter() != null) {
this.currentBatch.setValues(context.getReader().getStatistics().get(batch),
Expand Down
Expand Up @@ -27,7 +27,6 @@
import java.sql.DataTruncation;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
Expand All @@ -47,7 +46,6 @@
import org.jumpmind.db.sql.UniqueKeyException;
import org.jumpmind.db.sql.mapper.NumberMapper;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.Version;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.common.TableConstants;
Expand Down Expand Up @@ -93,8 +91,7 @@ public DataService(ISymmetricEngine engine) {
this.engine = engine;
this.reloadListeners = new ArrayList<IReloadListener>();
this.heartbeatListeners = new ArrayList<IHeartbeatListener>();
this.heartbeatListeners.add(new PushHeartbeatListener(parameterService, this, engine.getNodeService(),
symmetricDialect));
this.heartbeatListeners.add(new PushHeartbeatListener(engine));
this.dataMapper = new DataMapper();

setSqlMap(new DataServiceSqlMap(symmetricDialect.getPlatform(),
Expand Down Expand Up @@ -984,38 +981,11 @@ public void heartbeat(boolean force) {
if (listeners.size() > 0) {
Node me = engine.getNodeService().findIdentity();
if (me != null) {
log.info("Updating time and version node info");
Calendar now = Calendar.getInstance();
now.set(Calendar.MILLISECOND, 0);
me.setDeploymentType(engine.getDeploymentType());
me.setHeartbeatTime(now.getTime());
me.setTimezoneOffset(AppUtils.getTimezoneOffset());
me.setSymmetricVersion(Version.version());
me.setDatabaseType(symmetricDialect.getName());
me.setDatabaseVersion(symmetricDialect.getVersion());
me.setBatchInErrorCount(engine.getOutgoingBatchService().countOutgoingBatchesInError());
if (parameterService.is(ParameterConstants.AUTO_UPDATE_NODE_VALUES)) {
log.info("Updating my node configuration info according to the symmetric properties");
me.setSchemaVersion(parameterService
.getString(ParameterConstants.SCHEMA_VERSION));
me.setExternalId(parameterService.getExternalId());
me.setNodeGroupId(parameterService.getNodeGroupId());
if (!StringUtils.isBlank(parameterService.getSyncUrl())) {
me.setSyncUrl(parameterService.getSyncUrl());
}
}

engine.getNodeService().save(me);
engine.getNodeService().updateNodeHostForCurrentNode();
log.info("Done updating my node info.");

Set<Node> children = engine.getNodeService().findNodesThatOriginatedFromNodeId(me.getNodeId());
for (IHeartbeatListener l : listeners) {
l.heartbeat(me, children);
}

updateLastHeartbeatTime(listeners);

} else {
log.debug("Did not run the heartbeat process because the node has not been configured");
}
Expand Down

0 comments on commit 7d18011

Please sign in to comment.