Skip to content

Commit

Permalink
0002791: Every network error is logged and alerts user on Dashboard
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Sep 15, 2016
1 parent a7704ba commit 74d6e1a
Show file tree
Hide file tree
Showing 13 changed files with 173 additions and 127 deletions.
Expand Up @@ -209,6 +209,7 @@ private ParameterConstants() {
public final static String TRANSPORT_HTTP_BASIC_AUTH_PASSWORD = "http.basic.auth.password";
public final static String TRANSPORT_TYPE = "transport.type";
public final static String TRANSPORT_MAX_BYTES_TO_SYNC = "transport.max.bytes.to.sync";
// Parameter name for the transport error threshold in milliseconds. The offline detector
// reads it (falling back to 300000 when unset) and compares it against the time elapsed
// since the first transport error recorded for a node.
public final static String TRANSPORT_MAX_ERROR_MILLIS = "transport.max.error.millis";

public final static String CACHE_TIMEOUT_GROUPLETS_IN_MS = "cache.grouplets.time.ms";
public final static String CACHE_TIMEOUT_NODE_SECURITY_IN_MS = "cache.node.security.time.ms";
Expand Down
Expand Up @@ -44,20 +44,15 @@ public DefaultOfflineClientListener(IParameterService parameterService,
}

/**
 * Logs, at info level, that the remote node was too busy to accept the connection.
 *
 * @param remoteNode the node that refused the connection; its node id is written to the log
 */
public void busy(Node remoteNode) {
    final String busyNodeId = remoteNode.getNodeId();
    log.info("Node '{}' was too busy to accept the connection", busyNodeId);
}

/**
 * Logs, at warn level, that authentication with the remote node failed.
 *
 * @param remoteNode the node that could not be authenticated with; its node id is written to the log
 */
public void notAuthenticated(Node remoteNode) {
    final String rejectedNodeId = remoteNode.getNodeId();
    log.warn("Could not authenticate with node '{}'", rejectedNodeId);
}

/**
 * Callback for an unclassified error while talking to a remote node.
 * This implementation does nothing with either argument.
 *
 * @param remoteNode the node involved in the failed communication (unused here)
 * @param ex the error that was raised (unused here)
 */
public void unknownError(Node remoteNode, Exception ex) {
}

/**
 * Logs, at warn level, that the transport could not be reached.
 * The URL reported is the remote node's sync URL, or the registration URL from the
 * parameter service when the node has no sync URL.
 *
 * @param remoteNode the node that could not be reached
 */
public void offline(Node remoteNode) {
    String transportUrl;
    if (remoteNode.getSyncUrl() != null) {
        transportUrl = remoteNode.getSyncUrl();
    } else {
        transportUrl = parameterService.getRegistrationUrl();
    }
    log.warn("Failed to connect to the transport: {}", transportUrl);
}

public void syncDisabled(Node remoteNode) {
Expand All @@ -70,7 +65,6 @@ public void syncDisabled(Node remoteNode) {
}

/**
 * Logs, at warn level, that registration is required before the operation can complete.
 *
 * @param remoteNode the node that reported the condition (not used in the log message)
 */
public void registrationRequired(Node remoteNode) {
log.warn("Registration is required before this operation can complete");
}

}
Expand Up @@ -24,9 +24,13 @@
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.exception.ExceptionUtils;
import org.jumpmind.exception.HttpException;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.io.IOfflineClientListener;
import org.jumpmind.symmetric.model.Node;
Expand All @@ -45,15 +49,21 @@
* Abstract service that provides helper methods for detecting offline status.
*/
public abstract class AbstractOfflineDetectorService extends AbstractService implements IOfflineDetectorService {

protected IExtensionService extensionService;

private Map<String, Long> transportErrorTimeByNode = new HashMap<String, Long>();

public AbstractOfflineDetectorService(IParameterService parameterService,
ISymmetricDialect symmetricDialect, IExtensionService extensionService) {
super(parameterService, symmetricDialect);
this.extensionService = extensionService;
}

/**
 * Clears the recorded transport error time for the given node, so that the next
 * transport error for it starts a fresh error window.
 *
 * @param remoteNode the node whose error time entry is removed (keyed by node id)
 * @param status not used by this method
 */
protected void fireOnline(Node remoteNode, RemoteNodeStatus status) {
    final String recoveredNodeId = remoteNode.getNodeId();
    transportErrorTimeByNode.remove(recoveredNodeId);
}

protected void fireOffline(Exception error, Node remoteNode, RemoteNodeStatus status) {
String syncUrl = remoteNode.getSyncUrl() == null ? parameterService.getRegistrationUrl() : remoteNode
.getSyncUrl();
Expand All @@ -62,23 +72,46 @@ protected void fireOffline(Exception error, Node remoteNode, RemoteNodeStatus st
cause = error;
}
if (isOffline(error)) {
log.warn("Could not communicate with {} at {} because: {}", new Object[] {remoteNode, syncUrl, cause.getMessage()});
String reason = cause.getMessage();
if (reason == null) {
reason = cause.getClass().getName();
}
if (shouldLogTransportError(remoteNode.getNodeId())) {
log.warn("Could not communicate with {} at {} because: {}", new Object[] {remoteNode, syncUrl, reason});
} else {
log.info("Could not communicate with {} at {} because: {}", new Object[] {remoteNode, syncUrl, reason});
}
status.setStatus(Status.OFFLINE);
} else if (isServiceUnavailable(error)) {
log.info("{} at {} was unavailable", new Object[] {remoteNode, syncUrl});
if (shouldLogTransportError(remoteNode.getNodeId())) {
log.warn("{} at {} was unavailable.", new Object[] {remoteNode, syncUrl});
} else {
log.info("{} at {} was unavailable. It may be starting up.", new Object[] {remoteNode, syncUrl});
}
status.setStatus(Status.OFFLINE);
} else if (isBusy(error)) {
log.info("{} at {} was busy", new Object[] {remoteNode, syncUrl});
if (shouldLogTransportError(remoteNode.getNodeId())) {
log.warn("{} at {} was busy", new Object[] {remoteNode, syncUrl});
} else {
log.info("{} at {} was busy", new Object[] {remoteNode, syncUrl});
}
status.setStatus(Status.BUSY);
} else if (isNotAuthenticated(error)) {
log.info("{} at {} was not authorized", new Object[] {remoteNode, syncUrl});
log.warn("{} at {} was not authorized", new Object[] {remoteNode, syncUrl});
status.setStatus(Status.NOT_AUTHORIZED);
} else if (isSyncDisabled(error)) {
log.info("Sync was not enabled for {} at {}", new Object[] {remoteNode, syncUrl});
log.warn("Sync was not enabled for {} at {}", new Object[] {remoteNode, syncUrl});
status.setStatus(Status.SYNC_DISABLED);
} else if (isRegistrationRequired(error)) {
log.info("Registration was not open at {}", new Object[] {remoteNode, syncUrl});
log.warn("Registration was not open at {}", new Object[] {remoteNode, syncUrl});
status.setStatus(Status.REGISTRATION_REQUIRED);
} else if (getHttpException(error) != null) {
HttpException http = getHttpException(error);
if (shouldLogTransportError(remoteNode.getNodeId())) {
log.warn("Could not communicate with node '{}' at {} because it returned HTTP code {}", remoteNode, syncUrl, http.getCode());
} else {
log.info("Could not communicate with node '{}' at {} because it returned HTTP code {}", remoteNode, syncUrl, http.getCode());
}
} else {
log.warn(String.format("Could not communicate with node '%s' at %s because of unexpected error", remoteNode, syncUrl), error);
status.setStatus(Status.UNKNOWN_ERROR);
Expand All @@ -104,6 +137,16 @@ protected void fireOffline(Exception error, Node remoteNode, RemoteNodeStatus st
}
}

/**
 * Decides whether a transport error for the given node has persisted long enough to
 * be reported, based on the {@code transport.max.error.millis} parameter (300000 when
 * the parameter is not set).
 * <p>
 * The first call for a node records the current time as the start of its error window.
 * Later calls compare the elapsed time since that start against the threshold.
 *
 * @param nodeId id of the node the transport error relates to
 * @return {@code true} when the time elapsed since the first recorded error for the node
 *         is at least the configured threshold
 */
protected boolean shouldLogTransportError(String nodeId) {
    final long thresholdMillis = parameterService.getLong(ParameterConstants.TRANSPORT_MAX_ERROR_MILLIS, 300000);
    // NOTE(review): the backing map is a plain HashMap; confirm callers never invoke this concurrently.
    Long firstErrorAt = transportErrorTimeByNode.get(nodeId);
    if (firstErrorAt == null) {
        // No error window open for this node yet: start one now.
        firstErrorAt = Long.valueOf(System.currentTimeMillis());
        transportErrorTimeByNode.put(nodeId, firstErrorAt);
    }
    final long elapsedMillis = System.currentTimeMillis() - firstErrorAt.longValue();
    return elapsedMillis >= thresholdMillis;
}

/**
* Check to see if the {@link Exception} was caused by an offline scenario.
*
Expand Down Expand Up @@ -179,4 +222,18 @@ protected boolean isRegistrationRequired(Exception ex) {
}
return registrationRequired;
}

/**
 * Extracts an {@link HttpException} from the given error, if there is one.
 * The root cause reported by {@code ExceptionUtils.getRootCause} is checked first;
 * if it is not an {@link HttpException}, the error itself is checked.
 *
 * @param ex the error to inspect; may be {@code null}
 * @return the root cause or the error itself when either is an {@link HttpException},
 *         otherwise {@code null}
 */
protected HttpException getHttpException(Exception ex) {
    if (ex == null) {
        return null;
    }
    Throwable rootCause = ExceptionUtils.getRootCause(ex);
    if (rootCause instanceof HttpException) {
        return (HttpException) rootCause;
    }
    return ex instanceof HttpException ? (HttpException) ex : null;
}

}
Expand Up @@ -110,9 +110,8 @@ public BatchAckResult ack(final BatchAck batch) {
}

if (status == Status.ER) {
log.error(
"The outgoing batch {} failed: {}",
outgoingBatch.getNodeBatchId(), batch.getSqlMessage() != null ? ". " + batch.getSqlMessage() : "");
log.error("The outgoing batch {} failed: {}{}", outgoingBatch.getNodeBatchId(),
(batch.getSqlCode() != 0 ? "[" + batch.getSqlState() + "," + batch.getSqlCode() + "] " : ""), batch.getSqlMessage());
RouterStats routerStats = engine.getStatisticManager().getRouterStatsByBatch(batch.getBatchId());
if (routerStats != null) {
log.info("Router stats for batch " + outgoingBatch.getBatchId() + ": " + routerStats.toString());
Expand Down
Expand Up @@ -693,18 +693,18 @@ public FutureOutgoingBatch call() throws Exception {
}
outgoingBatchService.updateOutgoingBatch(currentBatch);

if (isStreamClosedByClient(e)) {
log.warn(
"Failed to transport batch {}. The stream was closed by the client. There is a good chance that a previously sent batch errored out and the stream was closed or there was a network error. The error was: {}",
currentBatch, getRootMessage(e));
} else {
if (!isStreamClosedByClient(e)) {
if (e instanceof ProtocolException) {
IStagedResource resource = getStagedResource(currentBatch);
if (resource != null) {
resource.delete();
}
}
log.error("Failed to extract batch {}", currentBatch, e);
if (e.getCause() instanceof InterruptedException) {
log.info("Extract of batch {} was interrupted", currentBatch);
} else {
log.error("Failed to extract batch {}", currentBatch, e);
}
}
processInfo.setStatus(ProcessInfo.Status.ERROR);
} else {
Expand Down
Expand Up @@ -139,8 +139,6 @@
*/
public class DataLoaderService extends AbstractService implements IDataLoaderService {

private static final int MAX_NETWORK_ERROR_FOR_LOGGING = 5;

private IIncomingBatchService incomingBatchService;

private IConfigurationService configurationService;
Expand Down Expand Up @@ -169,8 +167,6 @@ public class DataLoaderService extends AbstractService implements IDataLoaderSer

private Date lastUpdateTime;

private Map<String, Integer> errorCountByNode = new HashMap<String, Integer>();

public DataLoaderService(ISymmetricEngine engine) {
super(engine.getParameterService(), engine.getSymmetricDialect());
this.incomingBatchService = engine.getIncomingBatchService();
Expand Down Expand Up @@ -342,13 +338,6 @@ public void loadDataFromPull(Node remote, RemoteNodeStatus status) throws IOExce
log.error("", e);
}
throw e;
} catch (ServiceUnavailableException e) {
if (remote.getNodeId() == null) {
log.info("Service is unavailable for registration with remote node. It may be starting up.");
} else {
log.info("Service is unavailable for remote node " + remote.getNodeGroupId() + "-" + remote.getNodeId() +
". It may be starting up.");
}
}
}

Expand Down Expand Up @@ -408,6 +397,8 @@ public void loadDataFromPush(Node sourceNode, String channelId, InputStream in,
processInfo.setStatus(ProcessInfo.Status.ERROR);
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
} else if (e instanceof IOException) {
throw (IOException) e;
}
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -534,10 +525,9 @@ protected IDataWriter chooseDataWriter(Batch batch) {
};
processor.process(ctx);
}
errorCountByNode.remove(sourceNode.getNodeId());
} catch (Throwable ex) {
error = ex;
logAndRethrow(sourceNode, ex);
logAndRethrow(ex);
} finally {
transport.close();

Expand All @@ -549,52 +539,27 @@ protected IDataWriter chooseDataWriter(Batch batch) {
return listener.getBatchesProcessed();
}

protected void logAndRethrow(Node remoteNode, Throwable ex) throws IOException {
protected void logAndRethrow(Throwable ex) throws IOException {
if (ex instanceof RegistrationRequiredException) {
throw (RegistrationRequiredException) ex;
} else if (ex instanceof ConnectException) {
throw (ConnectException) ex;
} else if (ex instanceof UnknownHostException) {
log.warn("Could not connect to the transport because the host was unknown: '{}'",
ex.getMessage());
throw (UnknownHostException) ex;
} else if (ex instanceof RegistrationNotOpenException) {
log.warn("Registration attempt failed. Registration was not open");
throw (RegistrationNotOpenException) ex;
} else if (ex instanceof ConnectionRejectedException) {
throw (ConnectionRejectedException) ex;
} else if (ex instanceof ServiceUnavailableException) {
throw (ServiceUnavailableException) ex;
} else if (ex instanceof AuthenticationException) {
log.warn("Could not authenticate with node '{}'",
remoteNode != null ? remoteNode.getNodeId() : "?");
throw (AuthenticationException) ex;
} else if (ex instanceof SyncDisabledException) {
log.warn("Synchronization is disabled on the server node");
throw (SyncDisabledException) ex;
} else if (ex instanceof IOException) {
Integer errorCount = errorCountByNode.get(remoteNode.getNodeId());
if (errorCount == null) {
errorCount = 1;
}
if (errorCount >= MAX_NETWORK_ERROR_FOR_LOGGING) {
if (ex.getMessage() != null && !ex.getMessage().startsWith("http")) {
log.error("Failed while reading batch because: {}", ex.getMessage());
} else {
log.error("Failed while reading batch because: {}", ex.getMessage(), ex);
}
} else {
if (ex.getMessage() != null && !ex.getMessage().startsWith("http")) {
log.info("Failed while reading batch because: {}", ex.getMessage());
} else {
log.info("Failed while reading batch because: {}", ex.getMessage(), ex);
}
}
errorCountByNode.put(remoteNode.getNodeId(), errorCount + 1);
throw (IOException) ex;
} else {
if (!(ex instanceof ConflictException || ex instanceof SqlException)) {
log.error("Failed while parsing batch", ex);
}
} else if (!(ex instanceof ConflictException) && !(ex instanceof SqlException)) {
log.error("Failed while parsing batch", ex);
}
}

Expand Down

0 comments on commit 74d6e1a

Please sign in to comment.