Skip to content

Commit

Permalink
SYMMETRICDS-173 - offline watchdog only monitors direct children. nod…
Browse files Browse the repository at this point in the history
…e security is now deleted on server for an offline node. New sync disabled return code is sent to the client. Default client handling is to remove node identity to allow re-registration. Integration test added.
  • Loading branch information
IntervalsPro committed Jan 26, 2010
1 parent a92f8ba commit 5108b21
Show file tree
Hide file tree
Showing 25 changed files with 400 additions and 67 deletions.
Expand Up @@ -29,7 +29,7 @@
import org.jumpmind.symmetric.config.IParameterFilter;
import org.jumpmind.symmetric.config.ITriggerCreationListener;
import org.jumpmind.symmetric.extract.IExtractorFilter;
import org.jumpmind.symmetric.io.IOfflineListener;
import org.jumpmind.symmetric.io.IOfflineClientListener;
import org.jumpmind.symmetric.load.IBatchListener;
import org.jumpmind.symmetric.load.IColumnFilter;
import org.jumpmind.symmetric.load.IDataLoaderFilter;
Expand Down Expand Up @@ -202,14 +202,14 @@ private void registerExtension(String beanName, IExtensionPoint ext) {
routingService.addBatchAlgorithm(beanName, (IBatchAlgorithm) ext);
}

