Skip to content

Commit

Permalink
0002311: Don't use 503 when SymmetricDS is unavailable as some proxies
Browse files Browse the repository at this point in the history
return that as well.
0002312: Return different error codes for when an engine is stopped or
starting versus the server is too busy

Conflicts:
	symmetric-core/src/main/java/org/jumpmind/symmetric/common/Constants.java
  • Loading branch information
chenson42 committed Sep 24, 2015
1 parent 65ece36 commit 48d1f2d
Show file tree
Hide file tree
Showing 14 changed files with 113 additions and 50 deletions.
Expand Up @@ -29,10 +29,11 @@
*/
final public class Constants {

private Constants() {
private Constants() {

}

public static final String SYSTEM_USER = "system";

public static final String SYSTEM_USER = "system";

public static final long VIRTUAL_BATCH_FOR_REGISTRATION = IoConstants.IGNORE_TABLES_BATCH;

Expand Down
Expand Up @@ -38,6 +38,7 @@
import org.jumpmind.symmetric.service.RegistrationRequiredException;
import org.jumpmind.symmetric.transport.AuthenticationException;
import org.jumpmind.symmetric.transport.ConnectionRejectedException;
import org.jumpmind.symmetric.transport.ServiceUnavailableException;
import org.jumpmind.symmetric.transport.SyncDisabledException;

/**
Expand All @@ -63,6 +64,9 @@ protected void fireOffline(Exception error, Node remoteNode, RemoteNodeStatus st
if (isOffline(error)) {
log.warn("Could not communicate with {} at {} because: {}", new Object[] {remoteNode, syncUrl, cause.getMessage()});
status.setStatus(Status.OFFLINE);
} else if (isServiceUnavailable(error)) {
log.info("{} at {} was unavailable", new Object[] {remoteNode, syncUrl});
status.setStatus(Status.OFFLINE);
} else if (isBusy(error)) {
log.info("{} at {} was busy", new Object[] {remoteNode, syncUrl});
status.setStatus(Status.BUSY);
Expand Down Expand Up @@ -142,6 +146,16 @@ protected boolean isBusy(Exception ex) {
return offline;
}

protected boolean isServiceUnavailable(Exception ex){
boolean offline = false;
if (ex != null) {
Throwable cause = ExceptionUtils.getRootCause(ex);
offline = ex instanceof ServiceUnavailableException ||
cause instanceof ServiceUnavailableException;
}
return offline;
}

protected boolean isSyncDisabled(Exception ex) {
boolean syncDisabled = false;
if (ex != null) {
Expand Down
Expand Up @@ -115,6 +115,7 @@
import org.jumpmind.symmetric.transport.ConnectionRejectedException;
import org.jumpmind.symmetric.transport.IIncomingTransport;
import org.jumpmind.symmetric.transport.ITransportManager;
import org.jumpmind.symmetric.transport.ServiceUnavailableException;
import org.jumpmind.symmetric.transport.SyncDisabledException;
import org.jumpmind.symmetric.transport.TransportException;
import org.jumpmind.symmetric.transport.http.HttpTransportManager;
Expand Down Expand Up @@ -495,6 +496,8 @@ protected void logAndRethrow(Node remoteNode, Throwable ex) throws IOException {
log.warn("Registration attempt failed. Registration was not open");
} else if (ex instanceof ConnectionRejectedException) {
throw (ConnectionRejectedException) ex;
} else if (ex instanceof ServiceUnavailableException) {
throw (ServiceUnavailableException) ex;
} else if (ex instanceof AuthenticationException) {
log.warn("Could not authenticate with node '{}'",
remoteNode != null ? remoteNode.getNodeId() : "?");
Expand Down
Expand Up @@ -45,10 +45,7 @@
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.symmetric.service.IPullService;
import org.jumpmind.symmetric.service.IRegistrationService;
import org.jumpmind.symmetric.transport.AuthenticationException;
import org.jumpmind.symmetric.transport.ConnectionRejectedException;
import org.jumpmind.symmetric.transport.SyncDisabledException;
import org.jumpmind.symmetric.transport.TransportException;
import org.jumpmind.symmetric.transport.OfflineException;

/**
* @see IPullService
Expand Down Expand Up @@ -121,7 +118,6 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status
Node node = nodeCommunication.getNode();
if (StringUtils.isNotBlank(node.getSyncUrl()) ||
!parameterService.isRegistrationServer()) {
try {
int pullCount = 0;
long batchesProcessedCount = 0;
do {
Expand All @@ -131,8 +127,26 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status
if (pullCount > 1) {
log.info("Immediate pull requested while in reload mode");
}


try {
dataLoaderService.loadDataFromPull(node, status);
} catch (ConnectException ex) {
log.warn(
"Failed to connect to the transport: {}",
(node.getSyncUrl() == null ? parameterService.getRegistrationUrl() : node
.getSyncUrl()));
fireOffline(ex, node, status);
} catch (OfflineException ex) {
fireOffline(ex, node, status);
} catch (UnknownHostException ex) {
fireOffline(ex, node, status);
} catch (SocketException ex) {
log.warn("{}", ex.getMessage());
fireOffline(ex, node, status);
} catch (IOException ex) {
log.error("An IO exception happened while attempting to pull data", ex);
fireOffline(ex, node, status);
}

if (!status.failed() &&
(status.getDataProcessed() > 0 || status.getBatchesProcessed() > 0)) {
Expand All @@ -154,30 +168,7 @@ public void execute(NodeCommunication nodeCommunication, RemoteNodeStatus status
*/
} while (nodeService.isDataLoadStarted() && !status.failed()
&& status.getBatchesProcessed() > batchesProcessedCount);
} catch (ConnectException ex) {
log.warn(
"Failed to connect to the transport: {}",
(node.getSyncUrl() == null ? parameterService.getRegistrationUrl() : node
.getSyncUrl()));
fireOffline(ex, node, status);
} catch (ConnectionRejectedException ex) {
fireOffline(ex, node, status);
} catch (AuthenticationException ex) {
fireOffline(ex, node, status);
} catch (UnknownHostException ex) {
fireOffline(ex, node, status);
} catch (SyncDisabledException ex) {
fireOffline(ex, node, status);
} catch (SocketException ex) {
log.warn("{}", ex.getMessage());
fireOffline(ex, node, status);
} catch (TransportException ex) {
log.warn("{}", ex.getMessage());
fireOffline(ex, node, status);
} catch (IOException ex) {
log.error("An IO exception happened while attempting to pull data", ex);
fireOffline(ex, node, status);
}

} else {
log.warn("Cannot pull node '{}' in the group '{}'. The sync url is blank",
node.getNodeId(), node.getNodeGroupId());
Expand Down
Expand Up @@ -20,9 +20,8 @@
*/
package org.jumpmind.symmetric.transport;

import org.jumpmind.exception.IoException;

public class AuthenticationException extends IoException {
public class AuthenticationException extends OfflineException {

private static final long serialVersionUID = -6322765147037755510L;

Expand Down
Expand Up @@ -20,9 +20,8 @@
*/
package org.jumpmind.symmetric.transport;

import org.jumpmind.exception.IoException;

public class ConnectionRejectedException extends IoException {
public class ConnectionRejectedException extends OfflineException {

private static final long serialVersionUID = 3770259092569043530L;

Expand Down
@@ -0,0 +1,21 @@
package org.jumpmind.symmetric.transport;

import org.jumpmind.exception.IoException;

public class OfflineException extends IoException {

private static final long serialVersionUID = 1L;

public OfflineException() {
super();
}

public OfflineException(Exception e) {
super(e);
}

public OfflineException(String msg, Object... args) {
super(msg, args);
}

}
@@ -0,0 +1,28 @@
/**
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU General Public License, version 3.0 (GPLv3)
* (the "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU General Public License,
* version 3.0 (GPLv3) along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jumpmind.symmetric.transport;


public class ServiceUnavailableException extends OfflineException {

private static final long serialVersionUID = 3770259092569043530L;

}
Expand Up @@ -20,9 +20,8 @@
*/
package org.jumpmind.symmetric.transport;

import org.jumpmind.exception.IoException;

public class SyncDisabledException extends IoException {
public class SyncDisabledException extends OfflineException {

private static final long serialVersionUID = 8646739179107142193L;
}
Expand Up @@ -22,7 +22,7 @@

import java.io.IOException;

public class TransportException extends RuntimeException {
public class TransportException extends OfflineException {

private static final long serialVersionUID = -6127189404858972114L;

Expand Down
Expand Up @@ -26,7 +26,7 @@
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;


import org.apache.commons.io.IOUtils;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.service.IParameterService;
Expand All @@ -35,6 +35,7 @@
import org.jumpmind.symmetric.transport.AuthenticationException;
import org.jumpmind.symmetric.transport.ConnectionRejectedException;
import org.jumpmind.symmetric.transport.IIncomingTransport;
import org.jumpmind.symmetric.transport.ServiceUnavailableException;
import org.jumpmind.symmetric.transport.SyncDisabledException;
import org.jumpmind.symmetric.transport.TransportUtils;
import org.jumpmind.symmetric.web.WebConstants;
Expand Down Expand Up @@ -96,9 +97,11 @@ public InputStream openStream() throws IOException {
case WebConstants.REGISTRATION_REQUIRED:
throw new RegistrationRequiredException();
case WebConstants.SYNC_DISABLED:
throw new SyncDisabledException();
case WebConstants.SC_SERVICE_UNAVAILABLE:
throw new SyncDisabledException();
case WebConstants.SC_SERVICE_BUSY:
throw new ConnectionRejectedException();
case WebConstants.SC_SERVICE_UNAVAILABLE:
throw new ServiceUnavailableException();
case WebConstants.SC_FORBIDDEN:
throw new AuthenticationException();
default:
Expand Down
Expand Up @@ -40,6 +40,7 @@
import org.jumpmind.symmetric.transport.AuthenticationException;
import org.jumpmind.symmetric.transport.ConnectionRejectedException;
import org.jumpmind.symmetric.transport.IOutgoingWithResponseTransport;
import org.jumpmind.symmetric.transport.ServiceUnavailableException;
import org.jumpmind.symmetric.transport.SyncDisabledException;
import org.jumpmind.symmetric.web.WebConstants;

Expand Down Expand Up @@ -252,8 +253,10 @@ public BufferedWriter openWriter() {
* @throws {@link AuthenticationException}
*/
private void analyzeResponseCode(int code) throws IOException {
if (WebConstants.SC_SERVICE_UNAVAILABLE == code) {
if (WebConstants.SC_SERVICE_BUSY == code) {
throw new ConnectionRejectedException();
} else if (WebConstants.SC_SERVICE_UNAVAILABLE == code) {
throw new ServiceUnavailableException();
} else if (WebConstants.SC_FORBIDDEN == code) {
throw new AuthenticationException();
} else if (WebConstants.SYNC_DISABLED == code) {
Expand Down
Expand Up @@ -53,9 +53,11 @@ public class WebConstants {

public static final int SYNC_DISABLED = 658;

public static final int SC_FORBIDDEN = 403;
public static final int SC_FORBIDDEN = 659;

public static final int SC_SERVICE_UNAVAILABLE = 503;
public static final int SC_SERVICE_UNAVAILABLE = 660;

public static final int SC_SERVICE_BUSY = 670;

public static final String ACK_BATCH_NAME = "batch-";

Expand Down
Expand Up @@ -73,14 +73,14 @@ public boolean before(HttpServletRequest req, HttpServletResponse resp) throws I
if (!concurrentConnectionManager
.reserveConnection(nodeId, poolId, ReservationType.SOFT)) {
statisticManager.incrementNodesRejected(1);
ServletUtils.sendError(resp, WebConstants.SC_SERVICE_UNAVAILABLE);
ServletUtils.sendError(resp, WebConstants.SC_SERVICE_BUSY);
} else {
try {
buildSuspendIgnoreResponseHeaders(nodeId, resp);
} catch (Exception ex) {
concurrentConnectionManager.releaseConnection(nodeId, poolId);
log.error("Error building response headers", ex);
ServletUtils.sendError(resp, WebConstants.SC_SERVICE_UNAVAILABLE);
ServletUtils.sendError(resp, WebConstants.SC_SERVICE_BUSY);
}
}
return false;
Expand All @@ -92,13 +92,13 @@ public boolean before(HttpServletRequest req, HttpServletResponse resp) throws I
} catch (Exception ex) {
concurrentConnectionManager.releaseConnection(nodeId, poolId);
log.error("Error building response headers", ex);
ServletUtils.sendError(resp, WebConstants.SC_SERVICE_UNAVAILABLE);
ServletUtils.sendError(resp, WebConstants.SC_SERVICE_BUSY);
return false;
}

} else {
statisticManager.incrementNodesRejected(1);
ServletUtils.sendError(resp, WebConstants.SC_SERVICE_UNAVAILABLE);
ServletUtils.sendError(resp, WebConstants.SC_SERVICE_BUSY);
return false;
}
}
Expand Down

0 comments on commit 48d1f2d

Please sign in to comment.