From d60d8a3f56c70632054326e67e9d5c17edb33b3c Mon Sep 17 00:00:00 2001 From: chenson42 Date: Tue, 8 Mar 2011 13:24:42 +0000 Subject: [PATCH] SYMMETRICDS-342 AND SYMMETRICDS-388 --- .../symmetric/AbstractSymmetricEngine.java | 5 +- .../jumpmind/symmetric/ISymmetricEngine.java | 8 +- .../symmetric/common/ParameterConstants.java | 4 +- .../io/DefaultOfflineClientListener.java | 3 + .../symmetric/io/IOfflineClientListener.java | 2 + .../jumpmind/symmetric/job/AbstractJob.java | 39 ++++-- .../jumpmind/symmetric/job/HeartbeatJob.java | 9 +- .../org/jumpmind/symmetric/job/PullJob.java | 17 +-- .../org/jumpmind/symmetric/job/PurgeJob.java | 6 +- .../org/jumpmind/symmetric/job/PushJob.java | 6 +- .../org/jumpmind/symmetric/job/RouterJob.java | 4 +- .../symmetric/job/StatisticFlushJob.java | 7 +- .../symmetric/job/SyncTriggersJob.java | 7 +- .../jumpmind/symmetric/job/WatchdogJob.java | 8 +- .../symmetric/model/OutgoingBatch.java | 4 + .../symmetric/model/RemoteNodeStatus.java | 113 ++++++++++++++++++ .../symmetric/model/RemoteNodeStatuses.java | 53 ++++++++ .../service/IDataExtractorService.java | 6 +- .../symmetric/service/IDataLoaderService.java | 5 +- .../symmetric/service/IPullService.java | 23 ++-- .../symmetric/service/IPurgeService.java | 8 +- .../symmetric/service/IPushService.java | 34 ++++-- .../symmetric/service/IRouterService.java | 2 +- .../symmetric/service/IStatisticService.java | 9 +- .../impl/AbstractOfflineDetectorService.java | 14 ++- .../service/impl/DataExtractorService.java | 18 +-- .../service/impl/DataLoaderService.java | 18 ++- .../symmetric/service/impl/PullService.java | 36 +++--- .../symmetric/service/impl/PurgeService.java | 73 ++++++----- .../symmetric/service/impl/PushService.java | 107 ++++++++--------- .../service/impl/RegistrationService.java | 3 +- .../symmetric/service/impl/RouterService.java | 6 +- .../service/impl/StatisticService.java | 29 +++++ .../statistic/IStatisticManager.java | 4 +- .../symmetric/statistic/JobStats.java | 65 ++++++++++ .../symmetric/statistic/StatisticManager.java | 91 +++++++++++--- .../service/impl/statistic-service-sql.xml | 18 ++- .../resources/symmetric-default.properties | 3 + .../src/main/resources/symmetric-jobs.xml | 1 + .../src/main/resources/symmetric-schema.xml | 4 - .../src/main/resources/symmetric-services.xml | 1 + .../statistic/MockStatisticManager.java | 3 + .../stress/AbstractMultiTierStressTest.java | 2 +- .../test/AbstractIntegrationTest.java | 2 +- .../test/LoadFromClientIntegrationTest.java | 2 +- .../symmetric/test/MultiTierUnitTest.java | 2 +- .../symmetric/test/SimpleIntegrationTest.java | 4 +- .../src/test/resources/test-data-drop-all.sql | 1 + 48 files changed, 660 insertions(+), 229 deletions(-) create mode 100644 symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/model/RemoteNodeStatus.java create mode 100644 symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/model/RemoteNodeStatuses.java create mode 100644 symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/JobStats.java diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java index a7c4ffb1bc..bbf40a0182 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java @@ -42,6 +42,7 @@ import org.jumpmind.symmetric.job.IJobManager; import org.jumpmind.symmetric.model.Node; import org.jumpmind.symmetric.model.NodeStatus; +import org.jumpmind.symmetric.model.RemoteNodeStatuses; import org.jumpmind.symmetric.service.IAcknowledgeService; import org.jumpmind.symmetric.service.IBandwidthService; import org.jumpmind.symmetric.service.IClusterService; @@ -312,7 +313,7 @@ public String sendSQL(String nodeId, String catalogName, String schemaName, Stri return getDataService().sendSQL(nodeId, catalogName, schemaName, tableName, sql, false); } - public boolean push() { + public RemoteNodeStatuses push() { return getPushService().pushData(); } @@ -324,7 +325,7 @@ public NodeStatus getNodeStatus() { return getNodeService().getNodeStatus(); } - public boolean pull() { + public RemoteNodeStatuses pull() { return getPullService().pullData(); } diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/ISymmetricEngine.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/ISymmetricEngine.java index cff939391a..1331e7c418 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/ISymmetricEngine.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/ISymmetricEngine.java @@ -30,6 +30,7 @@ import org.jumpmind.symmetric.job.PushJob; import org.jumpmind.symmetric.job.RouterJob; import org.jumpmind.symmetric.model.NodeStatus; +import org.jumpmind.symmetric.model.RemoteNodeStatuses; import org.jumpmind.symmetric.service.IAcknowledgeService; import org.jumpmind.symmetric.service.IBandwidthService; import org.jumpmind.symmetric.service.IClusterService; @@ -101,9 +102,9 @@ public interface ISymmetricEngine { * Will perform a push the same way the {@link PushJob} would have. * * @see IPushService#pushData() - * @return true if data was pushed successfully + * @return {@link RemoteNodeStatuses} */ - public boolean push(); + public RemoteNodeStatuses push(); /** * Call this to resync triggers @@ -123,8 +124,9 @@ public interface ISymmetricEngine { * Will perform a pull the same way the {@link PullJob} would have. * * @see IPullService#pullData() + * @return {@link RemoteNodeStatuses} */ - public boolean pull(); + public RemoteNodeStatuses pull(); /** * Route captured data the same way the {@link RouterJob} would have. diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java index e091de35af..9b33c60026 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java @@ -152,6 +152,8 @@ private ParameterConstants() { public final static String WEB_BATCH_SERVLET_ENABLE = "web.batch.servlet.enable"; public final static String OFFLINE_NODE_DETECTION_PERIOD_MINUTES = "offline.node.detection.period.minutes"; - public final static String HEARTBEAT_SYNC_ON_PUSH_PERIOD_SEC ="heartbeat.sync.on.push.period.sec"; + public final static String HEARTBEAT_SYNC_ON_PUSH_PERIOD_SEC ="heartbeat.sync.on.push.period.sec"; + + public final static String STATISTIC_RECORD_ENABLE = "statistic.record.enable"; } \ No newline at end of file diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/io/DefaultOfflineClientListener.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/io/DefaultOfflineClientListener.java index f495585058..59d1e796cc 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/io/DefaultOfflineClientListener.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/io/DefaultOfflineClientListener.java @@ -45,6 +45,9 @@ public void busy(Node remoteNode) { public void notAuthenticated(Node remoteNode) { log.warn("AuthenticationFailed"); + } + + public void unknownError(Node remoteNode, Exception ex) { } public void offline(Node remoteNode) { diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/io/IOfflineClientListener.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/io/IOfflineClientListener.java index 3fd6bfd981..4a3fcfc9bc 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/io/IOfflineClientListener.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/io/IOfflineClientListener.java @@ -56,5 +56,7 @@ public interface IOfflineClientListener extends IExtensionPoint { * @param remoteNode */ public void registrationRequired(Node remoteNode); + + public void unknownError(Node remoteNode, Exception ex); } \ No newline at end of file diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/AbstractJob.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/AbstractJob.java index ce5bb2e55a..12dd756c7d 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/AbstractJob.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/AbstractJob.java @@ -27,11 +27,13 @@ import org.apache.commons.lang.StringUtils; import org.jumpmind.symmetric.ISymmetricEngine; import org.jumpmind.symmetric.StandaloneSymmetricEngine; +import org.jumpmind.symmetric.common.Constants; import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.common.logging.ILog; import org.jumpmind.symmetric.common.logging.LogFactory; import org.jumpmind.symmetric.service.IParameterService; import org.jumpmind.symmetric.service.IRegistrationService; +import org.jumpmind.symmetric.statistic.IStatisticManager; import org.jumpmind.symmetric.util.RandomTimeSlot; import org.springframework.beans.factory.BeanNameAware; import org.springframework.jmx.export.annotation.ManagedAttribute; @@ -64,7 +66,9 @@ abstract public class AbstractJob implements Runnable, BeanNameAware, IJob { private long lastExecutionTimeInMs; - private long totalExecutionTimeInMs; + private long totalExecutionTimeInMs; + + private long lastExecutionProcessCount = 0; private long numberOfRuns; @@ -82,9 +86,11 @@ abstract public class AbstractJob implements Runnable, BeanNameAware, IJob { private ScheduledFuture scheduledJob; - private RandomTimeSlot randomTimeSlot; + private RandomTimeSlot randomTimeSlot; + + private boolean autoStartConfigured; - private boolean autoStartConfigured; + private IStatisticManager statisticManager; protected void init() { this.autoStartConfigured = parameterService.is(autoStartParameterName); @@ -155,13 +161,14 @@ public boolean invoke(boolean force) { running = true; synchronized (this) { ran = true; - long ts = System.currentTimeMillis(); + long startTime = System.currentTimeMillis(); + long processCount = 0; try { if (!requiresRegistration || (requiresRegistration && registrationService .isRegisteredWithServer())) { hasNotRegisteredMessageBeenLogged = false; - doJob(); + processCount = doJob(); } else { if (!hasNotRegisteredMessageBeenLogged) { log.warn("SymmetricEngineNotRegistered", getName()); @@ -169,9 +176,14 @@ public boolean invoke(boolean force) { } } } finally { - lastFinishTime = new Date(); - lastExecutionTimeInMs = System.currentTimeMillis() - ts; - totalExecutionTimeInMs += lastExecutionTimeInMs; + lastFinishTime = new Date(); + lastExecutionProcessCount = processCount; + long endTime = System.currentTimeMillis(); + lastExecutionTimeInMs = endTime - startTime; + totalExecutionTimeInMs += lastExecutionTimeInMs; + if (lastExecutionProcessCount > 0 || lastExecutionTimeInMs > Constants.LONG_OPERATION_THRESHOLD) { + statisticManager.addJobStats(jobName, startTime, endTime, lastExecutionProcessCount); + } numberOfRuns++; running = false; } @@ -196,7 +208,7 @@ public void run() { invoke(false); } - abstract void doJob() throws Exception; + abstract long doJob() throws Exception; public void setBeanName(final String beanName) { this.jobName = beanName; @@ -245,6 +257,11 @@ public boolean isStarted() { @ManagedMetric(description = "The amount of time this job spent in execution during it's last run") public long getLastExecutionTimeInMs() { return lastExecutionTimeInMs; + } + + @ManagedMetric(description = "The count of elements this job processed during it's last run") + public long getLastExecutionProcessCount() { + return lastExecutionProcessCount; } @ManagedAttribute(description = "The last time this job completed execution") @@ -304,5 +321,9 @@ public void setAutoStartParameterName(String autoStartParameterName) { public void setRandomTimeSlot(RandomTimeSlot randomTimeSlot) { this.randomTimeSlot = randomTimeSlot; + } + + public void setStatisticManager(IStatisticManager statisticManager) { + this.statisticManager = statisticManager; } } \ No newline at end of file diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/HeartbeatJob.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/HeartbeatJob.java index f21619f729..49e50c1a23 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/HeartbeatJob.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/HeartbeatJob.java @@ -26,7 +26,7 @@ import org.jumpmind.symmetric.service.IDataService; /** - * + * Background job that is responsible for updating this node's heart beat time. */ public class HeartbeatJob extends AbstractJob { @@ -35,13 +35,16 @@ public class HeartbeatJob extends AbstractJob { private IClusterService clusterService; @Override - public void doJob() throws Exception { + public long doJob() throws Exception { if (clusterService.lock(ClusterConstants.HEARTBEAT)) { try { - dataService.heartbeat(false); + dataService.heartbeat(false); + return -1l; } finally { clusterService.unlock(ClusterConstants.HEARTBEAT); } + } else { + return -1l; } } diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/PullJob.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/PullJob.java index e0b29b4658..40d0ee2f82 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/PullJob.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/PullJob.java @@ -17,15 +17,14 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ - - package org.jumpmind.symmetric.job; +import org.jumpmind.symmetric.model.RemoteNodeStatuses; import org.jumpmind.symmetric.service.INodeService; import org.jumpmind.symmetric.service.IPullService; /** - * + * Background job that pulls data from remote nodes and then loads it. */ public class PullJob extends AbstractJob { @@ -34,14 +33,16 @@ public class PullJob extends AbstractJob { private INodeService nodeService; @Override - public void doJob() throws Exception { - boolean dataPulled = pullService.pullData(); + public long doJob() throws Exception { + RemoteNodeStatuses statuses = pullService.pullData(); // Re-pull immediately if we are in the middle of an initial load // so that the initial load completes as quickly as possible. - while (nodeService.isDataLoadStarted() && dataPulled) { - dataPulled = pullService.pullData(); - } + while (nodeService.isDataLoadStarted() && statuses.wasDataProcessed()) { + statuses = pullService.pullData(); + } + + return statuses.getDataProcessedCount(); } public void setPullService(IPullService service) { diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/PurgeJob.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/PurgeJob.java index a65ea9ae80..65948fc03d 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/PurgeJob.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/PurgeJob.java @@ -24,7 +24,7 @@ import org.jumpmind.symmetric.service.IPurgeService; /** - * + * Background job that is responsible for purging already synchronized data */ public class PurgeJob extends AbstractJob { @@ -34,8 +34,8 @@ public PurgeJob() { } @Override - public void doJob() throws Exception { - purgeService.purge(); + public long doJob() throws Exception { + return purgeService.purge(); } public void setPurgeService(IPurgeService service) { diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/PushJob.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/PushJob.java index 04938403b0..06f80c6afb 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/PushJob.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/PushJob.java @@ -24,7 +24,7 @@ import org.jumpmind.symmetric.service.IPushService; /** - * + * Background job that is responsible for pushing data to linked nodes. */ public class PushJob extends AbstractJob { @@ -38,8 +38,8 @@ public void setPushService(IPushService service) { } @Override - public void doJob() throws Exception { - pushService.pushData(); + public long doJob() throws Exception { + return pushService.pushData().getDataProcessedCount(); } } \ No newline at end of file diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/RouterJob.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/RouterJob.java index e6a0aaf9cd..d37050392f 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/RouterJob.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/RouterJob.java @@ -33,8 +33,8 @@ public class RouterJob extends AbstractJob { IRouterService routingService; @Override - void doJob() throws Exception { - routingService.routeData(); + long doJob() throws Exception { + return routingService.routeData(); } public void setRoutingService(IRouterService routingService) { diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/StatisticFlushJob.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/StatisticFlushJob.java index 6ae68f983f..d55b923e60 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/StatisticFlushJob.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/StatisticFlushJob.java @@ -24,7 +24,7 @@ import org.jumpmind.symmetric.statistic.IStatisticManager; /** - * + * Background job that is responsible for writing statistics to database tables. */ public class StatisticFlushJob extends AbstractJob { @@ -34,8 +34,9 @@ public StatisticFlushJob() { } @Override - public void doJob() throws Exception { - statisticManager.flush(); + public long doJob() throws Exception { + statisticManager.flush(); + return -1l; } public void setStatisticManager(IStatisticManager statisticManager) { diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/SyncTriggersJob.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/SyncTriggersJob.java index c2a74d7a83..f9712813cf 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/SyncTriggersJob.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/SyncTriggersJob.java @@ -24,7 +24,7 @@ import org.jumpmind.symmetric.service.ITriggerRouterService; /** - * + * Background job that checks to see if triggers need to be regenerated. */ public class SyncTriggersJob extends AbstractJob { @@ -34,8 +34,9 @@ public SyncTriggersJob() { } @Override - public void doJob() throws Exception { - triggerRouterService.syncTriggers(); + public long doJob() throws Exception { + triggerRouterService.syncTriggers(); + return -1l; } public void setTriggerRouterService(ITriggerRouterService triggerService) { diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/WatchdogJob.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/WatchdogJob.java index 955e653284..da76fe0e60 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/WatchdogJob.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/job/WatchdogJob.java @@ -24,15 +24,17 @@ import org.jumpmind.symmetric.service.INodeService; /** - * + * Background job that is responsible for checking on node health. It will disable nodes that + * have been offline for a configurable period of time. */ public class WatchdogJob extends AbstractJob { private INodeService nodeService; @Override - public void doJob() throws Exception { - nodeService.checkForOfflineNodes(); + public long doJob() throws Exception { + nodeService.checkForOfflineNodes(); + return -1l; } public void setNodeService(INodeService nodeService) { diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/model/OutgoingBatch.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/model/OutgoingBatch.java index a1d3bd1cef..0d8db1f38c 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/model/OutgoingBatch.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/model/OutgoingBatch.java @@ -382,5 +382,9 @@ public Date getCreateTime() { public void setCreateTime(Date createTime) { this.createTime = createTime; } + + public long totalEventCount() { + return insertEventCount + updateEventCount + deleteEventCount + otherEventCount; + } } \ No newline at end of file diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/model/RemoteNodeStatus.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/model/RemoteNodeStatus.java new file mode 100644 index 0000000000..e4fdb0a996 --- /dev/null +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/model/RemoteNodeStatus.java @@ -0,0 +1,113 @@ +/* + * Licensed to JumpMind Inc under one or more contributor + * license agreements. See the NOTICE file distributed + * with this work for additional information regarding + * copyright ownership. JumpMind Inc licenses this file + * to you under the GNU Lesser General Public License (the + * "License"); you may not use this file except in compliance + * with the License. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, see + * . + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.jumpmind.symmetric.model; + +import java.io.Serializable; +import java.util.List; + +/** + * Indicates the status of an attempt to transport data from or to a remove + * node. + */ +public class RemoteNodeStatus implements Serializable { + + private static final long serialVersionUID = 1L; + + public static enum Status { + OFFLINE, BUSY, NOT_AUTHORIZED, REGISTRATION_REQUIRED, SYNC_DISABLED, NO_DATA, DATA_PROCESSED, DATA_ERROR, UNKNOWN_ERROR + }; + + private String nodeId; + private Status status; + private long dataProcessed; + private long batchesProcessed; + + public RemoteNodeStatus(String nodeId) { + this.status = Status.NO_DATA; + this.nodeId = nodeId; + } + + public boolean failed() { + return status != Status.NO_DATA && status != Status.DATA_PROCESSED; + } + + public String getNodeId() { + return nodeId; + } + + public void setNodeId(String nodeId) { + this.nodeId = nodeId; + } + + public Status getStatus() { + return status; + } + + public void setStatus(Status status) { + this.status = status; + } + + public long getDataProcessed() { + return dataProcessed; + } + + public long getBatchesProcessed() { + return batchesProcessed; + } + + public void updateIncomingStatus(List incomingBatches) { + if (incomingBatches != null) { + for (IncomingBatch incomingBatch : incomingBatches) { + dataProcessed += incomingBatch.getStatementCount(); + batchesProcessed++; + if (incomingBatch.getStatus() == org.jumpmind.symmetric.model.IncomingBatch.Status.ER) { + status = Status.DATA_ERROR; + } + } + } + + if (status != Status.DATA_ERROR && dataProcessed > 0) { + status = Status.DATA_PROCESSED; + } + } + + public void updateOutgoingStatus(List outgoingBatches, List batches) { + if (batches != null) { + for (BatchInfo batch : batches) { + if (!batch.isOk()) { + status = Status.DATA_ERROR; + } + } + } + + if (outgoingBatches != null) { + for (OutgoingBatch batch : outgoingBatches) { + batchesProcessed++; + dataProcessed += batch.totalEventCount(); + } + } + + if (status != Status.DATA_ERROR && dataProcessed > 0) { + status = Status.DATA_PROCESSED; + } + } + +} diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/model/RemoteNodeStatuses.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/model/RemoteNodeStatuses.java new file mode 100644 index 0000000000..c559673f13 --- /dev/null +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/model/RemoteNodeStatuses.java @@ -0,0 +1,53 @@ +/* + * Licensed to JumpMind Inc under one or more contributor + * license agreements. See the NOTICE file distributed + * with this work for additional information regarding + * copyright ownership. JumpMind Inc licenses this file + * to you under the GNU Lesser General Public License (the + * "License"); you may not use this file except in compliance + * with the License. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, see + * . + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.jumpmind.symmetric.model; + +import java.util.ArrayList; + +public class RemoteNodeStatuses extends ArrayList { + + private static final long serialVersionUID = 1L; + + public boolean wasDataProcessed() { + boolean dataProcessed = false; + for (RemoteNodeStatus status : this) { + dataProcessed |= status.getDataProcessed() > 0; + } + return dataProcessed; + } + + public long getDataProcessedCount() { + long dataProcessed = size() > 0 ? 0 : -1l; + for (RemoteNodeStatus status : this) { + dataProcessed += status.getDataProcessed(); + } + return dataProcessed; + } + + public RemoteNodeStatus add(Node node) { + RemoteNodeStatus status = null; + if (node != null) { + status = new RemoteNodeStatus(node.getNodeId()); + add(status); + } + return status; + } +} diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataExtractorService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataExtractorService.java index fb6d9a9d2e..3f09cc29b7 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataExtractorService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataExtractorService.java @@ -23,10 +23,12 @@ import java.io.IOException; import java.io.OutputStream; import java.io.Writer; +import java.util.List; import org.jumpmind.symmetric.extract.DataExtractorContext; import org.jumpmind.symmetric.extract.IExtractorFilter; import org.jumpmind.symmetric.model.Node; +import org.jumpmind.symmetric.model.OutgoingBatch; import org.jumpmind.symmetric.model.TriggerHistory; import org.jumpmind.symmetric.model.TriggerRouter; import org.jumpmind.symmetric.transport.IOutgoingTransport; @@ -46,9 +48,9 @@ public void extractInitialLoadWithinBatchFor(Node node, TriggerRouter trigger, W DataExtractorContext ctx, TriggerHistory triggerHistory); /** - * @return true if work was done or false if there was no work to do. + * @return a list of batches that were extracted */ - public boolean extract(Node node, IOutgoingTransport transport) throws IOException; + public List extract(Node node, IOutgoingTransport transport) throws IOException; public boolean extractBatchRange(IOutgoingTransport transport, String startBatchId, String endBatchId) throws IOException; diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataLoaderService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataLoaderService.java index f0d04dcbb0..877849d717 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataLoaderService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataLoaderService.java @@ -34,6 +34,7 @@ import org.jumpmind.symmetric.load.IDataLoaderStatistics; import org.jumpmind.symmetric.load.csv.CsvLoader; import org.jumpmind.symmetric.model.Node; +import org.jumpmind.symmetric.model.RemoteNodeStatus; import org.jumpmind.symmetric.transport.ITransportManager; /** @@ -41,8 +42,10 @@ * database from a transport */ public interface IDataLoaderService { + + public RemoteNodeStatus loadDataFromPull(Node remote) throws IOException; - public boolean loadDataFromPull(Node remote) throws IOException; + public void loadDataFromPull(Node remote, RemoteNodeStatus status) throws IOException; public void loadData(InputStream in, OutputStream out) throws IOException; diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IPullService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IPullService.java index 05382cc9ee..7329e86ac3 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IPullService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IPullService.java @@ -16,14 +16,21 @@ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. */ - - -package org.jumpmind.symmetric.service; - + * under the License. + */ + +package org.jumpmind.symmetric.service; + +import org.jumpmind.symmetric.model.Node; +import org.jumpmind.symmetric.model.NodeGroupLinkAction; +import org.jumpmind.symmetric.model.RemoteNodeStatuses; + /** - * + * Service API that is responsible for pulling data from the list of configured + * {@link Node}s that are configured to {@link NodeGroupLinkAction#W} */ -public interface IPullService { - public boolean pullData(); +public interface IPullService { + + public RemoteNodeStatuses pullData(); + } \ No newline at end of file diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IPurgeService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IPurgeService.java index 5e724b7101..a5fd9cd0ac 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IPurgeService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IPurgeService.java @@ -32,13 +32,13 @@ */ public interface IPurgeService { - public void purge(); + public long purge(); - public void purgeDataGaps(Calendar retentionCutoff); + public long purgeDataGaps(Calendar retentionCutoff); - public void purgeOutgoing(Calendar retentionCutoff); + public long purgeOutgoing(Calendar retentionCutoff); - public void purgeIncoming(Calendar retentionCutoff); + public long purgeIncoming(Calendar retentionCutoff); @Transactional public void purgeAllIncomingEventsForNode(String nodeId); diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IPushService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IPushService.java index d970494d60..2bfb0ec9b6 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IPushService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IPushService.java @@ -16,19 +16,27 @@ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations - * under the License. */ - - -package org.jumpmind.symmetric.service; - + * under the License. + */ + +package org.jumpmind.symmetric.service; + +import org.jumpmind.symmetric.model.Node; +import org.jumpmind.symmetric.model.NodeGroupLinkAction; +import org.jumpmind.symmetric.model.RemoteNodeStatuses; + /** - * + * Service API that is responsible for pushing data to the list of configured + * {@link Node}s that are configured to {@link NodeGroupLinkAction#P} */ -public interface IPushService { - - /** - * @return true if data was pushed successfully - */ - public boolean pushData(); - +public interface IPushService { + + /** + * Attempt to push data, if any has been captured, to nodes that the + * captured data is targeted for. + * + * @return RemoteNodeStatuses the status of the push attempt(s) + */ + public RemoteNodeStatuses pushData(); + } \ No newline at end of file diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IRouterService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IRouterService.java index 77a09298b7..68462a9f0d 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IRouterService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IRouterService.java @@ -39,7 +39,7 @@ */ public interface IRouterService extends ISqlProvider { - public void routeData(); + public long routeData(); public long getUnroutedDataCount(); diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IStatisticService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IStatisticService.java index ce63ada13c..a4ba31d18b 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IStatisticService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IStatisticService.java @@ -21,11 +21,13 @@ package org.jumpmind.symmetric.service; import java.util.Date; +import java.util.List; import java.util.Map; import java.util.TreeMap; import org.jumpmind.symmetric.statistic.ChannelStats; import org.jumpmind.symmetric.statistic.HostStats; +import org.jumpmind.symmetric.statistic.JobStats; /** @@ -37,8 +39,13 @@ public interface IStatisticService { public void save(HostStats stats); + public void save(JobStats stats); + public TreeMap> getChannelStatsForPeriod(Date start, Date end, String nodeId, int periodSizeInMinutes); public TreeMap getHostStatsForPeriod(Date start, Date end, String nodeId, int periodSizeInMinutes); - + + public List getJobStatsForPeriod(Date start, Date end, String nodeId); + + } \ No newline at end of file diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AbstractOfflineDetectorService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AbstractOfflineDetectorService.java index 17d746bf20..d3eb70a567 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AbstractOfflineDetectorService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AbstractOfflineDetectorService.java @@ -30,6 +30,8 @@ import org.apache.commons.lang.exception.ExceptionUtils; import org.jumpmind.symmetric.io.IOfflineClientListener; import org.jumpmind.symmetric.model.Node; +import org.jumpmind.symmetric.model.RemoteNodeStatus; +import org.jumpmind.symmetric.model.RemoteNodeStatus.Status; import org.jumpmind.symmetric.service.IOfflineDetectorService; import org.jumpmind.symmetric.service.RegistrationRequiredException; import org.jumpmind.symmetric.transport.AuthenticationException; @@ -37,7 +39,7 @@ import org.jumpmind.symmetric.transport.SyncDisabledException; /** - * + * Abstract service that provides help methods for detecting offline status. */ public abstract class AbstractOfflineDetectorService extends AbstractService implements IOfflineDetectorService { @@ -62,19 +64,27 @@ public boolean removeOfflineListener(IOfflineClientListener listener) { } } - protected void fireOffline(Exception error, Node remoteNode) { + protected void fireOffline(Exception error, Node remoteNode, RemoteNodeStatus status) { if (offlineListeners != null) { for (IOfflineClientListener listener : offlineListeners) { if (isOffline(error)) { + status.setStatus(Status.OFFLINE); listener.offline(remoteNode); } else if (isBusy(error)) { + status.setStatus(Status.BUSY); listener.busy(remoteNode); } else if (isNotAuthenticated(error)) { + status.setStatus(Status.NOT_AUTHORIZED); listener.notAuthenticated(remoteNode); } else if (isSyncDisabled(error)) { + status.setStatus(Status.SYNC_DISABLED); listener.syncDisabled(remoteNode); } else if (isRegistrationRequired(error)) { + status.setStatus(Status.REGISTRATION_REQUIRED); listener.registrationRequired(remoteNode); + } else { + status.setStatus(Status.UNKNOWN_ERROR); + listener.unknownError(remoteNode, error); } } } diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index 035f60376b..199976e0e9 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -390,7 +390,10 @@ private List filterBatchesForExtraction(OutgoingBatches batches, return batches.getBatches(); } - public boolean extract(Node node, IOutgoingTransport targetTransport) throws IOException { + public List extract(Node node, IOutgoingTransport targetTransport) throws IOException { + + List activeBatches = null; + IDataExtractor dataExtractor = getDataExtractor(node.getSymmetricVersion()); if (!parameterService.is(ParameterConstants.START_ROUTE_JOB)) { @@ -401,7 +404,7 @@ public boolean extract(Node node, IOutgoingTransport targetTransport) throws IOE if (batches.containsBatches()) { - List activeBatches = filterBatchesForExtraction(batches, targetTransport + activeBatches = filterBatchesForExtraction(batches, targetTransport .getSuspendIgnoreChannelLists(configurationService)); if (activeBatches.size() > 0) { @@ -497,13 +500,10 @@ public boolean extract(Node node, IOutgoingTransport targetTransport) throws IOE configurationService.saveNodeChannelControl(nodeChannel, false); } - return true; - } else { - return false; - } - } else { - return false; - } + } + } + + return activeBatches != null ? activeBatches : new ArrayList(0); } protected void networkTransfer(BufferedReader reader, BufferedWriter writer) throws IOException { diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java index 84b05b424b..9307b8e60b 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java @@ -55,6 +55,7 @@ import org.jumpmind.symmetric.model.IncomingBatch.Status; import org.jumpmind.symmetric.model.Node; import org.jumpmind.symmetric.model.NodeSecurity; +import org.jumpmind.symmetric.model.RemoteNodeStatus; import org.jumpmind.symmetric.service.IConfigurationService; import org.jumpmind.symmetric.service.IDataLoaderService; import org.jumpmind.symmetric.service.IIncomingBatchService; @@ -110,9 +111,15 @@ public DataLoaderService() { * Connect to the remote node and pull data. The acknowledgment of * commit/error status is sent separately after the data is processed. */ - public boolean loadDataFromPull(Node remote) + public RemoteNodeStatus loadDataFromPull(Node remote) + throws IOException { + RemoteNodeStatus status = new RemoteNodeStatus(remote != null ? remote.getNodeId() : null); + loadDataFromPull(remote, status); + return status; + } + + public void loadDataFromPull(Node remote, RemoteNodeStatus status) throws IOException { - boolean wasWorkDone = false; try { Node local = nodeService.findIdentity(); if (local == null) { @@ -139,6 +146,7 @@ public boolean loadDataFromPull(Node remote) List list = loadDataAndReturnBatches(transport); if (list.size() > 0) { + status.updateIncomingStatus(list); local = nodeService.findIdentity(); localSecurity = nodeService.findNodeSecurity(local.getNodeId()); if (StringUtils.isNotBlank(transport.getRedirectionUrl())) { @@ -148,18 +156,16 @@ public boolean loadDataFromPull(Node remote) remote.setSyncUrl(url); } sendAck(remote, local, localSecurity, list); - wasWorkDone = true; } } catch (RegistrationRequiredException e) { log.warn("RegistrationLost"); - loadDataFromPull(null); + loadDataFromPull(null, status); nodeService.findIdentity(false); - wasWorkDone = true; } catch (MalformedURLException e) { log.error("URLConnectingFailure", remote.getNodeId(), remote.getSyncUrl()); + throw e; } - return wasWorkDone; } /** diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PullService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PullService.java index 38492ad04b..9128202280 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PullService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PullService.java @@ -27,6 +27,8 @@ import java.util.List; import org.jumpmind.symmetric.model.Node; +import org.jumpmind.symmetric.model.RemoteNodeStatus; +import org.jumpmind.symmetric.model.RemoteNodeStatuses; import org.jumpmind.symmetric.service.ClusterConstants; import org.jumpmind.symmetric.service.IClusterService; import org.jumpmind.symmetric.service.IDataLoaderService; @@ -51,8 +53,8 @@ public class PullService extends AbstractOfflineDetectorService implements IPull private IClusterService clusterService; - synchronized public boolean pullData() { - boolean dataPulled = false; + synchronized public RemoteNodeStatuses pullData() { + RemoteNodeStatuses statuses = new RemoteNodeStatuses(); if (clusterService.lock(ClusterConstants.PULL)) { try { // register if we haven't already been registered @@ -60,40 +62,38 @@ synchronized public boolean pullData() { List nodes = nodeService.findNodesToPull(); if (nodes != null && nodes.size() > 0) { - for (Node node : nodes) { + for (Node node : nodes) { + RemoteNodeStatus status = statuses.add(node); String nodeName = " for " + node; try { - log.debug("DataPulling", nodeName); - if (dataLoaderService.loadDataFromPull(node)) { + log.debug("DataPulling", nodeName); + dataLoaderService.loadDataFromPull(node, status); + if (status.getDataProcessed() > 0) { log.info("DataPulled", nodeName); - dataPulled = true; - } else { - log.debug("DataPullingFailed", nodeName); - - } + } } catch (ConnectException ex) { log.warn("TransportFailedConnectionUnavailable", (node.getSyncUrl() == null ? parameterService.getRegistrationUrl() : node .getSyncUrl())); - fireOffline(ex, node); + fireOffline(ex, node, status); } catch (ConnectionRejectedException ex) { log.warn("TransportFailedConnectionBusy"); - fireOffline(ex, node); + fireOffline(ex, node, status); } catch (AuthenticationException ex) { log.warn("AuthenticationFailed"); - fireOffline(ex, node); + fireOffline(ex, node, status); } catch (SyncDisabledException ex) { log.warn("SyncDisabled"); - fireOffline(ex, node); + fireOffline(ex, node, status); } catch (SocketException ex) { log.warn("Message", ex.getMessage()); - fireOffline(ex, node); + fireOffline(ex, node, status); } catch (TransportException ex) { log.warn("Message", ex.getMessage()); - fireOffline(ex, node); + fireOffline(ex, node, status); } catch (IOException ex) { log.error(ex); - fireOffline(ex, node); + fireOffline(ex, node, status); } } } @@ -106,7 +106,7 @@ synchronized public boolean pullData() { log.info("DataPullingFailedLock"); } - return dataPulled; + return statuses; } public void setNodeService(INodeService nodeService) { diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java index a2d0797cd3..caf61f609e 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java @@ -60,31 +60,34 @@ public class PurgeService extends AbstractService implements IPurgeService { private IStatisticManager statisticManager; - public void purge() { + public long purge() { + long rowsPurged = 0; if (nodeService.isRegistrationServer() || nodeService.getNodeStatus() == NodeStatus.DATA_LOAD_COMPLETED) { Calendar retentionCutoff = Calendar.getInstance(); retentionCutoff.add(Calendar.MINUTE, -parameterService.getInt(ParameterConstants.PURGE_RETENTION_MINUTES)); - purgeOutgoing(retentionCutoff); - purgeIncoming(retentionCutoff); + rowsPurged += purgeOutgoing(retentionCutoff); + rowsPurged += purgeIncoming(retentionCutoff); retentionCutoff = Calendar.getInstance(); retentionCutoff.add(Calendar.MINUTE, -parameterService .getInt(ParameterConstants.ROUTING_DATA_READER_TYPE_GAP_RETENTION_MINUTES)); - purgeDataGaps(retentionCutoff); + rowsPurged += purgeDataGaps(retentionCutoff); } else { log.warn("DataPurgeSkippingNoInitialLoad"); - } + } + return rowsPurged; } - public void purgeDataGaps(Calendar retentionCutoff) { + public long purgeDataGaps(Calendar retentionCutoff) { + long rowsPurged = -1l; try { if (clusterService.lock(ClusterConstants.PURGE_DATA_GAPS)) { try { log.info("DataPurgeDataGapsRunning"); - int count = jdbcTemplate.update(getSql("deleteFromDataGapsSql"), new Object[] { retentionCutoff + rowsPurged = jdbcTemplate.update(getSql("deleteFromDataGapsSql"), new Object[] { retentionCutoff .getTime() }); - log.info("DataPurgeDataGapsRun", count); + log.info("DataPurgeDataGapsRun", rowsPurged); } finally { clusterService.unlock(ClusterConstants.PURGE_DATA_GAPS); log.info("DataPurgeDataGapsCompleted"); @@ -95,18 +98,20 @@ public void purgeDataGaps(Calendar retentionCutoff) { } } catch (Exception ex) { log.error(ex); - } + } + return rowsPurged; } - public void purgeOutgoing(Calendar retentionCutoff) { + public long purgeOutgoing(Calendar retentionCutoff) { + long rowsPurged = 0; try { if (clusterService.lock(ClusterConstants.PURGE_OUTGOING)) { try { log.info("DataPurgeOutgoingRunning", SimpleDateFormat.getDateTimeInstance().format( retentionCutoff.getTime())); - purgeStrandedBatches(); - purgeDataRows(retentionCutoff); - purgeOutgoingBatch(retentionCutoff); + rowsPurged += purgeStrandedBatches(); + rowsPurged += purgeDataRows(retentionCutoff); + rowsPurged += purgeOutgoingBatch(retentionCutoff); } finally { clusterService.unlock(ClusterConstants.PURGE_OUTGOING); log.info("DataPurgeOutgoingCompleted"); @@ -116,10 +121,11 @@ public void purgeOutgoing(Calendar retentionCutoff) { } } catch (Exception ex) { log.error(ex); - } + } + return rowsPurged; } - private void purgeOutgoingBatch(final Calendar time) { + private long purgeOutgoingBatch(final Calendar time) { log.info("DataPurgeOutgoingRange"); long[] minMax = queryForMinMax(getSql("selectOutgoingBatchRangeSql"), new Object[] { time.getTime() }); int maxNumOfBatchIdsToPurgeInTx = parameterService.getInt(ParameterConstants.PURGE_MAX_NUMBER_OF_BATCH_IDS); @@ -129,35 +135,39 @@ private void purgeOutgoingBatch(final Calendar time) { statisticManager.incrementPurgedDataEventRows(dataEventsPurgedCount); int outgoingbatchPurgedCount = purgeByMinMax(minMax, getSql("deleteOutgoingBatchSql"), time.getTime(), maxNumOfBatchIdsToPurgeInTx); statisticManager.incrementPurgedBatchOutgoingRows(outgoingbatchPurgedCount); - purgeUnroutedDataEvents(time.getTime()); + long unroutedPurgedCount = purgeUnroutedDataEvents(time.getTime()); + return dataEventsPurgedCount + outgoingbatchPurgedCount + unroutedPurgedCount; } - private void purgeStrandedBatches() { + private long purgeStrandedBatches() { int updateStrandedBatchesCount = getSimpleTemplate().update(getSql("updateStrandedBatches")); if (updateStrandedBatchesCount > 0) { log.info("DataPurgeUpdatedStrandedBatches", updateStrandedBatchesCount); statisticManager.incrementPurgedBatchOutgoingRows(updateStrandedBatchesCount); - } + } + return updateStrandedBatchesCount; } - private void purgeUnroutedDataEvents(Date time) { + private long purgeUnroutedDataEvents(Date time) { Map params = new HashMap(); params.put(PARAM_CUTOFF_TIME, new Timestamp(time.getTime())); int unroutedDataEventCount = getSimpleTemplate().update(getSql("deleteUnroutedDataEventSql"), params); if (unroutedDataEventCount > 0) { statisticManager.incrementPurgedDataEventRows(unroutedDataEventCount); log.info("DataPurgeTableCompleted", unroutedDataEventCount, "unrouted data_event"); - } + } + return unroutedDataEventCount; } - private void purgeDataRows(final Calendar time) { + private long purgeDataRows(final Calendar time) { log.info("DataPurgeRowsRange"); long[] minMax = queryForMinMax(getSql("selectDataRangeSql"), new Object[0]); int maxNumOfDataIdsToPurgeInTx = parameterService.getInt(ParameterConstants.PURGE_MAX_NUMBER_OF_DATA_IDS); - int dataPurgedCount = purgeByMinMax(minMax, getSql("deleteDataSql"), time.getTime(), maxNumOfDataIdsToPurgeInTx); - statisticManager.incrementPurgedDataRows(dataPurgedCount); - dataPurgedCount = purgeByMinMax(minMax, getSql("deleteStrandedData"), time.getTime(), maxNumOfDataIdsToPurgeInTx); - statisticManager.incrementPurgedDataRows(dataPurgedCount); + int dataDeletedCount = purgeByMinMax(minMax, getSql("deleteDataSql"), time.getTime(), maxNumOfDataIdsToPurgeInTx); + statisticManager.incrementPurgedDataRows(dataDeletedCount); + int strandedDeletedCount = purgeByMinMax(minMax, getSql("deleteStrandedData"), time.getTime(), maxNumOfDataIdsToPurgeInTx); + statisticManager.incrementPurgedDataRows(strandedDeletedCount); + return dataDeletedCount + strandedDeletedCount; } @@ -207,12 +217,13 @@ private int purgeByMinMax(long[] minMax, String deleteSql, Date retentionTime, i return totalCount; } - public void purgeIncoming(Calendar retentionCutoff) { + public long purgeIncoming(Calendar retentionCutoff) { + long purgedRowCount = 0; try { if (clusterService.lock(ClusterConstants.PURGE_INCOMING)) { try { log.info("DataPurgeIncomingRunning"); - purgeIncomingBatch(retentionCutoff); + purgedRowCount = purgeIncomingBatch(retentionCutoff); } finally { clusterService.unlock(ClusterConstants.PURGE_INCOMING); log.info("DataPurgeIncomingCompleted"); @@ -222,10 +233,11 @@ public void purgeIncoming(Calendar retentionCutoff) { } } catch (Exception ex) { log.error(ex); - } + } + return purgedRowCount; } - private void purgeIncomingBatch(final Calendar time) { + private long purgeIncomingBatch(final Calendar time) { log.info("DataPurgeIncomingRange"); List nodeBatchRangeList = jdbcTemplate.query(getSql("selectIncomingBatchRangeSql"), new Object[] { time.getTime() }, new RowMapper() { @@ -234,7 +246,8 @@ public NodeBatchRange mapRow(ResultSet rs, int rowNum) throws SQLException { } }); int incomingBatchesPurgedCount = purgeByNodeBatchRangeList(getSql("deleteIncomingBatchSql"), nodeBatchRangeList); - statisticManager.incrementPurgedBatchIncomingRows(incomingBatchesPurgedCount); + statisticManager.incrementPurgedBatchIncomingRows(incomingBatchesPurgedCount); + return incomingBatchesPurgedCount; } private int purgeByNodeBatchRangeList(String deleteSql, List nodeBatchRangeList) { diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PushService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PushService.java index cb2efcadd9..5124af3029 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PushService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PushService.java @@ -21,31 +21,34 @@ package org.jumpmind.symmetric.service.impl; -import java.io.BufferedReader; -import java.net.ConnectException; -import java.net.SocketException; -import java.util.List; - -import org.apache.commons.lang.StringUtils; -import org.jumpmind.symmetric.model.BatchInfo; -import org.jumpmind.symmetric.model.Node; -import org.jumpmind.symmetric.model.NodeSecurity; -import org.jumpmind.symmetric.service.ClusterConstants; -import org.jumpmind.symmetric.service.IAcknowledgeService; -import org.jumpmind.symmetric.service.IClusterService; -import org.jumpmind.symmetric.service.IDataExtractorService; -import org.jumpmind.symmetric.service.INodeService; -import org.jumpmind.symmetric.service.IPushService; -import org.jumpmind.symmetric.service.RegistrationRequiredException; -import org.jumpmind.symmetric.transport.AuthenticationException; -import org.jumpmind.symmetric.transport.ConnectionRejectedException; -import org.jumpmind.symmetric.transport.IOutgoingWithResponseTransport; -import org.jumpmind.symmetric.transport.ITransportManager; -import org.jumpmind.symmetric.transport.SyncDisabledException; -import org.jumpmind.symmetric.transport.TransportException; +import java.io.BufferedReader; +import java.net.ConnectException; +import java.net.SocketException; +import java.util.List; + +import org.apache.commons.lang.StringUtils; +import org.jumpmind.symmetric.model.BatchInfo; +import org.jumpmind.symmetric.model.Node; +import org.jumpmind.symmetric.model.NodeSecurity; +import org.jumpmind.symmetric.model.OutgoingBatch; +import org.jumpmind.symmetric.model.RemoteNodeStatus; +import org.jumpmind.symmetric.model.RemoteNodeStatuses; +import org.jumpmind.symmetric.service.ClusterConstants; +import org.jumpmind.symmetric.service.IAcknowledgeService; +import org.jumpmind.symmetric.service.IClusterService; +import org.jumpmind.symmetric.service.IDataExtractorService; +import org.jumpmind.symmetric.service.INodeService; +import org.jumpmind.symmetric.service.IPushService; +import org.jumpmind.symmetric.service.RegistrationRequiredException; +import org.jumpmind.symmetric.transport.AuthenticationException; +import org.jumpmind.symmetric.transport.ConnectionRejectedException; +import org.jumpmind.symmetric.transport.IOutgoingWithResponseTransport; +import org.jumpmind.symmetric.transport.ITransportManager; +import org.jumpmind.symmetric.transport.SyncDisabledException; +import org.jumpmind.symmetric.transport.TransportException; /** - * + * @see IPushService */ public class PushService extends AbstractOfflineDetectorService implements IPushService { @@ -59,13 +62,9 @@ public class PushService extends AbstractOfflineDetectorService implements IPush private IClusterService clusterService; - private static enum PushStatus { - PUSHED, ERROR, NOTHING_TO_PUSH - }; + synchronized public RemoteNodeStatuses pushData() { + RemoteNodeStatuses statuses = new RemoteNodeStatuses(); - synchronized public boolean pushData() { - boolean pushedData = false; - boolean inError = false; Node identity = nodeService.findIdentity(); if (identity != null) { if (clusterService.lock(ClusterConstants.PUSH)) { @@ -77,12 +76,11 @@ synchronized public boolean pushData() { if (identitySecurity != null) { for (Node node : nodes) { log.debug("DataPushing", node); - PushStatus status = pushToNode(node, identity, identitySecurity); - if (status == PushStatus.PUSHED) { - pushedData = true; + RemoteNodeStatus status = pushToNode(node, identity, identitySecurity); + statuses.add(status); + if (status.getBatchesProcessed() > 0) { log.info("DataPushed", node); - } else if (status == PushStatus.ERROR) { - inError = true; + } else if (status.failed()) { log.warn("DataPushingFailed"); } log.debug("DataPushingCompleted", node); @@ -98,17 +96,18 @@ synchronized public boolean pushData() { log.info("DataPushingFailedLock"); } } - return pushedData && !inError; + return statuses; } - private PushStatus pushToNode(Node remote, Node identity, NodeSecurity identitySecurity) { - PushStatus status = PushStatus.ERROR; + private RemoteNodeStatus pushToNode(Node remote, Node identity, NodeSecurity identitySecurity) { + RemoteNodeStatus status = new RemoteNodeStatus(remote.getNodeId()); IOutgoingWithResponseTransport transport = null; try { transport = transportManager.getPushTransport(remote, identity, identitySecurity .getNodePassword(), parameterService.getRegistrationUrl()); - - if (extractor.extract(remote, transport)) { + + List extractedBatches = extractor.extract(remote, transport); + if (extractedBatches.size() > 0) { log.info("DataSent", remote); BufferedReader reader = transport.readResponse(); String ackString = reader.readLine(); @@ -124,48 +123,42 @@ private PushStatus pushToNode(Node remote, Node identity, NodeSecurity identityS List batches = transportManager.readAcknowledgement(ackString, ackExtendedString); - status = PushStatus.PUSHED; - for (BatchInfo batchInfo : batches) { log.debug("DataAckSaving", batchInfo.getBatchId(), (batchInfo.isOk() ? "OK" : "error")); - if (!batchInfo.isOk()) { - status = PushStatus.ERROR; - } ackService.ack(batchInfo); - - } - } else { - status = PushStatus.NOTHING_TO_PUSH; - } + } + + status.updateOutgoingStatus(extractedBatches, batches); + } } catch (ConnectException ex) { log.warn("TransportFailedConnectionUnavailable", (remote.getSyncUrl() == null ? parameterService.getRegistrationUrl() : remote .getSyncUrl())); - fireOffline(ex, remote); + fireOffline(ex, remote, status); } catch (ConnectionRejectedException ex) { log.warn("TransportFailedConnectionBusy"); - fireOffline(ex, remote); + fireOffline(ex, remote, status); } catch (SocketException ex) { log.warn("Message", ex.getMessage()); - fireOffline(ex, remote); + fireOffline(ex, remote, status); } catch (TransportException ex) { log.warn("Message", ex.getMessage()); - fireOffline(ex, remote); + fireOffline(ex, remote, status); } catch (AuthenticationException ex) { log.warn("AuthenticationFailed"); - fireOffline(ex, remote); + fireOffline(ex, remote, status); } catch (SyncDisabledException ex) { log.warn("SyncDisabled"); - fireOffline(ex, remote); + fireOffline(ex, remote, status); } catch (RegistrationRequiredException ex) { log.warn("RegistrationRequired"); - fireOffline(ex, remote); + fireOffline(ex, remote, status); } catch (Exception ex) { // just report the error because we want to push to other nodes // in our list log.error(ex); - fireOffline(ex, remote); + fireOffline(ex, remote, status); } finally { try { transport.close(); diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java index f93fc753c9..e03f6718fb 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RegistrationService.java @@ -41,6 +41,7 @@ import org.jumpmind.symmetric.model.NodeSecurity; import org.jumpmind.symmetric.model.RegistrationRequest; import org.jumpmind.symmetric.model.RegistrationRequest.RegistrationStatus; +import org.jumpmind.symmetric.model.RemoteNodeStatus.Status; import org.jumpmind.symmetric.security.INodePasswordFilter; import org.jumpmind.symmetric.service.IDataExtractorService; import org.jumpmind.symmetric.service.IDataLoaderService; @@ -267,7 +268,7 @@ public void registerWithServer() { boolean errorOccurred = false; try { log.info("NodeRegisterting", parameterService.getRegistrationUrl()); - registered = dataLoaderService.loadDataFromPull(null); + registered = dataLoaderService.loadDataFromPull(null).getStatus() == Status.DATA_PROCESSED; } catch (ConnectException e) { log.warn("NodeRegistertingFailedConnection"); } catch (UnknownHostException e) { diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java index e304e2de9a..de4f0a4b33 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java @@ -131,14 +131,15 @@ public synchronized void destroy() { /** * This method will route data to specific nodes. */ - synchronized public void routeData() { + synchronized public long routeData() { + long dataCount = -1l; if (clusterService.lock(ClusterConstants.ROUTE)) { try { insertInitialLoadEvents(); long ts = System.currentTimeMillis(); IDataToRouteGapDetector gapDetector = dataToRouteReaderFactory.getDataToRouteGapDetector(); gapDetector.beforeRouting(); - int dataCount = routeDataForEachChannel(); + dataCount = routeDataForEachChannel(); gapDetector.afterRouting(); ts = System.currentTimeMillis() - ts; if (dataCount > 0 || ts > Constants.LONG_OPERATION_THRESHOLD) { @@ -148,6 +149,7 @@ synchronized public void routeData() { clusterService.unlock(ClusterConstants.ROUTE); } } + return dataCount; } protected void insertInitialLoadEvents() { diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/StatisticService.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/StatisticService.java index 5ffd42f6bb..6272cb13ac 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/StatisticService.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/StatisticService.java @@ -34,6 +34,7 @@ import org.jumpmind.symmetric.statistic.ChannelStatsByPeriodMap; import org.jumpmind.symmetric.statistic.HostStats; import org.jumpmind.symmetric.statistic.HostStatsByPeriodMap; +import org.jumpmind.symmetric.statistic.JobStats; import org.springframework.jdbc.core.RowMapper; /** @@ -56,6 +57,21 @@ public void save(ChannelStats stats) { Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT, Types.BIGINT }); } + + public void save(JobStats stats) { + jdbcTemplate.update( + getSql("insertJobStatsSql"), + new Object[] { stats.getNodeId(), stats.getHostName(), stats.getJobName(), + stats.getStartTime(), stats.getEndTime(), stats.getProcessedCount() }, new int[] { + Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, + Types.TIMESTAMP, Types.BIGINT }); + } + + public List getJobStatsForPeriod(Date start, Date end, + String nodeId) { + return jdbcTemplate.query(getSql("selectChannelStatsSql"), + new JobStatsMapper(), start, end, nodeId); + } public TreeMap> getChannelStatsForPeriod(Date start, Date end, String nodeId, int periodSizeInMinutes) { @@ -98,6 +114,19 @@ public Date truncateToMinutes(Date date) { cal.set(Calendar.SECOND, 0); return cal.getTime(); } + + class JobStatsMapper implements RowMapper { + public JobStats mapRow(ResultSet rs, int rowNum) throws SQLException { + JobStats stats = new JobStats(); + stats.setNodeId(rs.getString(1)); + stats.setHostName(rs.getString(2)); + stats.setJobName(rs.getString(3)); + stats.setStartTime(truncateToMinutes(rs.getTimestamp(4))); + stats.setEndTime(truncateToMinutes(rs.getTimestamp(5))); + stats.setProcessedCount(rs.getLong(6)); + return stats; + } + } class ChannelStatsMapper implements RowMapper { public ChannelStats mapRow(ResultSet rs, int rowNum) throws SQLException { diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/IStatisticManager.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/IStatisticManager.java index d2a2da7397..f8de5b6712 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/IStatisticManager.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/IStatisticManager.java @@ -27,7 +27,9 @@ * This manager provides an API record statistics */ public interface IStatisticManager { - public void flush(); + public void flush(); + + public void addJobStats(String jobName, long startTime, long endTime, long processedCount); public void incrementDataLoadedErrors(String channelId, long count); diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/JobStats.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/JobStats.java new file mode 100644 index 0000000000..69c7cb2356 --- /dev/null +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/JobStats.java @@ -0,0 +1,65 @@ +/* + * Licensed to JumpMind Inc under one or more contributor + * license agreements. See the NOTICE file distributed + * with this work for additional information regarding + * copyright ownership. JumpMind Inc licenses this file + * to you under the GNU Lesser General Public License (the + * "License"); you may not use this file except in compliance + * with the License. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, see + * . + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.jumpmind.symmetric.statistic; + +import java.util.Date; + +public class JobStats extends AbstractNodeHostStats { + + private String jobName; + private long processedCount; + + public JobStats() { + } + + public JobStats(String nodeId, String hostName, Date startTime, Date endTime, String jobName) { + super(nodeId, hostName, startTime, endTime); + this.jobName = jobName; + } + + public JobStats(String nodeId, String hostName, Date startTime, Date endTime, String jobName, + long processedCount) { + this(nodeId, hostName, startTime, endTime, jobName); + this.processedCount = processedCount; + } + + public JobStats(String jobName, long startTime, long endTime, long processedCount) { + this(null, null, new Date(startTime), new Date(endTime), jobName); + this.processedCount = processedCount; + } + + public String getJobName() { + return jobName; + } + + public void setJobName(String channelId) { + this.jobName = channelId; + } + + public long getProcessedCount() { + return processedCount; + } + + public void setProcessedCount(long processedCount) { + this.processedCount = processedCount; + } + +} \ No newline at end of file diff --git a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/StatisticManager.java b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/StatisticManager.java index 2fa89ba9ce..8deb045c4c 100644 --- a/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/StatisticManager.java +++ b/symmetric/symmetric-core/src/main/java/org/jumpmind/symmetric/statistic/StatisticManager.java @@ -20,12 +20,14 @@ */ package org.jumpmind.symmetric.statistic; +import java.util.ArrayList; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Semaphore; +import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.common.logging.ILog; import org.jumpmind.symmetric.common.logging.LogFactory; import org.jumpmind.symmetric.model.Node; @@ -48,6 +50,8 @@ public class StatisticManager implements IStatisticManager { private Map channelStats = new HashMap(); + private List jobStats = new ArrayList(); + private HostStats hostStats; protected INodeService nodeService; @@ -66,6 +70,10 @@ public class StatisticManager implements IStatisticManager { Semaphore hostStatsLock = new Semaphore(NUMBER_OF_PERMITS, true); + Semaphore jobStatsLock = new Semaphore(NUMBER_OF_PERMITS, true); + + boolean recordStatistics = true; + public StatisticManager() { } @@ -73,6 +81,16 @@ protected void init() { incrementRestart(); } + public void addJobStats(String jobName, long startTime, long endTime, long processedCount) { + jobStatsLock.acquireUninterruptibly(); + try { + JobStats stats = new JobStats(jobName, startTime, endTime, processedCount); + jobStats.add(stats); + } finally { + jobStatsLock.release(); + } + } + public void incrementDataRouted(String channelId, long count) { channelStatsLock.acquireUninterruptibly(); try { @@ -83,7 +101,12 @@ public void incrementDataRouted(String channelId, long count) { } public void setDataUnRouted(String channelId, long count) { - getChannelStats(channelId).setDataUnRouted(count); + channelStatsLock.acquireUninterruptibly(); + try { + getChannelStats(channelId).setDataUnRouted(count); + } finally { + channelStatsLock.release(); + } } public void incrementDataExtracted(String channelId, long count) { @@ -321,21 +344,24 @@ public void incrementTriggersCreatedCount(long count) { } public void flush() { + recordStatistics = parameterService.is(ParameterConstants.STATISTIC_RECORD_ENABLE); if (channelStats != null) { channelStatsLock.acquireUninterruptibly(NUMBER_OF_PERMITS); try { - Date endTime = new Date(); - for (ChannelStats stats : channelStats.values()) { - if (stats.getNodeId().equals(UNKNOWN)) { - Node node = nodeService.getCachedIdentity(); - if (node != null) { - stats.setNodeId(node.getNodeId()); + if (recordStatistics) { + Date endTime = new Date(); + for (ChannelStats stats : channelStats.values()) { + if (stats.getNodeId().equals(UNKNOWN)) { + Node node = nodeService.getCachedIdentity(); + if (node != null) { + stats.setNodeId(node.getNodeId()); + } } + stats.setEndTime(endTime); + statisticService.save(stats); } - stats.setEndTime(endTime); - statisticService.save(stats); - resetChannelStats(true); } + resetChannelStats(true); } finally { channelStatsLock.release(NUMBER_OF_PERMITS); } @@ -344,19 +370,45 @@ public void flush() { if (hostStats != null) { hostStatsLock.acquireUninterruptibly(NUMBER_OF_PERMITS); try { - if (hostStats.getNodeId().equals(UNKNOWN)) { - Node node = nodeService.getCachedIdentity(); - if (node != null) { - hostStats.setNodeId(node.getNodeId()); + if (recordStatistics) { + if (hostStats.getNodeId().equals(UNKNOWN)) { + Node node = nodeService.getCachedIdentity(); + if (node != null) { + hostStats.setNodeId(node.getNodeId()); + } } + hostStats.setEndTime(new Date()); + statisticService.save(hostStats); } - hostStats.setEndTime(new Date()); - statisticService.save(hostStats); hostStats = null; } finally { hostStatsLock.release(NUMBER_OF_PERMITS); } } + + if (jobStats != null) { + List toFlush = null; + jobStatsLock.acquireUninterruptibly(NUMBER_OF_PERMITS); + try { + toFlush = jobStats; + jobStats = new ArrayList(); + } finally { + jobStatsLock.release(NUMBER_OF_PERMITS); + } + + if (toFlush != null && recordStatistics) { + Node node = nodeService.getCachedIdentity(); + if (node != null) { + String nodeId = node.getNodeId(); + String serverId = AppUtils.getServerId(); + for (JobStats stats : toFlush) { + stats.setNodeId(nodeId); + stats.setHostName(serverId); + statisticService.save(stats); + } + } + } + } } public Map getWorkingChannelStats() { @@ -399,7 +451,6 @@ protected ChannelStats getChannelStats(String channelId) { null, channelId); channelStats.put(channelId, stats); } else { - log.warn("StatisticNodeNotAvailableWarning"); stats = new ChannelStats(UNKNOWN, AppUtils.getServerId(), new Date(), null, channelId); } @@ -409,14 +460,12 @@ protected ChannelStats getChannelStats(String channelId) { } protected HostStats getHostStats() { - resetChannelStats(false); if (hostStats == null) { Node node = nodeService.getCachedIdentity(); if (node != null) { hostStats = new HostStats(node.getNodeId(), AppUtils.getServerId(), new Date(), null); } else { - log.warn("StatisticNodeNotAvailableWarning"); hostStats = new HostStats(UNKNOWN, AppUtils.getServerId(), new Date(), null); } @@ -444,4 +493,8 @@ public void setConfigurationService(IConfigurationService configurationService) this.configurationService = configurationService; } + public void setRecordStatistics(boolean recordStatistics) { + this.recordStatistics = recordStatistics; + } + } \ No newline at end of file diff --git a/symmetric/symmetric-core/src/main/resources/org/jumpmind/symmetric/service/impl/statistic-service-sql.xml b/symmetric/symmetric-core/src/main/resources/org/jumpmind/symmetric/service/impl/statistic-service-sql.xml index 2f76e31ac2..41d7d5c60d 100644 --- a/symmetric/symmetric-core/src/main/resources/org/jumpmind/symmetric/service/impl/statistic-service-sql.xml +++ b/symmetric/symmetric-core/src/main/resources/org/jumpmind/symmetric/service/impl/statistic-service-sql.xml @@ -53,7 +53,23 @@ from $[sym.sync.table.prefix]_node_host_stats where start_time >= ? and end_time <= ? and node_id=? order by start_time asc ]]> - + + + + + + = ? and end_time <= ? and node_id=? order by start_time asc + ]]> + \ No newline at end of file diff --git a/symmetric/symmetric-core/src/main/resources/symmetric-default.properties b/symmetric/symmetric-core/src/main/resources/symmetric-default.properties index fb77862fd6..0ebf737140 100644 --- a/symmetric/symmetric-core/src/main/resources/symmetric-default.properties +++ b/symmetric/symmetric-core/src/main/resources/symmetric-default.properties @@ -535,3 +535,6 @@ trigger.update.capture.changed.data.only.enabled=false # This is the expected increment value for the data_id in the data table. data.id.increment.by=1 + +# Control whether statistics are recorded. +statistic.record.enable=true diff --git a/symmetric/symmetric-core/src/main/resources/symmetric-jobs.xml b/symmetric/symmetric-core/src/main/resources/symmetric-jobs.xml index f90e498aa6..864558a827 100644 --- a/symmetric/symmetric-core/src/main/resources/symmetric-jobs.xml +++ b/symmetric/symmetric-core/src/main/resources/symmetric-jobs.xml @@ -34,6 +34,7 @@ http://www.springframework.org/schema/util http://www.springframework.org/schema + diff --git a/symmetric/symmetric-core/src/main/resources/symmetric-schema.xml b/symmetric/symmetric-core/src/main/resources/symmetric-schema.xml index 09fbe5047f..6003c5e48d 100644 --- a/symmetric/symmetric-core/src/main/resources/symmetric-schema.xml +++ b/symmetric/symmetric-core/src/main/resources/symmetric-schema.xml @@ -124,7 +124,6 @@ - diff --git a/symmetric/symmetric-core/src/main/resources/symmetric-services.xml b/symmetric/symmetric-core/src/main/resources/symmetric-services.xml index 40bf94adcb..d91391d972 100644 --- a/symmetric/symmetric-core/src/main/resources/symmetric-services.xml +++ b/symmetric/symmetric-core/src/main/resources/symmetric-services.xml @@ -15,6 +15,7 @@ + diff --git a/symmetric/symmetric-server/src/test/java/org/jumpmind/symmetric/statistic/MockStatisticManager.java b/symmetric/symmetric-server/src/test/java/org/jumpmind/symmetric/statistic/MockStatisticManager.java index fbb157fc71..134a9c63f7 100644 --- a/symmetric/symmetric-server/src/test/java/org/jumpmind/symmetric/statistic/MockStatisticManager.java +++ b/symmetric/symmetric-server/src/test/java/org/jumpmind/symmetric/statistic/MockStatisticManager.java @@ -27,6 +27,9 @@ public class MockStatisticManager implements IStatisticManager { public void flush() { } + public void addJobStats(String jobName, long startTime, long endTime, long processedCount) { + } + public void incrementRestart() { } diff --git a/symmetric/symmetric-server/src/test/java/org/jumpmind/symmetric/stress/AbstractMultiTierStressTest.java b/symmetric/symmetric-server/src/test/java/org/jumpmind/symmetric/stress/AbstractMultiTierStressTest.java index bf3cec1f17..84d41c1980 100644 --- a/symmetric/symmetric-server/src/test/java/org/jumpmind/symmetric/stress/AbstractMultiTierStressTest.java +++ b/symmetric/symmetric-server/src/test/java/org/jumpmind/symmetric/stress/AbstractMultiTierStressTest.java @@ -110,7 +110,7 @@ public void pushTest() { 12); w2.start(); w1.start(); - while (regionServer.getEngine().push() || !w2.done || !w1.done) { + while (regionServer.getEngine().push().wasDataProcessed() || !w2.done || !w1.done) { try { Thread.sleep(5); } catch (Exception ex) { diff --git a/symmetric/symmetric-server/src/test/java/org/jumpmind/symmetric/test/AbstractIntegrationTest.java b/symmetric/symmetric-server/src/test/java/org/jumpmind/symmetric/test/AbstractIntegrationTest.java index 4ca5e3f3dd..ac710d3b1f 100644 --- a/symmetric/symmetric-server/src/test/java/org/jumpmind/symmetric/test/AbstractIntegrationTest.java +++ b/symmetric/symmetric-server/src/test/java/org/jumpmind/symmetric/test/AbstractIntegrationTest.java @@ -122,7 +122,7 @@ protected boolean clientPush() { int tries = 0; boolean pushed = false; while (!pushed && tries < 10) { - pushed = getClientEngine().push(); + pushed = getClientEngine().push().wasDataProcessed(); AppUtils.sleep(100); tries++; } diff --git a/symmetric/symmetric-server/src/test/java/org/jumpmind/symmetric/test/LoadFromClientIntegrationTest.java b/symmetric/symmetric-server/src/test/java/org/jumpmind/symmetric/test/LoadFromClientIntegrationTest.java index 1b92ebf02d..4a77a02f35 100644 --- a/symmetric/symmetric-server/src/test/java/org/jumpmind/symmetric/test/LoadFromClientIntegrationTest.java +++ b/symmetric/symmetric-server/src/test/java/org/jumpmind/symmetric/test/LoadFromClientIntegrationTest.java @@ -47,7 +47,7 @@ public void registerClientWithRoot() { Assert.assertTrue(result, result.startsWith("Successfully opened initial load for node")); getClientEngine().route(); - while (getClientEngine().push()) { + while (getClientEngine().push().wasDataProcessed()) { AppUtils.sleep(5); } Assert.assertEquals(0, getInitialLoadEnabled()); diff --git a/symmetric/symmetric-server/src/test/java/org/jumpmind/symmetric/test/MultiTierUnitTest.java b/symmetric/symmetric-server/src/test/java/org/jumpmind/symmetric/test/MultiTierUnitTest.java index 2eda51b256..b9ce444446 100644 --- a/symmetric/symmetric-server/src/test/java/org/jumpmind/symmetric/test/MultiTierUnitTest.java +++ b/symmetric/symmetric-server/src/test/java/org/jumpmind/symmetric/test/MultiTierUnitTest.java @@ -154,7 +154,7 @@ public void testHeartbeatFromRegion01ToHomeServer() throws Exception { .assertNotSame( "The root heartbeat time should not be the same as the updated client heartbeat time", clientHeartbeatTimeAfter, rootHeartbeatTimeBefore); - while (region01Server.getEngine().push()) { + while (region01Server.getEngine().push().wasDataProcessed()) { // continue to push while there data to push } Date rootHeartbeatTimeAfter = homeJdbcTemplate.queryForObject(checkHeartbeatSql, diff --git a/symmetric/symmetric-server/src/test/java/org/jumpmind/symmetric/test/SimpleIntegrationTest.java b/symmetric/symmetric-server/src/test/java/org/jumpmind/symmetric/test/SimpleIntegrationTest.java index ed79e39c9a..c9a7172d4d 100644 --- a/symmetric/symmetric-server/src/test/java/org/jumpmind/symmetric/test/SimpleIntegrationTest.java +++ b/symmetric/symmetric-server/src/test/java/org/jumpmind/symmetric/test/SimpleIntegrationTest.java @@ -257,7 +257,7 @@ public void syncToClientMultipleUpdates() { rootJdbcTemplate.update("update test_customer set zip=? where customer_id=?", new Object[] { NEW_ZIP, 100 }); rootJdbcTemplate.update("update test_customer set name=? where customer_id=?", new Object[] { NEW_NAME, 100 }); - boolean didPullData = getClientEngine().pull(); + boolean didPullData = getClientEngine().pull().wasDataProcessed(); Assert.assertTrue(didPullData); @@ -1134,7 +1134,7 @@ public Object doInConnection(Connection con) throws SQLException, DataAccessExce .warn("If you see this message more than once the root database isn't respecting the fact that auto commit is set to false!"); } count++; - } while (getClientEngine().pull()); + } while (getClientEngine().pull().wasDataProcessed()); int newCount = clientJdbcTemplate.queryForInt("select count(*) from ONE_COLUMN_TABLE"); Assert.assertEquals(50, newCount - oldCount); clientParameterService.saveParameter(ParameterConstants.DATA_LOADER_MAX_ROWS_BEFORE_COMMIT, diff --git a/symmetric/symmetric-server/src/test/resources/test-data-drop-all.sql b/symmetric/symmetric-server/src/test/resources/test-data-drop-all.sql index 410689cff3..a2f9954e2b 100644 --- a/symmetric/symmetric-server/src/test/resources/test-data-drop-all.sql +++ b/symmetric/symmetric-server/src/test/resources/test-data-drop-all.sql @@ -21,6 +21,7 @@ drop table sym_outgoing_batch; drop table sym_parameter; drop table sym_node_host_channel_stats; drop table sym_node_host_stats; +drop table sym_node_host_job_stats; drop table sym_registration_redirect; drop table sym_registration_request; -- drop table sym_alert_def;