if (ext instanceof IOfflineListener) {
if (ext instanceof IOfflineClientListener) {
for(IOfflineDetectorService service : offlineDetectorServices) {
service.addOfflineListener((IOfflineListener)ext);
service.addOfflineListener((IOfflineClientListener)ext);
}
}

if (ext instanceof IOfflineNodeHandler) {
nodeService.setOfflineNodeHandler((IOfflineNodeHandler)ext);
if (ext instanceof IOfflineServerListener) {
nodeService.addOfflineServerListener((IOfflineServerListener)ext);
}
}

Expand Down
Expand Up @@ -29,6 +29,6 @@
* to be offline. An offline node has a heartbeat older than a
* configured amount of time.
*/
public interface IOfflineNodeHandler extends IExtensionPoint {
public void handleOfflineNodes(List<Node> offlineNodeList);
public interface IOfflineServerListener extends IExtensionPoint {
public void clientNodeOffline(Node node);
}
@@ -0,0 +1,66 @@
/*
* SymmetricDS is an open source database synchronization solution.
*
* Copyright (C) Chris Henson <chenson42@users.sourceforge.net>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*/
package org.jumpmind.symmetric.io;

import org.jumpmind.symmetric.common.logging.ILog;
import org.jumpmind.symmetric.common.logging.LogFactory;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.symmetric.service.impl.NodeService;

public class DefaultOfflineClientListener implements IOfflineClientListener {
protected ILog log = LogFactory.getLog(getClass());
protected IParameterService parameterService;


private NodeService nodeService;


public void busy(Node remoteNode) {
log.warn("TransportFailedConnectionBusy");
}

public void notAuthenticated(Node remoteNode) {
log.warn("AuthenticationFailed");
}

public void offline(Node remoteNode) {
log.warn("TransportFailedConnectionUnavailable",
(remoteNode.getSyncUrl() == null ? parameterService.getRegistrationUrl() : remoteNode
.getSyncUrl()));
}

public void syncDisabled(Node remoteNode) {
log.warn("SyncDisabledException");
nodeService.deleteIdentity();
}

public boolean isAutoRegister() {
return true;
}

public void setNodeService(NodeService nodeService) {
this.nodeService = nodeService;
}

public void setParameterService(IParameterService parameterService) {
this.parameterService = parameterService;
}
}
Expand Up @@ -26,7 +26,7 @@
* This is an extension point that is called when the current instance has detected it cannot sync with another
* {@link Node}.
*/
public interface IOfflineListener extends IExtensionPoint {
public interface IOfflineClientListener extends IExtensionPoint {

/**
* Called when the remote node is unreachable.
Expand All @@ -42,5 +42,12 @@ public interface IOfflineListener extends IExtensionPoint {
* Called when this node has been rejected because the remote node is currently too busy to handle the sync request.
*/
public void busy(Node remoteNode);

/**
* Called when this node is rejected because synchronization is disabled on the remoteNode
*
* @param remoteNode
*/
public void syncDisabled(Node remoteNode);

}
Expand Up @@ -24,32 +24,53 @@

import org.jumpmind.symmetric.common.logging.ILog;
import org.jumpmind.symmetric.common.logging.LogFactory;
import org.jumpmind.symmetric.ext.IOfflineNodeHandler;
/*
* SymmetricDS is an open source database synchronization solution.
*
* Copyright (C) Chris Henson <chenson42@users.sourceforge.net>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 3 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*/
import org.jumpmind.symmetric.ext.IOfflineServerListener;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.impl.OutgoingBatchService;

/**
* A default implementation of the Offline Node Handler. Syncing
* is disabled for the node and cleanup processing is done for
* outgoing batches.
* A default implementation of the Offline Server Listener.
*
* @author Jeff Bailey
*
*/
public class DefaultOfflineNodeHandler implements IOfflineNodeHandler {
public class DefaultOfflineServerListener implements IOfflineServerListener {

protected INodeService nodeService;
protected OutgoingBatchService outgoingBatchService;
protected static final ILog log = LogFactory.getLog(DefaultOfflineNodeHandler.class);
protected static final ILog log = LogFactory.getLog(DefaultOfflineServerListener.class);

public void handleOfflineNodes(List<Node> offlineNodeList) {
for (Node node : offlineNodeList) {
log.info("NodeOffline", node.getNodeId(), node.getHeartbeatTime());
node.setSyncEnabled(false);
nodeService.updateNode(node);
outgoingBatchService.markAllAsSentForNode(node);
}
/**
* Handle a client node that was determined to be offline.
* Syncing is disabled for the node, node security is deleted, and cleanup processing is done for
* outgoing batches.
*/
public void clientNodeOffline(Node node) {
log.warn("NodeOffline", node.getNodeId(), node.getHeartbeatTime(), node.getTimezoneOffset());
node.setSyncEnabled(false);
nodeService.updateNode(node);
outgoingBatchService.markAllAsSentForNode(node);
nodeService.deleteNodeSecurity(node.getNodeId());
}

public boolean isAutoRegister() {
Expand Down
Expand Up @@ -22,11 +22,12 @@

package org.jumpmind.symmetric.service;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;

import org.jumpmind.symmetric.config.INodeIdGenerator;
import org.jumpmind.symmetric.ext.IOfflineNodeHandler;
import org.jumpmind.symmetric.ext.IOfflineServerListener;
import org.jumpmind.symmetric.model.NodeGroupLinkAction;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.NodeSecurity;
Expand Down Expand Up @@ -55,6 +56,8 @@ public interface INodeService {
public NodeSecurity findNodeSecurity(String nodeId);

public NodeSecurity findNodeSecurity(String nodeId, boolean createIfNotFound);

public void deleteNodeSecurity(String nodeId);

public String findSymmetricVersion();

Expand All @@ -71,6 +74,8 @@ public interface INodeService {
public Node findIdentity();

public Node findIdentity(boolean useCache);

public void deleteIdentity();

public List<Node> findNodesToPull();

Expand Down Expand Up @@ -132,10 +137,7 @@ public interface INodeService {
*/
public List<Node> findOfflineNodes();

/**
* Set the handler for offline nodes.
*
* @param offlineNodeHandler
*/
public void setOfflineNodeHandler(IOfflineNodeHandler offlineNodeHandler);
public void addOfflineServerListener(IOfflineServerListener listener);

public boolean removeOfflineServerListener(IOfflineServerListener listener);
}
@@ -1,11 +1,11 @@
package org.jumpmind.symmetric.service;

import org.jumpmind.symmetric.io.IOfflineListener;
import org.jumpmind.symmetric.io.IOfflineClientListener;

public interface IOfflineDetectorService {

public void addOfflineListener(IOfflineListener listener);
public void addOfflineListener(IOfflineClientListener listener);

public boolean removeOfflineListener(IOfflineListener listener);
public boolean removeOfflineListener(IOfflineClientListener listener);

}
Expand Up @@ -25,28 +25,29 @@
import java.util.List;

import org.apache.commons.lang.exception.ExceptionUtils;
import org.jumpmind.symmetric.io.IOfflineListener;
import org.jumpmind.symmetric.io.IOfflineClientListener;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.service.IOfflineDetectorService;
import org.jumpmind.symmetric.transport.AuthenticationException;
import org.jumpmind.symmetric.transport.ConnectionRejectedException;
import org.jumpmind.symmetric.transport.SyncDisabledException;

public abstract class AbstractOfflineDetectorService extends AbstractService implements IOfflineDetectorService {

private List<IOfflineListener> offlineListeners;
private List<IOfflineClientListener> offlineListeners;

public void setOfflineListeners(List<IOfflineListener> listeners) {
public void setOfflineListeners(List<IOfflineClientListener> listeners) {
this.offlineListeners = listeners;
}

public void addOfflineListener(IOfflineListener listener) {
public void addOfflineListener(IOfflineClientListener listener) {
if (offlineListeners == null) {
offlineListeners = new ArrayList<IOfflineListener>();
offlineListeners = new ArrayList<IOfflineClientListener>();
}
offlineListeners.add(listener);
}

public boolean removeOfflineListener(IOfflineListener listener) {
public boolean removeOfflineListener(IOfflineClientListener listener) {
if (offlineListeners != null) {
return offlineListeners.remove(listener);
} else {
Expand All @@ -56,13 +57,15 @@ public boolean removeOfflineListener(IOfflineListener listener) {

protected void fireOffline(Exception error, Node remoteNode) {
if (offlineListeners != null) {
for (IOfflineListener listener : offlineListeners) {
for (IOfflineClientListener listener : offlineListeners) {
if (isOffline(error)) {
listener.offline(remoteNode);
} else if (isBusy(error)) {
listener.busy(remoteNode);
} else if (isNotAuthenticated(error)) {
listener.notAuthenticated(remoteNode);
} else if (isSyncDisabled(error)) {
listener.syncDisabled(remoteNode);
}
}
}
Expand Down Expand Up @@ -101,4 +104,16 @@ protected boolean isBusy(Exception ex) {
}
return offline;
}

protected boolean isSyncDisabled(Exception ex) {
boolean offline = false;
if (ex != null) {
Throwable cause = ExceptionUtils.getRootCause(ex);
offline = cause instanceof SyncDisabledException;
if (offline == false && ex instanceof SyncDisabledException) {
offline = true;
}
}
return offline;
}
}
Expand Up @@ -64,6 +64,7 @@
import org.jumpmind.symmetric.transport.ConnectionRejectedException;
import org.jumpmind.symmetric.transport.IIncomingTransport;
import org.jumpmind.symmetric.transport.ITransportManager;
import org.jumpmind.symmetric.transport.SyncDisabledException;
import org.jumpmind.symmetric.transport.TransportException;
import org.jumpmind.symmetric.transport.file.FileIncomingTransport;
import org.jumpmind.symmetric.transport.internal.InternalIncomingTransport;
Expand Down Expand Up @@ -230,6 +231,9 @@ protected List<IncomingBatch> loadDataAndReturnBatches(IIncomingTransport transp
throw ex;
} catch (AuthenticationException ex) {
log.warn("AuthenticationFailed");
} catch (SyncDisabledException ex) {
log.warn("SyncDisabledException");
throw ex;
} catch (Throwable e) {
if (dataLoader != null && dataLoader.getContext().getBatchId() > 0 && batch == null) {
batch = new IncomingBatch(dataLoader.getContext());
Expand Down

0 comments on commit 5108b21

Please sign in to comment.