diff --git a/symmetric-assemble/build.gradle b/symmetric-assemble/build.gradle
index 459d75fe87..ab613b6ce0 100644
--- a/symmetric-assemble/build.gradle
+++ b/symmetric-assemble/build.gradle
@@ -127,6 +127,7 @@ project(':symmetric-core') {
compile project(":symmetric-io")
compile project(":symmetric-util")
compile "commons-fileupload:commons-fileupload:$commonsFileuploadVersion"
+ compile "javax.mail:mail:1.4.5"
testCompile project(path: ':symmetric-util', configuration: 'testArtifacts')
testCompile project(path: ':symmetric-jdbc', configuration: 'testArtifacts')
}
diff --git a/symmetric-assemble/src/asciidoc/configuration.ad b/symmetric-assemble/src/asciidoc/configuration.ad
index e9c88ded5d..dc50e2012f 100644
--- a/symmetric-assemble/src/asciidoc/configuration.ad
+++ b/symmetric-assemble/src/asciidoc/configuration.ad
@@ -43,5 +43,6 @@ include::configuration/parameters.ad[]
ifdef::pro[]
include::configuration/users.ad[]
include::configuration/ldap.ad[]
+include::configuration/mail-server.ad[]
include::configuration/license-key.ad[]
endif::pro[]
diff --git a/symmetric-assemble/src/asciidoc/configuration/mail-server.ad b/symmetric-assemble/src/asciidoc/configuration/mail-server.ad
new file mode 100644
index 0000000000..1596dc4b9f
--- /dev/null
+++ b/symmetric-assemble/src/asciidoc/configuration/mail-server.ad
@@ -0,0 +1,18 @@
+
+=== Mail Server
+
+A mail server can be configured for sending email notifications.
+
+Target Nodes:: The node group ID that will use this configuration.
+Hostname:: The hostname or IP address of the mail server to contact for sending mail.
+Transport:: The transport mechanism is either SMTP (Simple Mail Transfer Protocol) or SMTPS (encrypted with SSL).
+Port:: The default port for SMTP is 25, while the default port for SMTPS is 465.
+Use StartTLS:: After connecting over SMTP, the TLS protocol is used to encrypt content.
+Use Authentication:: The mail server requires a login and password before email can be sent.
+User:: The user login to use for authentication.
+Password:: The login password to use for authentication.
+
+ifdef::pro[]
+The "Test" button will use the current settings on the screen to connect to the mail server with the configured
+transport and report any errors.
+endif::pro[]
diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/JobManager.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/JobManager.java
index 27f416d53d..c4ed7738eb 100644
--- a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/JobManager.java
+++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/JobManager.java
@@ -64,7 +64,7 @@ public JobManager(ISymmetricEngine engine) {
this.jobs.add(new FileSyncPullJob(engine,taskScheduler));
this.jobs.add(new FileSyncPushJob(engine,taskScheduler));
this.jobs.add(new InitialLoadExtractorJob(engine,taskScheduler));
-
+ this.jobs.add(new NotificationJob(engine, taskScheduler));
}
public IJob getJob(String name) {
diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/job/NotificationJob.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/NotificationJob.java
new file mode 100644
index 0000000000..edae4af9a5
--- /dev/null
+++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/job/NotificationJob.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to JumpMind Inc under one or more contributor
+ * license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding
+ * copyright ownership. JumpMind Inc licenses this file
+ * to you under the GNU General Public License, version 3.0 (GPLv3)
+ * (the "License"); you may not use this file except in compliance
+ * with the License.
+ *
+ * You should have received a copy of the GNU General Public License,
+ * version 3.0 (GPLv3) along with this library; if not, see
+ * .
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.jumpmind.symmetric.job;
+
+import org.jumpmind.symmetric.ISymmetricEngine;
+import org.jumpmind.symmetric.service.ClusterConstants;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
+
+public class NotificationJob extends AbstractJob {
+
+ public NotificationJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) {
+ super("job.notification", true, engine.getParameterService().is("start.notification.job"), engine, taskScheduler);
+ }
+
+ @Override
+ public void doJob(boolean force) throws Exception {
+ if (engine != null) {
+ engine.getNotificationService().update();
+ }
+ }
+
+ public String getClusterLockName() {
+ return ClusterConstants.NOTIFICATION;
+ }
+
+}
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java
index 90e6c399df..6be3074c7e 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java
@@ -79,8 +79,10 @@
import org.jumpmind.symmetric.service.IGroupletService;
import org.jumpmind.symmetric.service.IIncomingBatchService;
import org.jumpmind.symmetric.service.ILoadFilterService;
+import org.jumpmind.symmetric.service.IMailService;
import org.jumpmind.symmetric.service.INodeCommunicationService;
import org.jumpmind.symmetric.service.INodeService;
+import org.jumpmind.symmetric.service.INotificationService;
import org.jumpmind.symmetric.service.IOfflinePullService;
import org.jumpmind.symmetric.service.IOfflinePushService;
import org.jumpmind.symmetric.service.IOutgoingBatchService;
@@ -106,8 +108,10 @@
import org.jumpmind.symmetric.service.impl.GroupletService;
import org.jumpmind.symmetric.service.impl.IncomingBatchService;
import org.jumpmind.symmetric.service.impl.LoadFilterService;
+import org.jumpmind.symmetric.service.impl.MailService;
import org.jumpmind.symmetric.service.impl.NodeCommunicationService;
import org.jumpmind.symmetric.service.impl.NodeService;
+import org.jumpmind.symmetric.service.impl.NotificationService;
import org.jumpmind.symmetric.service.impl.OfflinePullService;
import org.jumpmind.symmetric.service.impl.OfflinePushService;
import org.jumpmind.symmetric.service.impl.OutgoingBatchService;
@@ -219,7 +223,11 @@ abstract public class AbstractSymmetricEngine implements ISymmetricEngine {
protected INodeCommunicationService nodeCommunicationService;
- protected IFileSyncService fileSyncService;
+ protected IFileSyncService fileSyncService;
+
+ protected INotificationService notificationService;
+
+ protected IMailService mailService;
protected Date lastRestartTime = null;
@@ -344,6 +352,8 @@ protected void init() {
nodeService, dataLoaderService, clusterService, nodeCommunicationService,
configurationService, extensionService, offlineTransportManager);
this.fileSyncService = new FileSyncService(this);
+ this.notificationService = new NotificationService(parameterService, symmetricDialect, extensionService);
+ this.mailService = new MailService(parameterService, symmetricDialect);
this.jobManager = createJobManager();
extensionService.addExtensionPoint(new DefaultOfflineServerListener(
@@ -1044,6 +1054,14 @@ public IExtensionService getExtensionService() {
return extensionService;
}
+ public INotificationService getNotificationService() {
+ return notificationService;
+ }
+
+ public IMailService getMailService() {
+ return mailService;
+ }
+
public IStagingManager getStagingManager() {
return stagingManager;
}
@@ -1059,7 +1077,7 @@ public INodeCommunicationService getNodeCommunicationService() {
public IGroupletService getGroupletService() {
return groupletService;
}
-
+
private void removeMeFromMap(Map map) {
Set keys = new HashSet(map.keySet());
for (String key : keys) {
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/ISymmetricEngine.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/ISymmetricEngine.java
index 6a0eb68c99..f1e2a51b45 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/ISymmetricEngine.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/ISymmetricEngine.java
@@ -45,8 +45,10 @@
import org.jumpmind.symmetric.service.IGroupletService;
import org.jumpmind.symmetric.service.IIncomingBatchService;
import org.jumpmind.symmetric.service.ILoadFilterService;
+import org.jumpmind.symmetric.service.IMailService;
import org.jumpmind.symmetric.service.INodeCommunicationService;
import org.jumpmind.symmetric.service.INodeService;
+import org.jumpmind.symmetric.service.INotificationService;
import org.jumpmind.symmetric.service.IOfflinePullService;
import org.jumpmind.symmetric.service.IOfflinePushService;
import org.jumpmind.symmetric.service.IOutgoingBatchService;
@@ -289,6 +291,10 @@ public interface ISymmetricEngine {
public IExtensionService getExtensionService();
+ public INotificationService getNotificationService();
+
+ public IMailService getMailService();
+
public IStagingManager getStagingManager();
public ISqlTemplate getSqlTemplate();
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java
index 42e771eef4..0b69a33c05 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java
@@ -56,6 +56,7 @@ private ParameterConstants() {
public final static String START_STATISTIC_FLUSH_JOB = "start.stat.flush.job";
public final static String START_STAGE_MGMT_JOB = "start.stage.management.job";
public final static String START_WATCHDOG_JOB = "start.watchdog.job";
+ public final static String START_NOTIFICATION_JOB = "start.notification.job";
public final static String PULL_THREAD_COUNT_PER_SERVER = "pull.thread.per.server.count";
public final static String PULL_MINIMUM_PERIOD_MS = "pull.period.minimum.ms";
@@ -311,6 +312,15 @@ private ParameterConstants() {
public final static String DATA_CREATE_TIME_TIMEZONE = "data.create_time.timezone";
+ public static final String SMTP_HOST = "smtp.host";
+ public static final String SMTP_TRANSPORT = "smtp.transport";
+ public static final String SMTP_PORT = "smtp.port";
+ public static final String SMTP_FROM = "smtp.from";
+ public static final String SMTP_USER = "smtp.user";
+ public static final String SMTP_PASSWORD = "smtp.password";
+ public static final String SMTP_USE_STARTTLS = "smtp.starttls";
+ public static final String SMTP_USE_AUTH = "smtp.auth";
+
public static Map getParameterMetaData() {
return parameterMetaData;
}
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/TableConstants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/TableConstants.java
index 330dfab3db..fd9a9ef163 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/TableConstants.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/TableConstants.java
@@ -73,6 +73,7 @@ public class TableConstants {
public static final String SYM_FILE_INCOMING = "file_incoming";
public static final String SYM_CONSOLE_USER = "console_user";
public static final String SYM_EXTENSION = "extension";
+ public static final String SYM_NOTIFICATION = "notification";
private static List tablesWithPrefix;
@@ -127,6 +128,7 @@ protected static List populateConfigTables(String tablePrefix) {
configTables.add(getTableName(tablePrefix, TableConstants.SYM_FILE_SNAPSHOT));
configTables.add(getTableName(tablePrefix, TableConstants.SYM_NODE_IDENTITY));
configTables.add(getTableName(tablePrefix, TableConstants.SYM_EXTENSION));
+ configTables.add(getTableName(tablePrefix, TableConstants.SYM_NOTIFICATION));
return configTables;
}
@@ -175,6 +177,7 @@ protected static List populateAllTables(String tablePrefix) {
tables.add(getTableName(tablePrefix, TableConstants.SYM_FILE_SNAPSHOT));
tables.add(getTableName(tablePrefix, TableConstants.SYM_FILE_INCOMING));
tables.add(getTableName(tablePrefix, SYM_EXTENSION));
+ tables.add(getTableName(tablePrefix, SYM_NOTIFICATION));
return tables;
}
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Notification.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Notification.java
new file mode 100644
index 0000000000..4c5357a941
--- /dev/null
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Notification.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to JumpMind Inc under one or more contributor
+ * license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding
+ * copyright ownership. JumpMind Inc licenses this file
+ * to you under the GNU General Public License, version 3.0 (GPLv3)
+ * (the "License"); you may not use this file except in compliance
+ * with the License.
+ *
+ * You should have received a copy of the GNU General Public License,
+ * version 3.0 (GPLv3) along with this library; if not, see
+ * .
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.jumpmind.symmetric.model;
+
+import java.util.Date;
+
+public class Notification {
+
+ protected String notificationId;
+
+ protected String externalId;
+
+ protected String nodeGroupId;
+
+ protected String type;
+
+ protected boolean enabled;
+
+ protected long threshold;
+
+ protected int period;
+
+ protected int sampleMinutes;
+
+ protected int severityLevel;
+
+ protected long windowMinutes;
+
+ protected Date createTime;
+
+ protected String lastUpdateBy;
+
+ protected Date lastUpdateTime;
+
+ public String getNotificationId() {
+ return notificationId;
+ }
+
+ public Notification() {
+ }
+
+ public void setNotificationId(String notificationId) {
+ this.notificationId = notificationId;
+ }
+
+ public String getExternalId() {
+ return externalId;
+ }
+
+ public void setExternalId(String externalId) {
+ this.externalId = externalId;
+ }
+
+ public String getNodeGroupId() {
+ return nodeGroupId;
+ }
+
+ public void setNodeGroupId(String nodeGroupId) {
+ this.nodeGroupId = nodeGroupId;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ public void setEnabled(boolean enabled) {
+ this.enabled = enabled;
+ }
+
+ public long getThreshold() {
+ return threshold;
+ }
+
+ public void setThreshold(long threshhold) {
+ this.threshold = threshhold;
+ }
+
+ public int getPeriod() {
+ return period;
+ }
+
+ public void setPeriod(int period) {
+ this.period = period;
+ }
+
+ public int getSampleMinutes() {
+ return sampleMinutes;
+ }
+
+ public void setSampleMinutes(int sampleMinutes) {
+ this.sampleMinutes = sampleMinutes;
+ }
+
+ public int getSeverityLevel() {
+ return severityLevel;
+ }
+
+ public void setSeverityLevel(int severityLevel) {
+ this.severityLevel = severityLevel;
+ }
+
+ public long getWindowMinutes() {
+ return windowMinutes;
+ }
+
+ public void setWindowMinutes(long windowMinutes) {
+ this.windowMinutes = windowMinutes;
+ }
+
+ public Date getCreateTime() {
+ return createTime;
+ }
+
+ public void setCreateTime(Date createTime) {
+ this.createTime = createTime;
+ }
+
+ public String getLastUpdateBy() {
+ return lastUpdateBy;
+ }
+
+ public void setLastUpdateBy(String lastUpdateBy) {
+ this.lastUpdateBy = lastUpdateBy;
+ }
+
+ public Date getLastUpdateTime() {
+ return lastUpdateTime;
+ }
+
+ public void setLastUpdateTime(Date lastUpdateTime) {
+ this.lastUpdateTime = lastUpdateTime;
+ }
+
+}
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/NotificationEvent.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/NotificationEvent.java
new file mode 100644
index 0000000000..c71174a212
--- /dev/null
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/NotificationEvent.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to JumpMind Inc under one or more contributor
+ * license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding
+ * copyright ownership. JumpMind Inc licenses this file
+ * to you under the GNU General Public License, version 3.0 (GPLv3)
+ * (the "License"); you may not use this file except in compliance
+ * with the License.
+ *
+ * You should have received a copy of the GNU General Public License,
+ * version 3.0 (GPLv3) along with this library; if not, see
+ * .
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.jumpmind.symmetric.model;
+
+import java.util.Date;
+
+public class NotificationEvent {
+
+ protected String notificationId;
+
+ protected String nodeId;
+
+ protected String hostName;
+
+ protected Date eventTime;
+
+ protected long value;
+
+ protected long threshold;
+
+ protected long period;
+
+ protected int severityLevel;
+
+ public String getNotificationId() {
+ return notificationId;
+ }
+
+ public void setNotificationId(String notificationId) {
+ this.notificationId = notificationId;
+ }
+
+ public String getNodeId() {
+ return nodeId;
+ }
+
+ public void setNodeId(String nodeId) {
+ this.nodeId = nodeId;
+ }
+
+ public String getHostName() {
+ return hostName;
+ }
+
+ public void setHostName(String hostName) {
+ this.hostName = hostName;
+ }
+
+ public Date getEventTime() {
+ return eventTime;
+ }
+
+ public void setEventTime(Date eventTime) {
+ this.eventTime = eventTime;
+ }
+
+ public long getValue() {
+ return value;
+ }
+
+ public void setValue(long value) {
+ this.value = value;
+ }
+
+ public long getThreshold() {
+ return threshold;
+ }
+
+ public void setThreshold(long threshold) {
+ this.threshold = threshold;
+ }
+
+ public long getPeriod() {
+ return period;
+ }
+
+ public void setPeriod(long period) {
+ this.period = period;
+ }
+
+ public int getSeverityLevel() {
+ return severityLevel;
+ }
+
+ public void setSeverityLevel(int severityLevel) {
+ this.severityLevel = severityLevel;
+ }
+
+}
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/notification/AbstractNotificationCheck.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/notification/AbstractNotificationCheck.java
new file mode 100644
index 0000000000..9e88e13e7e
--- /dev/null
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/notification/AbstractNotificationCheck.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to JumpMind Inc under one or more contributor
+ * license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding
+ * copyright ownership. JumpMind Inc licenses this file
+ * to you under the GNU General Public License, version 3.0 (GPLv3)
+ * (the "License"); you may not use this file except in compliance
+ * with the License.
+ *
+ * You should have received a copy of the GNU General Public License,
+ * version 3.0 (GPLv3) along with this library; if not, see
+ * .
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.jumpmind.symmetric.notification;
+
+import java.lang.management.ThreadInfo;
+
+import org.jumpmind.symmetric.ISymmetricEngine;
+import org.jumpmind.symmetric.ext.ISymmetricEngineAware;
+
+public abstract class AbstractNotificationCheck implements INotificationCheck, ISymmetricEngineAware {
+
+ protected final int TOP_THREADS = 3;
+
+ protected final int MAX_STACK_DEPTH = 30;
+
+ protected ISymmetricEngine engine;
+
+ @Override
+ public boolean shouldLockCluster() {
+ return true;
+ }
+
+ @Override
+ public void setSymmetricEngine(ISymmetricEngine engine) {
+ this.engine = engine;
+ }
+
+ protected void rankTopUsage(ThreadInfo infos[], long usages[], ThreadInfo info, long usage) {
+ for (int i = 0; i < infos.length; i++) {
+ if (usage > usages[i]) {
+ for (int j = i + 1; j < infos.length; j++) {
+ infos[j] = infos[j - 1];
+ usages[j] = usages[j - 1];
+ }
+ infos[i] = info;
+ usages[i] = usage;
+ break;
+ }
+ }
+ }
+
+ protected String logStackTrace(ThreadInfo info) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Stack trace for thread ").append(info.getThreadId()).append(":\n");
+ for (StackTraceElement element : info.getStackTrace()) {
+ sb.append(element.getClassName()).append(".").append(element.getMethodName());
+ sb.append("(").append(element.getFileName()).append(":").append(element.getLineNumber()).append(")");
+ sb.append("\n");
+ }
+ return sb.toString();
+ }
+
+}
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/notification/INotificationCheck.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/notification/INotificationCheck.java
new file mode 100644
index 0000000000..d6e6d0c00e
--- /dev/null
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/notification/INotificationCheck.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to JumpMind Inc under one or more contributor
+ * license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding
+ * copyright ownership. JumpMind Inc licenses this file
+ * to you under the GNU General Public License, version 3.0 (GPLv3)
+ * (the "License"); you may not use this file except in compliance
+ * with the License.
+ *
+ * You should have received a copy of the GNU General Public License,
+ * version 3.0 (GPLv3) along with this library; if not, see
+ * .
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.jumpmind.symmetric.notification;
+
+import org.jumpmind.extension.IExtensionPoint;
+import org.jumpmind.symmetric.model.Notification;
+
+public interface INotificationCheck extends IExtensionPoint {
+
+ public long check(Notification notification);
+
+ public boolean shouldLockCluster();
+
+ public boolean requiresPeriod();
+
+ public String getType();
+
+}
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/notification/NotificationCheckBatchError.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/notification/NotificationCheckBatchError.java
new file mode 100644
index 0000000000..e469cf962e
--- /dev/null
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/notification/NotificationCheckBatchError.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to JumpMind Inc under one or more contributor
+ * license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding
+ * copyright ownership. JumpMind Inc licenses this file
+ * to you under the GNU General Public License, version 3.0 (GPLv3)
+ * (the "License"); you may not use this file except in compliance
+ * with the License.
+ *
+ * You should have received a copy of the GNU General Public License,
+ * version 3.0 (GPLv3) along with this library; if not, see
+ * .
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.jumpmind.symmetric.notification;
+
+import java.util.List;
+
+import org.jumpmind.symmetric.ISymmetricEngine;
+import org.jumpmind.symmetric.ext.ISymmetricEngineAware;
+import org.jumpmind.symmetric.model.IncomingBatch;
+import org.jumpmind.symmetric.model.Notification;
+import org.jumpmind.symmetric.model.OutgoingBatch;
+import org.jumpmind.symmetric.model.OutgoingBatches;
+import org.jumpmind.symmetric.service.IIncomingBatchService;
+import org.jumpmind.symmetric.service.IOutgoingBatchService;
+
+public class NotificationCheckBatchError implements INotificationCheck, ISymmetricEngineAware {
+
+ protected IOutgoingBatchService outgoingBatchService;
+
+ protected IIncomingBatchService incomingBatchService;
+
+ @Override
+ public String getType() {
+ return "batchError";
+ }
+
+ @Override
+ public long check(Notification notification) {
+ int outgoingErrorCount = 0;
+ OutgoingBatches outgoingBatches = outgoingBatchService.getOutgoingBatchErrors(1000);
+ for (OutgoingBatch batch : outgoingBatches.getBatches()) {
+ int batchErrorMinutes = (int) (System.currentTimeMillis() - batch.getCreateTime().getTime()) / 60000;
+ if (batchErrorMinutes >= notification.getThreshold()) {
+ outgoingErrorCount++;
+ }
+ }
+
+ int incomingErrorCount = 0;
+ List incomingBatches = incomingBatchService.findIncomingBatchErrors(1000);
+ for (IncomingBatch batch : incomingBatches) {
+ int batchErrorMinutes = (int) (System.currentTimeMillis() - batch.getCreateTime().getTime()) / 60000;
+ if (batchErrorMinutes >= notification.getThreshold()) {
+ incomingErrorCount++;
+ }
+ }
+
+ return outgoingErrorCount + incomingErrorCount;
+ }
+
+ @Override
+ public boolean shouldLockCluster() {
+ return true;
+ }
+
+ @Override
+ public boolean requiresPeriod() {
+ return false;
+ }
+
+ @Override
+ public void setSymmetricEngine(ISymmetricEngine engine) {
+ outgoingBatchService = engine.getOutgoingBatchService();
+ incomingBatchService = engine.getIncomingBatchService();
+ }
+
+}
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/notification/NotificationCheckBatchUnsent.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/notification/NotificationCheckBatchUnsent.java
new file mode 100644
index 0000000000..ef86ffc14d
--- /dev/null
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/notification/NotificationCheckBatchUnsent.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to JumpMind Inc under one or more contributor
+ * license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding
+ * copyright ownership. JumpMind Inc licenses this file
+ * to you under the GNU General Public License, version 3.0 (GPLv3)
+ * (the "License"); you may not use this file except in compliance
+ * with the License.
+ *
+ * You should have received a copy of the GNU General Public License,
+ * version 3.0 (GPLv3) along with this library; if not, see
+ * .
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.jumpmind.symmetric.notification;
+
+import org.jumpmind.symmetric.ISymmetricEngine;
+import org.jumpmind.symmetric.ext.ISymmetricEngineAware;
+import org.jumpmind.symmetric.model.Notification;
+import org.jumpmind.symmetric.service.IOutgoingBatchService;
+
+public class NotificationCheckBatchUnsent implements INotificationCheck, ISymmetricEngineAware {
+
+ protected IOutgoingBatchService outgoingBatchService;
+
+ @Override
+ public String getType() {
+ return "batchUnsent";
+ }
+
+ @Override
+ public long check(Notification notification) {
+ return outgoingBatchService.countOutgoingBatchesUnsent();
+ }
+
+ @Override
+ public boolean shouldLockCluster() {
+ return true;
+ }
+
+ @Override
+ public boolean requiresPeriod() {
+ return false;
+ }
+
+ @Override
+ public void setSymmetricEngine(ISymmetricEngine engine) {
+ outgoingBatchService = engine.getOutgoingBatchService();
+ }
+
+}
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/notification/NotificationCheckCpu.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/notification/NotificationCheckCpu.java
new file mode 100644
index 0000000000..b93647a358
--- /dev/null
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/notification/NotificationCheckCpu.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to JumpMind Inc under one or more contributor
+ * license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding
+ * copyright ownership. JumpMind Inc licenses this file
+ * to you under the GNU General Public License, version 3.0 (GPLv3)
+ * (the "License"); you may not use this file except in compliance
+ * with the License.
+ *
+ * You should have received a copy of the GNU General Public License,
+ * version 3.0 (GPLv3) along with this library; if not, see
+ * .
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.jumpmind.symmetric.notification;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.OperatingSystemMXBean;
+import java.lang.management.RuntimeMXBean;
+import java.lang.management.ThreadInfo;
+import java.lang.reflect.Method;
+
+import org.jumpmind.symmetric.model.Notification;
+
+import com.sun.management.ThreadMXBean;
+
+public class NotificationCheckCpu extends AbstractNotificationCheck {
+
+ protected OperatingSystemMXBean osBean;
+
+ protected RuntimeMXBean runtimeBean;
+
+ public NotificationCheckCpu() {
+ osBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
+ runtimeBean = ManagementFactory.getRuntimeMXBean();
+ }
+
+ @Override
+ public String getType() {
+ return "cpu";
+ }
+
+ @Override
+ public long check(Notification notification) {
+ int availableProcessors = osBean.getAvailableProcessors();
+ long prevUpTime = runtimeBean.getUptime();
+ long prevProcessCpuTime = getProcessCpuTime();
+
+ try {
+ Thread.sleep(500);
+ } catch (Exception ignore) {
+ }
+
+ long upTime = runtimeBean.getUptime();
+ long processCpuTime = getProcessCpuTime();
+ long elapsedCpu = processCpuTime - prevProcessCpuTime;
+ long elapsedTime = upTime - prevUpTime;
+
+ return (long) (elapsedCpu / (elapsedTime * 1000f * availableProcessors));
+ }
+
+ protected long getProcessCpuTime() {
+ long cpuTime = 0;
+ try {
+ Method method = osBean.getClass().getMethod("getProcessCpuTime");
+ method.setAccessible(true);
+ cpuTime = (Long) method.invoke(osBean);
+ } catch (Exception ignore) {
+ }
+ return cpuTime;
+ }
+
+ protected String getNotificationMessage(long value, long threshold, long period) {
+ ThreadInfo infos[] = new ThreadInfo[TOP_THREADS];
+ long cpuUsages[] = new long[TOP_THREADS];
+ ThreadMXBean threadBean = (com.sun.management.ThreadMXBean) ManagementFactory.getThreadMXBean();
+ for (long threadId : threadBean.getAllThreadIds()) {
+ ThreadInfo info = threadBean.getThreadInfo(threadId);
+ if (info.getThreadState() != Thread.State.TERMINATED) {
+ rankTopUsage(infos, cpuUsages, info, threadBean.getThreadCpuTime(threadId));
+ }
+ }
+
+ String text = "CPU usage is at " + value;
+ for (int i = 0; i < infos.length; i++) {
+ text += "Top #" + (i + 1) + " CPU thread " + infos[i].getThreadId() + " is using " + (cpuUsages[i] / 1000000000f) + "s";
+ text += logStackTrace(threadBean.getThreadInfo(infos[i].getThreadId(), MAX_STACK_DEPTH));
+ }
+ return text;
+ }
+
+ @Override
+ public boolean requiresPeriod() {
+ return true;
+ }
+
+ @Override
+ public boolean shouldLockCluster() {
+ return false;
+ }
+
+}
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/notification/NotificationCheckDataGap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/notification/NotificationCheckDataGap.java
new file mode 100644
index 0000000000..4d2ecbc14d
--- /dev/null
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/notification/NotificationCheckDataGap.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to JumpMind Inc under one or more contributor
+ * license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding
+ * copyright ownership. JumpMind Inc licenses this file
+ * to you under the GNU General Public License, version 3.0 (GPLv3)
+ * (the "License"); you may not use this file except in compliance
+ * with the License.
+ *
+ * You should have received a copy of the GNU General Public License,
+ * version 3.0 (GPLv3) along with this library; if not, see
+ * .
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.jumpmind.symmetric.notification;
+
+import org.jumpmind.symmetric.ISymmetricEngine;
+import org.jumpmind.symmetric.ext.ISymmetricEngineAware;
+import org.jumpmind.symmetric.model.DataGap;
+import org.jumpmind.symmetric.model.Notification;
+import org.jumpmind.symmetric.service.IDataService;
+
+public class NotificationCheckDataGap implements INotificationCheck, ISymmetricEngineAware {
+
+ protected IDataService dataService;
+
+ @Override
+ public String getType() {
+ return "dataGap";
+ }
+
+ @Override
+ public long check(Notification notification) {
+ return dataService.countDataGapsByStatus(DataGap.Status.GP);
+ }
+
+ @Override
+ public boolean shouldLockCluster() {
+ return true;
+ }
+
+ @Override
+ public boolean requiresPeriod() {
+ return false;
+ }
+
+ @Override
+ public void setSymmetricEngine(ISymmetricEngine engine) {
+ dataService = engine.getDataService();
+ }
+
+}
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/notification/NotificationCheckDisk.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/notification/NotificationCheckDisk.java
new file mode 100644
index 0000000000..26cbbbca3e
--- /dev/null
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/notification/NotificationCheckDisk.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to JumpMind Inc under one or more contributor
+ * license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding
+ * copyright ownership. JumpMind Inc licenses this file
+ * to you under the GNU General Public License, version 3.0 (GPLv3)
+ * (the "License"); you may not use this file except in compliance
+ * with the License.
+ *
+ * You should have received a copy of the GNU General Public License,
+ * version 3.0 (GPLv3) along with this library; if not, see
+ * .
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.jumpmind.symmetric.notification;
+
+import java.io.File;
+
+import org.jumpmind.symmetric.ISymmetricEngine;
+import org.jumpmind.symmetric.ext.ISymmetricEngineAware;
+import org.jumpmind.symmetric.model.Notification;
+
+public class NotificationCheckDisk implements INotificationCheck, ISymmetricEngineAware {
+
+ protected File tempDirectory;
+
+ @Override
+ public String getType() {
+ return "disk";
+ }
+
+ @Override
+ public long check(Notification notification) {
+ return (long) ((1f - ((double) tempDirectory.getUsableSpace() / (double) tempDirectory.getTotalSpace())) * 100f);
+ }
+
+ @Override
+ public boolean shouldLockCluster() {
+ return false;
+ }
+
+ @Override
+ public boolean requiresPeriod() {
+ return false;
+ }
+
+ @Override
+ public void setSymmetricEngine(ISymmetricEngine engine) {
+ tempDirectory = new File(engine.getParameterService().getTempDirectory());
+ }
+
+}
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/notification/NotificationCheckMemory.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/notification/NotificationCheckMemory.java
new file mode 100644
index 0000000000..35d4fc7b31
--- /dev/null
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/notification/NotificationCheckMemory.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to JumpMind Inc under one or more contributor
+ * license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding
+ * copyright ownership. JumpMind Inc licenses this file
+ * to you under the GNU General Public License, version 3.0 (GPLv3)
+ * (the "License"); you may not use this file except in compliance
+ * with the License.
+ *
+ * You should have received a copy of the GNU General Public License,
+ * version 3.0 (GPLv3) along with this library; if not, see
+ * .
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.jumpmind.symmetric.notification;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryPoolMXBean;
+import java.lang.management.MemoryType;
+import java.lang.management.ThreadInfo;
+
+import org.jumpmind.symmetric.model.Notification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.sun.management.ThreadMXBean;
+
+public class NotificationCheckMemory extends AbstractNotificationCheck {
+
+ protected final Logger log = LoggerFactory.getLogger(getClass());
+
+ protected MemoryPoolMXBean tenuredPool;
+
+ @Override
+ public String getType() {
+ return "memory";
+ }
+
+ public NotificationCheckMemory() {
+ for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
+ if (pool.getType() == MemoryType.HEAP && pool.isUsageThresholdSupported()) {
+ tenuredPool = pool;
+ break;
+ }
+ }
+ if (tenuredPool == null) {
+ log.warn("Unable to find tenured memory pool");
+ }
+ }
+
+ @Override
+ public long check(Notification notification) {
+ long usage = 0;
+ if (tenuredPool != null) {
+ usage = (long) (tenuredPool.getUsage().getUsed() / tenuredPool.getUsage().getMax());
+ }
+ return usage;
+ }
+
+ public String getMessage(long value, long threshold, long period) {
+ long maxMemory = tenuredPool.getUsage().getMax();
+ long usedMemory = tenuredPool.getUsage().getUsed();
+ String text = "Memory threshold exceeded, " + usedMemory + " of " + maxMemory;
+
+ ThreadInfo infos[] = new ThreadInfo[TOP_THREADS];
+ long byteUsages[] = new long[TOP_THREADS];
+ ThreadMXBean threadBean = (com.sun.management.ThreadMXBean) ManagementFactory.getThreadMXBean();
+ for (long threadId : threadBean.getAllThreadIds()) {
+ ThreadInfo info = threadBean.getThreadInfo(threadId);
+ if (info.getThreadState() != Thread.State.TERMINATED) {
+ rankTopUsage(infos, byteUsages, info, threadBean.getThreadAllocatedBytes(threadId));
+ }
+ }
+
+ for (int i = 0; i < infos.length; i++) {
+ text += "Top #" + (i + 1) + " memory thread " + infos[i].getThreadId() + " is using "
+ + String.format("%.1f", ((double) byteUsages[i] / 1048576f)) + "MB";
+ text += logStackTrace(threadBean.getThreadInfo(infos[i].getThreadId(), MAX_STACK_DEPTH));
+ }
+ return text;
+ }
+
+ @Override
+ public boolean requiresPeriod() {
+ return false;
+ }
+
+ @Override
+ public boolean shouldLockCluster() {
+ return false;
+ }
+
+}
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/notification/NotificationCheckUnrouted.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/notification/NotificationCheckUnrouted.java
new file mode 100644
index 0000000000..730798a273
--- /dev/null
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/notification/NotificationCheckUnrouted.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to JumpMind Inc under one or more contributor
+ * license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding
+ * copyright ownership. JumpMind Inc licenses this file
+ * to you under the GNU General Public License, version 3.0 (GPLv3)
+ * (the "License"); you may not use this file except in compliance
+ * with the License.
+ *
+ * You should have received a copy of the GNU General Public License,
+ * version 3.0 (GPLv3) along with this library; if not, see
+ * .
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.jumpmind.symmetric.notification;
+
+import org.jumpmind.symmetric.ISymmetricEngine;
+import org.jumpmind.symmetric.ext.ISymmetricEngineAware;
+import org.jumpmind.symmetric.model.Notification;
+import org.jumpmind.symmetric.service.IRouterService;
+
+public class NotificationCheckUnrouted implements INotificationCheck, ISymmetricEngineAware {
+
+ protected IRouterService routerService;
+
+ @Override
+ public String getType() {
+ return "dataUnrouted";
+ }
+
+ @Override
+ public long check(Notification notification) {
+ return routerService.getUnroutedDataCount();
+ }
+
+ @Override
+ public boolean shouldLockCluster() {
+ return true;
+ }
+
+ @Override
+ public boolean requiresPeriod() {
+ return false;
+ }
+
+ @Override
+ public void setSymmetricEngine(ISymmetricEngine engine) {
+ routerService = engine.getRouterService();
+ }
+
+}
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ClusterConstants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ClusterConstants.java
index c4c56da618..ae64e14afb 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ClusterConstants.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/ClusterConstants.java
@@ -46,6 +46,7 @@ public class ClusterConstants {
public static final String FILE_SYNC_SHARED = "FILE_SYNC_SHARED";
public static final String FILE_SYNC_PULL = "FILE_SYNC_PULL";
public static final String FILE_SYNC_PUSH = "FILE_SYNC_PUSH";
+ public static final String NOTIFICATION = "NOTIFICATION";
public static final String TYPE_CLUSTER = "CLUSTER";
public static final String TYPE_EXCLUSIVE = "EXCLUSIVE";
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java
index d86ba846ad..4b5ea6cbcb 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IDataService.java
@@ -111,6 +111,8 @@ public void insertScriptEvent(ISqlTransaction transaction, String channelId,
public void checkForAndUpdateMissingChannelIds(long firstDataId, long lastDataId);
+ public long countDataGapsByStatus(DataGap.Status status);
+
public List findDataGapsByStatus(DataGap.Status status);
public List findDataGaps();
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IMailService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IMailService.java
new file mode 100644
index 0000000000..160fd46246
--- /dev/null
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IMailService.java
@@ -0,0 +1,11 @@
+package org.jumpmind.symmetric.service;
+
+import org.jumpmind.properties.TypedProperties;
+
+public interface IMailService {
+
+ public void sendEmail(String subject, String text, String recipients);
+
+ public String testTransport(TypedProperties prop);
+
+}
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/INotificationService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/INotificationService.java
new file mode 100644
index 0000000000..f6936981fa
--- /dev/null
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/INotificationService.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to JumpMind Inc under one or more contributor
+ * license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding
+ * copyright ownership. JumpMind Inc licenses this file
+ * to you under the GNU General Public License, version 3.0 (GPLv3)
+ * (the "License"); you may not use this file except in compliance
+ * with the License.
+ *
+ * You should have received a copy of the GNU General Public License,
+ * version 3.0 (GPLv3) along with this library; if not, see
+ * .
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.jumpmind.symmetric.service;
+
+import java.util.List;
+
+import org.jumpmind.symmetric.model.Notification;
+import org.jumpmind.symmetric.model.NotificationEvent;
+
+public interface INotificationService {
+
+ public void update();
+
+ public List getNotifications();
+
+ public void deleteNotification(String notificationId);
+
+ public void saveNotification(Notification notification);
+
+ public List getNotificationEvents();
+
+}
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java
index 4c53fe722c..8e67e66be9 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java
@@ -54,7 +54,6 @@
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.CsvUtils;
import org.jumpmind.symmetric.io.data.DataEventType;
-import org.jumpmind.symmetric.io.data.transform.TransformTable;
import org.jumpmind.symmetric.job.PushHeartbeatListener;
import org.jumpmind.symmetric.load.IReloadListener;
import org.jumpmind.symmetric.model.Channel;
@@ -1382,6 +1381,10 @@ protected Data createData(ISqlTransaction transaction, Trigger trigger, String w
return data;
}
+ public long countDataGapsByStatus(DataGap.Status status) {
+ return sqlTemplate.queryForLong(getSql("countDataGapsByStatusSql"), new Object[] { status.name() });
+ }
+
public List findDataGapsByStatus(DataGap.Status status) {
return sqlTemplate.query(getSql("findDataGapsByStatusSql"), new ISqlRowMapper() {
public DataGap mapRow(Row rs) {
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java
index 059e01f4a6..73d8fd91d5 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataServiceSqlMap.java
@@ -88,6 +88,8 @@ public DataServiceSqlMap(IDatabasePlatform platform, Map replace
putSql("findMinDataSql", ""
+ "select min(data_id) from $(data) where data_id >= ?");
+ putSql("countDataGapsByStatusSql", "select count(*) from $(data_gap) where status = ?");
+
putSql("findDataGapsByStatusSql",
""
+ "select start_id, end_id, create_time from $(data_gap) where status=? order by start_id asc ");
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/MailService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/MailService.java
new file mode 100644
index 0000000000..7a7e094ee1
--- /dev/null
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/MailService.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to JumpMind Inc under one or more contributor
+ * license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding
+ * copyright ownership. JumpMind Inc licenses this file
+ * to you under the GNU General Public License, version 3.0 (GPLv3)
+ * (the "License"); you may not use this file except in compliance
+ * with the License.
+ *
+ * You should have received a copy of the GNU General Public License,
+ * version 3.0 (GPLv3) along with this library; if not, see
+ * .
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.jumpmind.symmetric.service.impl;
+
+import java.util.Date;
+import java.util.Properties;
+
+import javax.mail.Message.RecipientType;
+import javax.mail.MessagingException;
+import javax.mail.NoSuchProviderException;
+import javax.mail.Session;
+import javax.mail.Transport;
+import javax.mail.internet.MimeMessage;
+
+import org.jumpmind.properties.TypedProperties;
+import org.jumpmind.symmetric.common.ParameterConstants;
+import org.jumpmind.symmetric.db.ISymmetricDialect;
+import org.jumpmind.symmetric.service.IMailService;
+import org.jumpmind.symmetric.service.IParameterService;
+
+public class MailService extends AbstractService implements IMailService {
+
+ protected static final String JAVAMAIL_HOST_NAME = "mail.host";
+ protected static final String JAVAMAIL_TRANSPORT = "mail.transport";
+ protected static final String JAVAMAIL_PORT_NUMBER = "mail.port";
+ protected static final String JAVAMAIL_FROM = "mail.from";
+ protected static final String JAVAMAIL_USERNAME = "mail.user";
+ protected static final String JAVAMAIL_PASSWORD = "mail.password";
+ protected static final String JAVAMAIL_USE_STARTTLS = "mail.smtp.starttls.enable";
+ protected static final String JAVAMAIL_USE_AUTH = "mail.smtp.auth";
+
+ public MailService(IParameterService parameterService, ISymmetricDialect symmetricDialect) {
+ super(parameterService, symmetricDialect);
+ }
+
+ public void sendEmail(String subject, String text, String recipients) {
+ Session session = Session.getInstance(getJavaMailProperties());
+ Transport transport;
+ try {
+ transport = session.getTransport(parameterService.getString(ParameterConstants.SMTP_TRANSPORT, "smtp"));
+ } catch (NoSuchProviderException e) {
+ log.error("Failure while obtaining transport", e);
+ return;
+ }
+
+ try {
+ if (parameterService.is(ParameterConstants.SMTP_USE_AUTH, false)) {
+ transport.connect(parameterService.getString(ParameterConstants.SMTP_USER),
+ parameterService.getString(ParameterConstants.SMTP_PASSWORD));
+ } else {
+ transport.connect();
+ }
+ } catch (MessagingException e) {
+ log.error("Failure while connecting to transport", e);
+ return;
+ }
+
+ try {
+ MimeMessage message = new MimeMessage(session);
+ message.setSentDate(new Date());
+ message.setRecipients(RecipientType.BCC, recipients);
+ message.setSubject(subject);
+ message.setText(text);
+ try {
+ transport.sendMessage(message, message.getAllRecipients());
+ } catch (MessagingException e) {
+ log.error("Failure while sending notification", e);
+ }
+ } catch (MessagingException e) {
+ log.error("Failure while preparing notification", e);
+ } finally {
+ try {
+ transport.close();
+ } catch (MessagingException e) {
+ }
+ }
+ }
+
+ public String testTransport(TypedProperties prop) {
+ String error = null;
+ Transport transport = null;
+ try {
+ Session session = Session.getInstance(getJavaMailProperties(prop));
+ transport = session.getTransport(prop.get(ParameterConstants.SMTP_TRANSPORT, "smtp"));
+ if (prop.is(ParameterConstants.SMTP_USE_AUTH, false)) {
+ transport.connect(prop.get(ParameterConstants.SMTP_USER),
+ prop.get(ParameterConstants.SMTP_PASSWORD));
+ } else {
+ transport.connect();
+ }
+ } catch (NoSuchProviderException e) {
+ error = e.getMessage();
+ } catch (MessagingException e) {
+ error = e.getMessage();
+ } finally {
+ try {
+ if (transport != null) {
+ transport.close();
+ }
+ } catch (MessagingException e) {
+ }
+ }
+ return error;
+ }
+
+ protected Properties getJavaMailProperties() {
+ Properties prop = new Properties();
+ prop.setProperty(JAVAMAIL_HOST_NAME, parameterService.getString(ParameterConstants.SMTP_HOST, "localhost"));
+ prop.setProperty(JAVAMAIL_PORT_NUMBER, parameterService.getString(ParameterConstants.SMTP_PORT, "25"));
+ prop.setProperty(JAVAMAIL_FROM, parameterService.getString(ParameterConstants.SMTP_FROM, "root@localhost"));
+ prop.setProperty(JAVAMAIL_USE_STARTTLS, parameterService.getString(ParameterConstants.SMTP_USE_STARTTLS, "false"));
+ prop.setProperty(JAVAMAIL_USE_AUTH, parameterService.getString(ParameterConstants.SMTP_USE_AUTH, "false"));
+ return prop;
+ }
+
+ protected Properties getJavaMailProperties(TypedProperties typedProp) {
+ Properties prop = new Properties();
+ prop.setProperty(JAVAMAIL_HOST_NAME, typedProp.get(ParameterConstants.SMTP_HOST, "localhost"));
+ prop.setProperty(JAVAMAIL_PORT_NUMBER, typedProp.get(ParameterConstants.SMTP_PORT, "25"));
+ prop.setProperty(JAVAMAIL_FROM, typedProp.get(ParameterConstants.SMTP_FROM, "root@localhost"));
+ prop.setProperty(JAVAMAIL_USE_STARTTLS, String.valueOf(typedProp.is(ParameterConstants.SMTP_USE_STARTTLS, false)));
+ prop.setProperty(JAVAMAIL_USE_AUTH, String.valueOf(typedProp.is(ParameterConstants.SMTP_USE_AUTH, false)));
+ return prop;
+ }
+
+}
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NotificationService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NotificationService.java
new file mode 100644
index 0000000000..5e21ea212b
--- /dev/null
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NotificationService.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to JumpMind Inc under one or more contributor
+ * license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding
+ * copyright ownership. JumpMind Inc licenses this file
+ * to you under the GNU General Public License, version 3.0 (GPLv3)
+ * (the "License"); you may not use this file except in compliance
+ * with the License.
+ *
+ * You should have received a copy of the GNU General Public License,
+ * version 3.0 (GPLv3) along with this library; if not, see
+ * .
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.jumpmind.symmetric.service.impl;
+
+import java.util.List;
+import java.util.Map;
+
+import org.jumpmind.db.sql.ISqlRowMapper;
+import org.jumpmind.db.sql.Row;
+import org.jumpmind.symmetric.db.ISymmetricDialect;
+import org.jumpmind.symmetric.model.Notification;
+import org.jumpmind.symmetric.model.NotificationEvent;
+import org.jumpmind.symmetric.notification.INotificationCheck;
+import org.jumpmind.symmetric.notification.NotificationCheckCpu;
+import org.jumpmind.symmetric.service.IExtensionService;
+import org.jumpmind.symmetric.service.INotificationService;
+import org.jumpmind.symmetric.service.IParameterService;
+
+public class NotificationService extends AbstractService implements INotificationService {
+
+ protected IExtensionService extensionService;
+
+ public NotificationService(IParameterService parameterService, ISymmetricDialect symmetricDialect, IExtensionService extensionService) {
+ super(parameterService, symmetricDialect);
+ setSqlMap(new NotificationServiceSqlMap(symmetricDialect.getPlatform(), createSqlReplacementTokens()));
+
+ this.extensionService = extensionService;
+
+ extensionService.addExtensionPoint("cpu", new NotificationCheckCpu());
+ }
+
+ @Override
+ public synchronized void update() {
+ Map notificationChecks = extensionService.getExtensionPointMap(INotificationCheck.class);
+ // TODO: cache notifications until cleared by ConfigurationChangedDataRouter
+ List notifications = getNotifications();
+
+ for (Notification notification : notifications) {
+ if (notification.isEnabled()) {
+ INotificationCheck notificationCheck = notificationChecks.get(notification.getType());
+ if (notificationCheck != null) {
+ long value = notificationCheck.check(notification);
+
+ if (notificationCheck.requiresPeriod()) {
+ // TODO: accumulate average over period, then check threshold
+
+ } else if (value >= notification.getThreshold()) {
+ // TODO: record sym_notification_event
+ }
+ } else {
+ log.warn("Could not find notification of type '" + notification.getType() + "'");
+ }
+ }
+ }
+
+ // TODO: for each sym_notification_action, see if sym_notification_events exist to act upon
+ }
+
+ @Override
+ public List getNotifications() {
+ return sqlTemplate.query(getSql("selectNotificationSql"), new NotificationRowMapper());
+ }
+
+ @Override
+ public List getNotificationEvents() {
+ return sqlTemplate.query(getSql("selectNotificationEventSql"), new NotificationEventRowMapper());
+ }
+
+ @Override
+ public void deleteNotification(String notificationId) {
+ sqlTemplate.update(getSql("deleteNotificationSql"), notificationId);
+ }
+
+ @Override
+ public void saveNotification(Notification notification) {
+ int count = sqlTemplate.update(getSql("updateNotificationSql"), notification.getExternalId(), notification.getNodeGroupId(),
+ notification.getType(), notification.isEnabled(), notification.getThreshold(), notification.getPeriod(), notification.getSampleMinutes(),
+ notification.getSeverityLevel(), notification.getWindowMinutes(), notification.getLastUpdateBy(),
+ notification.getLastUpdateTime(), notification.getNotificationId());
+ if (count == 0) {
+ sqlTemplate.update(getSql("insertNotificationSql"), notification.getNotificationId(), notification.getExternalId(), notification.getNodeGroupId(),
+ notification.getType(), notification.isEnabled(), notification.getThreshold(), notification.getPeriod(), notification.getSampleMinutes(),
+ notification.getSeverityLevel(), notification.getWindowMinutes(), notification.getCreateTime(), notification.getLastUpdateBy(),
+ notification.getLastUpdateTime());
+ }
+ }
+
+ class NotificationRowMapper implements ISqlRowMapper {
+ public Notification mapRow(Row row) {
+ Notification n = new Notification();
+ n.setNotificationId(row.getString("notification_id"));
+ n.setExternalId(row.getString("external_id"));
+ n.setNodeGroupId(row.getString("node_group_id"));
+ n.setType(row.getString("type"));
+ n.setEnabled(row.getBoolean("enabled"));
+ n.setThreshold(row.getLong("threshold"));
+ n.setPeriod(row.getInt("period"));
+ n.setSampleMinutes(row.getInt("sample_minutes"));
+ n.setSeverityLevel(row.getInt("severity_level"));
+ n.setWindowMinutes(row.getInt("window_minutes"));
+ n.setCreateTime(row.getDateTime("create_time"));
+ n.setLastUpdateBy(row.getString("last_update_by"));
+ n.setLastUpdateTime(row.getDateTime("last_update_time"));
+ return n;
+ }
+ }
+
+ class NotificationEventRowMapper implements ISqlRowMapper {
+ public NotificationEvent mapRow(Row row) {
+ NotificationEvent n = new NotificationEvent();
+ n.setNotificationId(row.getString("notification_id"));
+ n.setNodeId(row.getString("node_id"));
+ n.setHostName(row.getString("host_name"));
+ n.setEventTime(row.getTime("event_time"));
+ n.setValue(row.getLong("value"));
+ n.setThreshold(row.getLong("threshold"));
+ n.setPeriod(row.getLong("period"));
+ n.setSeverityLevel(row.getInt("severity_level"));
+ return n;
+ }
+ }
+}
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NotificationServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NotificationServiceSqlMap.java
new file mode 100644
index 0000000000..8a8deef885
--- /dev/null
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NotificationServiceSqlMap.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to JumpMind Inc under one or more contributor
+ * license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding
+ * copyright ownership. JumpMind Inc licenses this file
+ * to you under the GNU General Public License, version 3.0 (GPLv3)
+ * (the "License"); you may not use this file except in compliance
+ * with the License.
+ *
+ * You should have received a copy of the GNU General Public License,
+ * version 3.0 (GPLv3) along with this library; if not, see
+ * .
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.jumpmind.symmetric.service.impl;
+
+import java.util.Map;
+
+import org.jumpmind.db.platform.IDatabasePlatform;
+
+public class NotificationServiceSqlMap extends AbstractSqlMap {
+
+ public NotificationServiceSqlMap(IDatabasePlatform platform, Map replacementTokens) {
+ super(platform, replacementTokens);
+
+ putSql("selectNotificationSql",
+ "select notification_id, external_id, node_group_id, type, enabled, threshold, period, sample_minutes, severity_level, window_minutes, " +
+ "create_time, last_update_by, last_update_time from $(notification)");
+
+ putSql("insertNotificationSql",
+ "insert into $(notification) " +
+ "(notification_id, external_id, node_group_id, type, enabled, threshold, period, sample_minutes, severity_level, window_minutes, " +
+ "create_time, last_update_by, last_update_time) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
+
+ putSql("updateNotificationSql",
+ "update $(notification) " +
+ "set external_id = ?, node_group_id = ?, type = ?, enabled = ?, threshold = ?, period = ?, sample_minutes = ?, severity_level = ?, " +
+ "window_minutes = ?, last_update_by = ?, last_update_time = ? where notification_id = ?");
+
+ putSql("deleteNotificationSql",
+ "delete from $(notification) where notification_id = ?");
+
+ }
+}
\ No newline at end of file
diff --git a/symmetric-core/src/main/resources/symmetric-schema.xml b/symmetric-core/src/main/resources/symmetric-schema.xml
index 60653d3b0e..85bfc4123c 100644
--- a/symmetric-core/src/main/resources/symmetric-schema.xml
+++ b/symmetric-core/src/main/resources/symmetric-schema.xml
@@ -506,6 +506,22 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+