Skip to content

Commit

Permalink
0003572: Improve SymmetricDS acknowledgement logging
Browse files Browse the repository at this point in the history
  • Loading branch information
mmichalek committed May 22, 2018
1 parent dba4583 commit af97986
Showing 1 changed file with 34 additions and 17 deletions.
Expand Up @@ -24,7 +24,6 @@
import java.io.EOFException;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.sql.SQLException;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -37,6 +36,7 @@
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.sql.ISqlTemplate;
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.symmetric.SymmetricException;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.common.TableConstants;
import org.jumpmind.symmetric.db.ISymmetricDialect;
Expand Down Expand Up @@ -244,38 +244,49 @@ protected String buildBatchWhere(List<String> nodeIds, List<String> channels,
* Try a configured number of times to get the ACK through.
*/
protected void sendAck(Node remote, Node local, NodeSecurity localSecurity,
List<IncomingBatch> list, ITransportManager transportManager) throws IOException {
Exception error = null;
int sendAck = -1;
List<IncomingBatch> list, ITransportManager transportManager) throws IOException {
assertNotNull(remote, "Node remote cannot be null. Maybe there is a missing sym_node row.");
assertNotNull(local, "Node local cannot be null. Maybe there is a missing sym_node row.");
assertNotNull(localSecurity, "NodeSecurity localSecurity cannot be null. Maybe there is a missing sym_node_security row.");

Exception exception = null;
int statusCode = -1;
int numberOfStatusSendRetries = parameterService
.getInt(ParameterConstants.DATA_LOADER_NUM_OF_ACK_RETRIES);
for (int i = 0; i < numberOfStatusSendRetries && sendAck != HttpURLConnection.HTTP_OK; i++) {
for (int i = 0; i < numberOfStatusSendRetries && statusCode != HttpURLConnection.HTTP_OK; i++) {
try {
sendAck = transportManager.sendAcknowledgement(remote, list, local,
statusCode = transportManager.sendAcknowledgement(remote, list, local,
localSecurity.getNodePassword(), parameterService.getRegistrationUrl());
} catch (IOException ex) {
error = ex;
exception = ex;
} catch (RuntimeException ex) {
error = ex;
exception = ex;
}
if (sendAck != HttpURLConnection.HTTP_OK) {
log.warn("Ack was not sent successfully on try number {}. {}", i + 1,
error != null ? error.getMessage() : "");
if (statusCode != HttpURLConnection.HTTP_OK) {
String message = String.format("Ack was not sent successfully on try number %s of %s.", i+1, numberOfStatusSendRetries);
if (statusCode > 0) {
message += String.format(" statusCode=%s", statusCode);
}
if (exception != null) {
log.warn(message, exception);
} else {
log.warn(message);
}

if (i < numberOfStatusSendRetries - 1) {
AppUtils.sleep(parameterService
.getLong(ParameterConstants.DATA_LOADER_TIME_BETWEEN_ACK_RETRIES));
} else if (error instanceof RuntimeException) {
throw (RuntimeException) error;
} else if (error instanceof IOException) {
throw (IOException) error;
} else if (exception instanceof RuntimeException) {
throw (RuntimeException) exception;
} else if (exception instanceof IOException) {
throw (IOException) exception;
} else {
throw new IOException(Integer.toString(sendAck));
throw new IOException("Ack was not sent successfully. statusCode=" + statusCode);
}
}
}
}


protected List<BatchAck> readAcks(List<OutgoingBatch> batches, IOutgoingWithResponseTransport transport,
ITransportManager transportManager, IAcknowledgeService acknowledgeService, IDataExtractorService dataExtratorService)
throws IOException {
Expand Down Expand Up @@ -358,6 +369,12 @@ protected boolean isStreamClosedByClient(Exception ex) {
return false;
}
}

protected void assertNotNull(Object o, String message) {
if (o == null) {
throw new SymmetricException(message);
}
}



Expand Down

0 comments on commit af97986

Please sign in to comment.