Skip to content

Commit

Permalink
Don't run jobs (by default) if an engine hasn't been registered.
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Jul 7, 2008
1 parent 34b4fc8 commit 550aeda
Show file tree
Hide file tree
Showing 13 changed files with 103 additions and 12 deletions.
Expand Up @@ -39,7 +39,6 @@ public class SymmetricWebServer {
protected static final Log logger = LogFactory.getLog(SymmetricWebServer.class);

public void start(int port) throws Exception {

Server server = new Server();
Connector connector = new SelectChannelConnector();
connector.setPort(port);
Expand Down
Expand Up @@ -129,5 +129,11 @@ public class Constants
public static final String MAX_CONCURRENT_WORKERS = "maxConcurrentWorkers";

public static final String DEFAULT_JMX_SERVER_EXPORTER = "defaultServerExporter";

public static final String PROTOCOL_NONE = "nop";

public static final String PROTOCOL_HTTP = "http";

public static final String PROTOCOL_INTERNAL = "internal";

}
Expand Up @@ -38,6 +38,8 @@ public class ParameterConstants {
public final static String NODE_GROUP_ID = "group.id";
public final static String EXTERNAL_ID = "external.id";
public final static String SCHEMA_VERSION = "schema.version";

public static final String STATISTIC_THRESHOLD_ALERTS_ENABLED = "statistic.threshold.alerts.enabled";

@Deprecated
public final static String RUNTIME_CONFIGURATION_CLASS = "configuration.class";
Expand Down
Expand Up @@ -28,8 +28,10 @@
import org.apache.commons.dbcp.BasicDataSource;
import org.apache.commons.logging.Log;
import org.jumpmind.symmetric.SymmetricEngine;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.symmetric.service.IRegistrationService;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.BeanNameAware;
Expand All @@ -47,12 +49,20 @@ abstract public class AbstractJob extends TimerTask implements BeanFactoryAware,

private String beanName;

private boolean requiresRegistration = true;

@Override
public void run() {
try {
if (SymmetricEngine.findEngineByName(parameterService.getString(ParameterConstants.ENGINE_NAME))
.isStarted()) {
doJob();
IRegistrationService service = (IRegistrationService) beanFactory
.getBean(Constants.REGISTRATION_SERVICE);
if (!requiresRegistration || (requiresRegistration && service.isRegisteredWithServer())) {
doJob();
} else {
getLogger().warn("Did not run job because the engine is not registered.");
}
}
} catch (final Throwable ex) {
getLogger().error(ex, ex);
Expand Down Expand Up @@ -112,4 +122,8 @@ public void setParameterService(IParameterService parameterService) {
this.parameterService = parameterService;
}

public void setRequiresRegistration(boolean requiresRegistration) {
this.requiresRegistration = requiresRegistration;
}

}
@@ -0,0 +1,9 @@
package org.jumpmind.symmetric.service;

import javax.management.Notification;

public interface INotificationService {

public void sendNotification(Notification event);

}
Expand Up @@ -31,6 +31,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.ddlutils.model.Table;
import org.jumpmind.symmetric.Version;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.IDbDialect;
import org.jumpmind.symmetric.db.SqlScript;
Expand Down Expand Up @@ -281,6 +282,8 @@ public void heartbeat() {
node.setSymmetricVersion(Version.version());
if (!StringUtils.isBlank(parameterService.getMyUrl())) {
node.setSyncURL(parameterService.getMyUrl());
} else {
node.setSyncURL(Constants.PROTOCOL_NONE + "://" + AppUtils.getServerId());
}
nodeService.updateNode(node);
logger.info("Done updating my node information and heartbeat time.");
Expand Down
@@ -0,0 +1,23 @@
package org.jumpmind.symmetric.service.jmx;

import javax.management.Notification;

import org.jumpmind.symmetric.service.INotificationService;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.jmx.export.notification.NotificationPublisher;
import org.springframework.jmx.export.notification.NotificationPublisherAware;

@ManagedResource(description = "Provide an implementation of SymmetricDS notifications by JMX")
public class NotificationService implements NotificationPublisherAware, INotificationService {

NotificationPublisher notificationPublisher;

public void setNotificationPublisher(NotificationPublisher notificationPublisher) {
this.notificationPublisher = notificationPublisher;
}

public void sendNotification(Notification event) {
this.notificationPublisher.sendNotification(event);
}

}
Expand Up @@ -23,8 +23,11 @@
import java.util.HashMap;
import java.util.Map;

import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.INotificationService;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.symmetric.service.IStatisticService;

public class StatisticManager implements IStatisticManager {
Expand All @@ -35,6 +38,10 @@ public class StatisticManager implements IStatisticManager {

IStatisticService statisticService;

INotificationService notificationService;

IParameterService parameterService;

synchronized public void init() {
if (statistics == null) {
refresh(new Date());
Expand All @@ -45,10 +52,21 @@ synchronized public void flush() {
Date captureEndTime = new Date();
if (statistics != null) {
statisticService.save(statistics.values(), captureEndTime);
publishAlerts();
}
refresh(captureEndTime);
}

/**
* Compare the statistics we have cached against the configured statistic
* alert thresholds and publish alerts if we fall outside the range.
*/
synchronized protected void publishAlerts() {
if (parameterService.is(ParameterConstants.STATISTIC_THRESHOLD_ALERTS_ENABLED)) {
// TODO
}
}

synchronized protected void refresh(Date lastCaptureEndTime) {
if (statistics == null) {
statistics = new HashMap<StatisticName, Statistic>();
Expand Down Expand Up @@ -82,4 +100,12 @@ public void setNodeService(INodeService nodeService) {
public void setStatisticService(IStatisticService statisticService) {
this.statisticService = statisticService;
}

public void setNotificationService(INotificationService notificationService) {
this.notificationService = notificationService;
}

public void setParameterService(IParameterService parameterService) {
this.parameterService = parameterService;
}
}
Expand Up @@ -25,6 +25,7 @@
import javax.net.ssl.SSLSession;

import org.apache.commons.lang.StringUtils;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.IParameterService;
Expand All @@ -34,17 +35,13 @@

public class TransportManagerFactoryBean implements FactoryBean {

private static final String TRANSPORT_HTTP = "http";

private static final String TRANSPORT_INTERNAL = "internal";

private INodeService nodeService;

private IParameterService parameterService;

public Object getObject() throws Exception {
String transport = parameterService.getString(ParameterConstants.TRANSPORT_TYPE);
if (TRANSPORT_HTTP.equalsIgnoreCase(transport)) {
if (Constants.PROTOCOL_HTTP.equalsIgnoreCase(transport)) {
final String httpSslVerifiedServerNames = parameterService
.getString(ParameterConstants.TRANSPORT_HTTPS_VERIFIED_SERVERS);
HttpsURLConnection.setDefaultHostnameVerifier(new HostnameVerifier() {
Expand All @@ -61,7 +58,7 @@ public boolean verify(String s, SSLSession sslsession) {
}
});
return new HttpTransportManager(nodeService, parameterService);
} else if (TRANSPORT_INTERNAL.equalsIgnoreCase(transport)) {
} else if (Constants.PROTOCOL_INTERNAL.equalsIgnoreCase(transport)) {
return new InternalTransportManager(parameterService);
} else {
throw new IllegalStateException("An invalid transport type of " + transport + " was specified.");
Expand Down
6 changes: 5 additions & 1 deletion symmetric/src/main/resources/symmetric-default.properties
Expand Up @@ -263,4 +263,8 @@ transport.type=http
hsqldb.initialize.db=true

# Specify the type of line feed to use in JMX console methods. Possible values are: text or html.
jmx.line.feed=text
jmx.line.feed=text

# Specify whether we should publish alerts if any of the statistic flushes fall outside a specified
# threshold range.
statistic.threshold.alerts.enabled=false
11 changes: 8 additions & 3 deletions symmetric/src/main/resources/symmetric-jmx.xml
Expand Up @@ -16,6 +16,7 @@
<entry key=":name=Incoming" value-ref="incomingManagementService" />
<entry key=":name=Outgoing" value-ref="outgoingManagementService" />
<entry key=":name=Parameters" value-ref="parameterManagementService" />
<entry key=":name=Notifications" value-ref="notificationService" />
</map>
</property>
<property name="registrationBehaviorName" value="REGISTRATION_IGNORE_EXISTING" />
Expand All @@ -29,8 +30,9 @@
<entry key="org.jumpmind.symmetric.${engine.name}:name=Node" value-ref="nodeManagementService" />
<entry key="org.jumpmind.symmetric.${engine.name}:name=Incoming" value-ref="incomingManagementService" />
<entry key="org.jumpmind.symmetric.${engine.name}:name=Outgoing" value-ref="outgoingManagementService" />
<entry key="org.jumpmind.symmetric.${engine.name}:name=Parameters" value-ref="parameterManagementService" />

<entry key="org.jumpmind.symmetric.${engine.name}:name=Parameters"
value-ref="parameterManagementService" />

</map>
</property>
<property name="registrationBehaviorName" value="REGISTRATION_IGNORE_EXISTING" />
Expand Down Expand Up @@ -69,13 +71,16 @@
<property name="dataExtractorService" ref="dataExtractorService" />
<property name="statisticManager" ref="statisticManager" />
<property name="parameterService" ref="parameterService" />
<property name="concurrentConnectionManager" ref="concurrentConnectionManager" />
<property name="concurrentConnectionManager" ref="concurrentConnectionManager" />
</bean>

<bean id="incomingManagementService" class="org.jumpmind.symmetric.service.jmx.IncomingManagementService">
<property name="statisticManager" ref="statisticManager" />
</bean>

<bean id="notificationService" class="org.jumpmind.symmetric.service.jmx.NotificationService"></bean>


<bean id="outgoingManagementService" class="org.jumpmind.symmetric.service.jmx.OutgoingManagementService">
<property name="statisticManager" ref="statisticManager" />
</bean>
Expand Down
1 change: 1 addition & 0 deletions symmetric/src/main/resources/symmetric-jobs.xml
Expand Up @@ -60,6 +60,7 @@ http://www.springframework.org/schema/util http://www.springframework.org/schema
</bean>

<bean id="pullTimerTask" class="org.jumpmind.symmetric.job.PullJob" scope="prototype">
<property name="requiresRegistration" value="false"/>
<property name="pullService" ref="pullService" />
<property name="needsRescheduled" value="true" />
<property name="parameterService" ref="parameterService" />
Expand Down
2 changes: 2 additions & 0 deletions symmetric/src/main/resources/symmetric-services.xml
Expand Up @@ -18,6 +18,8 @@
<bean id="statisticManager" class="org.jumpmind.symmetric.statistic.StatisticManager">
<property name="statisticService" ref="statisticService"/>
<property name="nodeService" ref="nodeService"/>
<property name="notificationService" ref="notificationService"/>
<property name="parameterService" ref="parameterService"/>
</bean>

<bean id="concurrentConnectionManager" class="org.jumpmind.symmetric.transport.ConcurrentConnectionManager">
Expand Down

0 comments on commit 550aeda

Please sign in to comment.