From 7d18011fa5fb5bacce6f9fa639e6600d486eeded Mon Sep 17 00:00:00 2001 From: chenson42 Date: Fri, 9 Nov 2012 23:13:07 +0000 Subject: [PATCH] 0000855: Create a new heartbeat channel for sym_node_host. Deprecate heartbeat_time on sym_node --- .../symmetric/AbstractSymmetricEngine.java | 20 +++++ .../jumpmind/symmetric/common/Constants.java | 4 +- .../symmetric/common/ParameterConstants.java | 1 + .../symmetric/common/TableConstants.java | 4 +- .../job/DefaultOfflineServerListener.java | 2 +- .../symmetric/job/PushHeartbeatListener.java | 75 ++++++++++++++----- .../org/jumpmind/symmetric/model/Node.java | 21 ------ .../symmetric/model/OutgoingBatches.java | 39 +--------- .../symmetric/service/INodeService.java | 2 +- .../service/IOutgoingBatchService.java | 2 + .../service/impl/ConfigurationService.java | 1 + .../service/impl/DataLoaderService.java | 1 + .../symmetric/service/impl/DataService.java | 32 +------- .../symmetric/service/impl/NodeService.java | 32 ++++---- .../service/impl/NodeServiceSqlMap.java | 12 +-- .../service/impl/OutgoingBatchService.java | 53 ++++++++++++- .../impl/OutgoingBatchServiceSqlMap.java | 3 + .../service/impl/TriggerRouterService.java | 8 +- .../resources/symmetric-default.properties | 12 ++- .../src/main/resources/symmetric-schema.xml | 4 +- .../impl/AbstractRouterServiceTest.java | 20 +++++ .../symmetric/web/InfoUriHandler.java | 3 - 22 files changed, 206 insertions(+), 145 deletions(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java index 60351bab3f..dbfe1cee94 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java @@ -1,3 +1,23 @@ +/* + * Licensed to JumpMind Inc under one or more contributor + * license agreements. See the NOTICE file distributed + * with this work for additional information regarding + * copyright ownership. JumpMind Inc licenses this file + * to you under the GNU Lesser General Public License (the + * "License"); you may not use this file except in compliance + * with the License. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, see + * . + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.jumpmind.symmetric; import java.io.File; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/Constants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/Constants.java index 2e33b06992..aaaf724b4b 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/Constants.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/Constants.java @@ -62,7 +62,9 @@ private Constants() { public static final String SYMMETRIC_ENGINE = "symmetricEngine"; - public static final String MBEAN_SERVER = "mbeanserver"; + public static final String MBEAN_SERVER = "mbeanserver"; + + public static final String CHANNEL_HEARTBEAT = "heartbeat"; public static final String CHANNEL_CONFIG = "config"; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java index 93e17851c4..0105c2f4b8 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java @@ -76,6 +76,7 @@ private ParameterConstants() { public final static String AUTO_RELOAD_REVERSE_ENABLED = "auto.reload.reverse"; public final static String AUTO_INSERT_REG_SVR_IF_NOT_FOUND = "auto.insert.registration.svr.if.not.found"; public final static String AUTO_SYNC_CONFIGURATION = "auto.sync.configuration"; + public final static String AUTO_SYNC_CONFIGURATION_ON_INCOMING_AT_REG_SVR = "auto.sync.configuration.on.incoming.at.registration.server"; public final static String AUTO_CONFIGURE_DATABASE = "auto.config.database"; public final static String AUTO_SYNC_TRIGGERS = "auto.sync.triggers"; public final static String AUTO_REFRESH_AFTER_CONFIG_CHANGED = "auto.refresh.after.config.changes.detected"; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/TableConstants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/TableConstants.java index e5eed18847..7fc4127e87 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/TableConstants.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/TableConstants.java @@ -46,7 +46,7 @@ public class TableConstants { public static final String SYM_DATA_EVENT = "data_event"; public static final String SYM_TRANSFORM_TABLE = "transform_table"; public static final String SYM_LOAD_FILTER = "load_filter"; - public static final String SYM_TRANSFORM_COLUMN = "transform_column"; + public static final String SYM_TRANSFORM_COLUMN = "transform_column"; public static final String SYM_TRIGGER_ROUTER = "trigger_router"; public static final String SYM_CHANNEL = "channel"; public static final String SYM_NODE_SECURITY = "node_security"; @@ -62,7 +62,7 @@ public class TableConstants { public static final String SYM_NODE_GROUP_CHANNEL_WINDOW = "node_group_channel_window"; public static final String SYM_NODE_HOST_CHANNEL_STATS = "node_host_channel_stats"; public static final String SYM_INCOMING_ERROR = "incoming_error"; - public static final String SYM_SEQUENCE = "sequence"; + public static final String SYM_SEQUENCE = "sequence"; private static List tablesWithPrefix; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/job/DefaultOfflineServerListener.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/job/DefaultOfflineServerListener.java index 9577e686b6..a8d4f81768 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/job/DefaultOfflineServerListener.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/job/DefaultOfflineServerListener.java @@ -55,7 +55,7 @@ public DefaultOfflineServerListener(IStatisticManager statisticManager, * outgoing batches. */ public void clientNodeOffline(Node node) { - log.warn("Node {} is offline. Last heartbeat was {}, timezone {}. Syncing will be disabled and node security deleted.", new Object[] {node.getNodeId(), node.getHeartbeatTime(), node.getTimezoneOffset()}); + log.warn("The '{}' node is offline. Syncing will be disabled and node security deleted", new Object[] {node.getNodeId()}); statisticManager.incrementNodesDisabled(1); node.setSyncEnabled(false); nodeService.save(node); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/job/PushHeartbeatListener.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/job/PushHeartbeatListener.java index b083f618da..708c5a8b7b 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/job/PushHeartbeatListener.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/job/PushHeartbeatListener.java @@ -22,38 +22,76 @@ import java.util.Set; +import org.apache.commons.lang.StringUtils; +import org.jumpmind.symmetric.ISymmetricEngine; +import org.jumpmind.symmetric.Version; import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.db.ISymmetricDialect; import org.jumpmind.symmetric.ext.IHeartbeatListener; import org.jumpmind.symmetric.model.Node; -import org.jumpmind.symmetric.service.IDataService; -import org.jumpmind.symmetric.service.INodeService; import org.jumpmind.symmetric.service.IParameterService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class PushHeartbeatListener implements IHeartbeatListener { - private IDataService dataService; - private INodeService nodeService; - private ISymmetricDialect symmetricDialect; - private IParameterService parameterService; + protected final Logger log = LoggerFactory.getLogger(getClass()); - public PushHeartbeatListener(IParameterService parameterService, IDataService dataService, - INodeService nodeService, ISymmetricDialect symmetricDialect) { - this.parameterService = parameterService; - this.dataService = dataService; - this.nodeService = nodeService; - this.symmetricDialect = symmetricDialect; + private ISymmetricEngine engine; + + public PushHeartbeatListener(ISymmetricEngine engine) { + this.engine = engine; } public void heartbeat(Node me, Set children) { + IParameterService parameterService = engine.getParameterService(); if (parameterService.is(ParameterConstants.HEARTBEAT_ENABLED)) { - // don't send new heart beat events if we haven't sent - // the last ones ... - if (!nodeService.isRegistrationServer()) { + ISymmetricDialect symmetricDialect = engine.getSymmetricDialect(); + int outgoingErrorCount = engine.getOutgoingBatchService().countOutgoingBatchesInError(); + int outgoingUnsentCount = 0; + if (outgoingErrorCount > 0) { + // only set the unsent count if there are batches in error + outgoingUnsentCount = engine.getOutgoingBatchService().countOutgoingBatchesUnsent(); + } + if (!parameterService.getExternalId().equals(me.getExternalId()) + || !parameterService.getNodeGroupId().equals(me.getNodeGroupId()) + || (parameterService.getSyncUrl() != null && !parameterService.getSyncUrl().equals(me.getSyncUrl())) + || !parameterService.getString(ParameterConstants.SCHEMA_VERSION, "").equals(me.getSchemaVersion()) + || (engine.getDeploymentType() != null && !engine.getDeploymentType().equals(me.getDeploymentType())) + || !Version.version().equals(me.getSymmetricVersion()) + || !symmetricDialect.getName().equals(me.getDatabaseType()) + || !symmetricDialect.getVersion().equals(me.getDatabaseVersion()) + || me.getBatchInErrorCount() != outgoingErrorCount + || me.getBatchToSendCount() != outgoingUnsentCount) { + log.info("Some attribute(s) of node changed. Recording changes"); + me.setDeploymentType(engine.getDeploymentType()); + me.setSymmetricVersion(Version.version()); + me.setDatabaseType(symmetricDialect.getName()); + me.setDatabaseVersion(symmetricDialect.getVersion()); + me.setBatchInErrorCount(outgoingErrorCount); + me.setBatchToSendCount(outgoingUnsentCount); + me.setSchemaVersion(parameterService.getString(ParameterConstants.SCHEMA_VERSION)); + if (parameterService.is(ParameterConstants.AUTO_UPDATE_NODE_VALUES)) { + log.info("Updating my node configuration info according to the symmetric properties"); + me.setExternalId(parameterService.getExternalId()); + me.setNodeGroupId(parameterService.getNodeGroupId()); + if (!StringUtils.isBlank(parameterService.getSyncUrl())) { + me.setSyncUrl(parameterService.getSyncUrl()); + } + } + + engine.getNodeService().save(me); + } + + log.debug("Updating my node info"); + engine.getNodeService().updateNodeHostForCurrentNode(); + log.debug("Done updating my node info"); + + if (!engine.getNodeService().isRegistrationServer()) { if (!symmetricDialect.getPlatform().getDatabaseInfo().isTriggersSupported()) { - dataService.insertHeartbeatEvent(me, false); + engine.getDataService().insertHeartbeatEvent(me, false); for (Node node : children) { - dataService.insertHeartbeatEvent(node, false); + engine.getDataService().insertHeartbeatEvent(node, false); } } } @@ -61,7 +99,8 @@ public void heartbeat(Node me, Set children) { } public long getTimeBetweenHeartbeatsInSeconds() { - return parameterService.getLong(ParameterConstants.HEARTBEAT_SYNC_ON_PUSH_PERIOD_SEC, 600); + return engine.getParameterService().getLong( + ParameterConstants.HEARTBEAT_SYNC_ON_PUSH_PERIOD_SEC, 600); } } \ No newline at end of file diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Node.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Node.java index fecee75a3b..0512527e73 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Node.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Node.java @@ -22,7 +22,6 @@ package org.jumpmind.symmetric.model; import java.io.Serializable; -import java.util.Date; import java.util.Properties; import org.apache.commons.lang.StringUtils; @@ -68,10 +67,6 @@ public class Node implements Serializable { private boolean syncEnabled = true; - private String timezoneOffset; - - private Date heartbeatTime = new Date(); - private String createdAtNodeId; private int batchToSendCount; @@ -197,22 +192,6 @@ public String toString() { return nodeGroupId + ":" + externalId + ":" + (nodeId == null ? "?" : nodeId); } - public Date getHeartbeatTime() { - return heartbeatTime; - } - - public void setHeartbeatTime(Date heartbeatTime) { - this.heartbeatTime = heartbeatTime; - } - - public String getTimezoneOffset() { - return timezoneOffset; - } - - public void setTimezoneOffset(String timezoneOffset) { - this.timezoneOffset = timezoneOffset; - } - public String getCreatedAtNodeId() { return createdAtNodeId; } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/OutgoingBatches.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/OutgoingBatches.java index 7d2f9b2c5a..24fb3c6734 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/OutgoingBatches.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/OutgoingBatches.java @@ -177,44 +177,7 @@ public List getBatchesForChannels(Set channelIds) { } } return batchList; - } - - public List getBatchesForChannelWindows(Node targetNode, NodeChannel channel, - List windows) { - List keeping = new ArrayList(); - - if (batches != null && batches.size() > 0) { - if (inTimeWindow(windows, targetNode.getTimezoneOffset())) { - int maxBatchesToSend = channel.getMaxBatchToSend(); - for (OutgoingBatch outgoingBatch : batches) { - if (channel.getChannelId().equals(outgoingBatch.getChannelId()) && maxBatchesToSend > 0) { - keeping.add(outgoingBatch); - maxBatchesToSend--; - } - } - } - } - return keeping; - } - - /** - * If {@link NodeGroupChannelWindow}s are defined for this channel, then - * check to see if the time (according to the offset passed in) is within on - * of the configured windows. - */ - public boolean inTimeWindow(List windows, String timezoneOffset) { - if (windows != null && windows.size() > 0) { - for (NodeGroupChannelWindow window : windows) { - if (window.inTimeWindow(timezoneOffset)) { - return true; - } - } - return false; - } else { - return true; - } - - } + } /** * Removes all batches that are not associated with an 'activeChannel'. diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/INodeService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/INodeService.java index 08eab5eb2b..3f03e775fb 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/INodeService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/INodeService.java @@ -111,7 +111,7 @@ public void ignoreNodeChannelForExternalId(boolean ignore, String channelId, public boolean isExternalIdRegistered(String nodeGroupId, String externalId); public void save(Node node); - + public void updateNodeHostForCurrentNode(); public void insertNodeIdentity(String nodeId); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java index f1c2def407..78f32ed253 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IOutgoingBatchService.java @@ -62,6 +62,8 @@ public interface IOutgoingBatchService { public int countOutgoingBatchesInError(); + public int countOutgoingBatchesUnsent(); + public List findOutgoingBatchSummary(OutgoingBatch.Status ... statuses); public int countOutgoingBatches(List nodeIds, List channels, diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationService.java index 308bba1225..f3c5ca7828 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationService.java @@ -65,6 +65,7 @@ public ConfigurationService(IParameterService parameterService, ISymmetricDialec super(parameterService, dialect); this.nodeService = nodeService; this.defaultChannels = new ArrayList(); + this.defaultChannels.add(new Channel(Constants.CHANNEL_HEARTBEAT, 2, 100, 100, true, 0)); this.defaultChannels.add(new Channel(Constants.CHANNEL_CONFIG, 0, 100, 100, true, 0)); this.defaultChannels.add(new Channel(Constants.CHANNEL_RELOAD, 1, 1, 1, true, 0)); this.defaultChannels.add(new Channel(Constants.CHANNEL_DEFAULT, 99999, 1000, 100, true, 0)); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java index c3cac73ba3..53b3f89fc1 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java @@ -765,6 +765,7 @@ protected void enableSyncTriggers(DataContext context) { public void batchInError(DataContext context, Exception ex) { try { + ex.printStackTrace(); Batch batch = context.getBatch(); if (context.getWriter() != null) { this.currentBatch.setValues(context.getReader().getStatistics().get(batch), diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java index 4985055961..1d375fb109 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java @@ -27,7 +27,6 @@ import java.sql.DataTruncation; import java.sql.Types; import java.util.ArrayList; -import java.util.Calendar; import java.util.Collections; import java.util.Comparator; import java.util.Date; @@ -47,7 +46,6 @@ import org.jumpmind.db.sql.UniqueKeyException; import org.jumpmind.db.sql.mapper.NumberMapper; import org.jumpmind.symmetric.ISymmetricEngine; -import org.jumpmind.symmetric.Version; import org.jumpmind.symmetric.common.Constants; import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.common.TableConstants; @@ -93,8 +91,7 @@ public DataService(ISymmetricEngine engine) { this.engine = engine; this.reloadListeners = new ArrayList(); this.heartbeatListeners = new ArrayList(); - this.heartbeatListeners.add(new PushHeartbeatListener(parameterService, this, engine.getNodeService(), - symmetricDialect)); + this.heartbeatListeners.add(new PushHeartbeatListener(engine)); this.dataMapper = new DataMapper(); setSqlMap(new DataServiceSqlMap(symmetricDialect.getPlatform(), @@ -984,38 +981,11 @@ public void heartbeat(boolean force) { if (listeners.size() > 0) { Node me = engine.getNodeService().findIdentity(); if (me != null) { - log.info("Updating time and version node info"); - Calendar now = Calendar.getInstance(); - now.set(Calendar.MILLISECOND, 0); - me.setDeploymentType(engine.getDeploymentType()); - me.setHeartbeatTime(now.getTime()); - me.setTimezoneOffset(AppUtils.getTimezoneOffset()); - me.setSymmetricVersion(Version.version()); - me.setDatabaseType(symmetricDialect.getName()); - me.setDatabaseVersion(symmetricDialect.getVersion()); - me.setBatchInErrorCount(engine.getOutgoingBatchService().countOutgoingBatchesInError()); - if (parameterService.is(ParameterConstants.AUTO_UPDATE_NODE_VALUES)) { - log.info("Updating my node configuration info according to the symmetric properties"); - me.setSchemaVersion(parameterService - .getString(ParameterConstants.SCHEMA_VERSION)); - me.setExternalId(parameterService.getExternalId()); - me.setNodeGroupId(parameterService.getNodeGroupId()); - if (!StringUtils.isBlank(parameterService.getSyncUrl())) { - me.setSyncUrl(parameterService.getSyncUrl()); - } - } - - engine.getNodeService().save(me); - engine.getNodeService().updateNodeHostForCurrentNode(); - log.info("Done updating my node info."); - Set children = engine.getNodeService().findNodesThatOriginatedFromNodeId(me.getNodeId()); for (IHeartbeatListener l : listeners) { l.heartbeat(me, children); } - updateLastHeartbeatTime(listeners); - } else { log.debug("Did not run the heartbeat process because the node has not been configured"); } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeService.java index 9e22545023..98ebc3be46 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeService.java @@ -165,7 +165,7 @@ public NodeSecurity findNodeSecurity(String id) { public List findNodeHosts(String nodeId) { return sqlTemplate.query(getSql("selectNodeHostPrefixSql", "selectNodeHostByNodeIdSql"), new NodeHostRowMapper(), nodeId); - } + } public void updateNodeHostForCurrentNode() { if (nodeHostForCurrentNode == null) { @@ -257,8 +257,8 @@ public void save(Node node) { getSql("insertNodeSql"), new Object[] { node.getNodeGroupId(), node.getExternalId(), node.getDatabaseType(), node.getDatabaseVersion(), node.getSchemaVersion(), - node.getSymmetricVersion(), node.getSyncUrl(), node.getHeartbeatTime(), - node.isSyncEnabled() ? 1 : 0, node.getTimezoneOffset(), + node.getSymmetricVersion(), node.getSyncUrl(), new Date(), + node.isSyncEnabled() ? 1 : 0, AppUtils.getTimezoneOffset(), node.getBatchToSendCount(), node.getBatchInErrorCount(), node.getCreatedAtNodeId(), node.getDeploymentType(), node.getNodeId() }, new int[] { Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, @@ -273,8 +273,8 @@ public boolean updateNode(Node node) { getSql("updateNodeSql"), new Object[] { node.getNodeGroupId(), node.getExternalId(), node.getDatabaseType(), node.getDatabaseVersion(), node.getSchemaVersion(), - node.getSymmetricVersion(), node.getSyncUrl(), node.getHeartbeatTime(), - node.isSyncEnabled() ? 1 : 0, node.getTimezoneOffset(), + node.getSymmetricVersion(), node.getSyncUrl(), new Date(), + node.isSyncEnabled() ? 1 : 0, AppUtils.getTimezoneOffset(), node.getBatchToSendCount(), node.getBatchInErrorCount(), node.getCreatedAtNodeId(), node.getDeploymentType(), node.getNodeId() }, new int[] { Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, @@ -567,25 +567,27 @@ public List findOfflineNodes(long minutesOffline) { if (myNode != null) { long offlineNodeDetectionMillis = minutesOffline * 60 * 1000; - List list = sqlTemplate.query( - getSql("selectNodePrefixSql", "findOfflineNodesSql"), new NodeRowMapper(), - myNode.getNodeId(), myNode.getNodeId()); + List list = sqlTemplate.query(getSql("findOfflineNodesSql"), new Object[] { + myNode.getNodeId(), myNode.getNodeId()}, (int[])null); - for (Node node : list) { + for (Row node : list) { + String nodeId = node.getString("node_id"); + Date time = node.getDateTime("heartbeat_time"); + String offset = node.getString("timezone_offset"); // Take the timezone of the client node into account when // checking the hearbeat time. Date clientNodeCurrentTime = null; - if (node.getTimezoneOffset() != null) { + if (offset != null) { clientNodeCurrentTime = AppUtils - .getLocalDateForOffset(node.getTimezoneOffset()); + .getLocalDateForOffset(offset); } else { clientNodeCurrentTime = new Date(); } long cutOffTimeMillis = clientNodeCurrentTime.getTime() - offlineNodeDetectionMillis; - if (node.getHeartbeatTime() == null - || node.getHeartbeatTime().getTime() < cutOffTimeMillis) { - offlineNodeList.add(node); + if (time == null + || time.getTime() < cutOffTimeMillis) { + offlineNodeList.add(findNode(nodeId)); } } } @@ -635,8 +637,6 @@ public Node mapRow(Row rs) { node.setDatabaseVersion(rs.getString("database_version")); node.setSymmetricVersion(rs.getString("symmetric_version")); node.setCreatedAtNodeId(rs.getString("created_at_node_id")); - node.setHeartbeatTime(rs.getDateTime("heartbeat_time")); - node.setTimezoneOffset(rs.getString("timezone_offset")); node.setBatchToSendCount(rs.getInt("batch_to_send_count")); node.setBatchInErrorCount(rs.getInt("batch_in_error_count")); node.setDeploymentType(rs.getString("deployment_type")); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeServiceSqlMap.java index b15601682c..2ec68e03ad 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeServiceSqlMap.java @@ -97,13 +97,12 @@ public NodeServiceSqlMap(IDatabasePlatform platform, Map replace "" + "select node_id, host_name, ip_address, os_user, os_name, os_arch, os_version, available_processors, " + " free_memory_bytes, total_memory_bytes, max_memory_bytes, java_version, java_vendor, symmetric_version, " - + " timezone_offset, heartbeat_time, last_restart_time, create_time from $(node_host) "); + + " timezone_offset, heartbeat_time, last_restart_time, create_time from $(node_host) h"); - putSql("selectNodeHostByNodeIdSql", "" + "where node_id=? "); + putSql("selectNodeHostByNodeIdSql", "where node_id=?"); putSql("selectNodePrefixSql", - "" - + "select c.node_id, c.node_group_id, c.external_id, c.sync_enabled, c.sync_url, " + "select c.node_id, c.node_group_id, c.external_id, c.sync_enabled, c.sync_url, " + " c.schema_version, c.database_type, c.database_version, c.symmetric_version, c.created_at_node_id, c.heartbeat_time, c.timezone_offset, c.batch_to_send_count, c.batch_in_error_count, c.deployment_type from " + " $(node) c "); @@ -133,8 +132,9 @@ public NodeServiceSqlMap(IDatabasePlatform platform, Map replace + " total_memory_bytes=?, max_memory_bytes=?, java_version=?, java_vendor=?, symmetric_version=?, timezone_offset=?, heartbeat_time=?, " + " last_restart_time=? where node_id=? and host_name=? "); - putSql("findOfflineNodesSql", "" - + "where sync_enabled = 1 and node_id != ? and created_at_node_id = ? "); + putSql("findOfflineNodesSql", + "select h.node_id, max(h.heartbeat_time), h.timezone_offset from $(node_host) h inner join $(node) n on h.node_id=n.node_id" + + " where n.sync_enabled = 1 and n.node_id != ? and n.created_at_node_id = ? group by h.node_id, h.timezone_offset"); } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java index d2855cf5c2..2db5bcc2b0 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchService.java @@ -40,6 +40,8 @@ import org.jumpmind.symmetric.model.Channel; import org.jumpmind.symmetric.model.Node; import org.jumpmind.symmetric.model.NodeChannel; +import org.jumpmind.symmetric.model.NodeGroupChannelWindow; +import org.jumpmind.symmetric.model.NodeHost; import org.jumpmind.symmetric.model.NodeSecurity; import org.jumpmind.symmetric.model.OutgoingBatch; import org.jumpmind.symmetric.model.OutgoingBatch.Status; @@ -51,6 +53,7 @@ import org.jumpmind.symmetric.service.IOutgoingBatchService; import org.jumpmind.symmetric.service.IParameterService; import org.jumpmind.symmetric.service.ISequenceService; +import org.jumpmind.util.AppUtils; import org.jumpmind.util.FormatUtils; /** @@ -170,6 +173,10 @@ public OutgoingBatch findOutgoingBatch(long batchId, String nodeId) { public int countOutgoingBatchesInError() { return sqlTemplate.queryForInt(getSql("countOutgoingBatchesErrorsSql")); } + + public int countOutgoingBatchesUnsent() { + return sqlTemplate.queryForInt(getSql("countOutgoingBatchesUnsentSql")); + } public int countOutgoingBatches(List nodeIds, List channels, List statuses) { @@ -254,7 +261,7 @@ maxNumberOfBatchesToSelect, new OutgoingBatchMapper(includeDisabledChannels, tru for (NodeChannel channel : channels) { if (parameterService.is(ParameterConstants.DATA_EXTRACTOR_ENABLED) || channel.getChannelId().equals(Constants.CHANNEL_CONFIG)) { - keepers.addAll(batches.getBatchesForChannelWindows( + keepers.addAll(getBatchesForChannelWindows(batches, node, channel, configurationService.getNodeGroupChannelWindows( @@ -270,6 +277,50 @@ maxNumberOfBatchesToSelect, new OutgoingBatchMapper(includeDisabledChannels, tru return batches; } + + public List getBatchesForChannelWindows(OutgoingBatches batches, Node targetNode, NodeChannel channel, + List windows) { + List keeping = new ArrayList(); + List current = batches.getBatches(); + if (current != null && current.size() > 0) { + if (inTimeWindow(windows, targetNode)) { + int maxBatchesToSend = channel.getMaxBatchToSend(); + for (OutgoingBatch outgoingBatch : current) { + if (channel.getChannelId().equals(outgoingBatch.getChannelId()) && maxBatchesToSend > 0) { + keeping.add(outgoingBatch); + maxBatchesToSend--; + } + } + } + } + return keeping; + } + + /** + * If {@link NodeGroupChannelWindow}s are defined for this channel, then + * check to see if the time (according to the offset passed in) is within on + * of the configured windows. + */ + public boolean inTimeWindow(List windows, Node targetNode) { + if (windows != null && windows.size() > 0) { + for (NodeGroupChannelWindow window : windows) { + String timezoneOffset = null; + List hosts = nodeService.findNodeHosts(targetNode.getNodeId()); + if (hosts.size() > 0) { + timezoneOffset = hosts.get(0).getTimezoneOffset(); + } else { + timezoneOffset = AppUtils.getTimezoneOffset(); + } + if (window.inTimeWindow(timezoneOffset)) { + return true; + } + } + return false; + } else { + return true; + } + + } public OutgoingBatches getOutgoingBatchRange(String startBatchId, String endBatchId) { OutgoingBatches batches = new OutgoingBatches(); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java index 8f3e08d496..75f0bbbd36 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/OutgoingBatchServiceSqlMap.java @@ -54,6 +54,9 @@ public OutgoingBatchServiceSqlMap(IDatabasePlatform platform, Map - - + + diff --git a/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractRouterServiceTest.java b/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractRouterServiceTest.java index 2eb0d6c844..a1bbc02c1c 100644 --- a/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractRouterServiceTest.java +++ b/symmetric-core/src/test/java/org/jumpmind/symmetric/service/impl/AbstractRouterServiceTest.java @@ -1,3 +1,23 @@ +/* + * Licensed to JumpMind Inc under one or more contributor + * license agreements. See the NOTICE file distributed + * with this work for additional information regarding + * copyright ownership. JumpMind Inc licenses this file + * to you under the GNU Lesser General Public License (the + * "License"); you may not use this file except in compliance + * with the License. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, see + * . + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.jumpmind.symmetric.service.impl; import java.sql.Types; diff --git a/symmetric-server/src/main/java/org/jumpmind/symmetric/web/InfoUriHandler.java b/symmetric-server/src/main/java/org/jumpmind/symmetric/web/InfoUriHandler.java index ab836c9133..73229d35bc 100644 --- a/symmetric-server/src/main/java/org/jumpmind/symmetric/web/InfoUriHandler.java +++ b/symmetric-server/src/main/java/org/jumpmind/symmetric/web/InfoUriHandler.java @@ -42,8 +42,6 @@ */ public class InfoUriHandler extends AbstractUriHandler { - private static final long serialVersionUID = 1L; - private INodeService nodeService; private IConfigurationService configurationService; @@ -79,7 +77,6 @@ public void handle(HttpServletRequest req, HttpServletResponse res) throws IOExc properties.setProperty(InfoConstants.DATABASE_TYPE, node.getDatabaseType()); properties.setProperty(InfoConstants.DATABASE_VERSION, node.getDatabaseVersion()); properties.setProperty(InfoConstants.DEPLOYMENT_TYPE, node.getDeploymentType()); - properties.setProperty(InfoConstants.TIMEZONE_OFFSET, node.getTimezoneOffset()); properties.setProperty(InfoConstants.SYMMETRIC_VERSION, node.getSymmetricVersion()); }