Skip to content

Commit

Permalink
0003865: Improve logging readability and appropriate levels
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Jan 30, 2019
1 parent cb8cb69 commit 0708693
Showing 1 changed file with 21 additions and 14 deletions.
Expand Up @@ -28,18 +28,17 @@

import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.util.BinaryEncoding;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.csv.CsvReader;
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.Batch.BatchType;
import org.jumpmind.symmetric.io.data.writer.IProtocolDataWriterListener;
import org.jumpmind.symmetric.io.data.CsvConstants;
import org.jumpmind.symmetric.io.data.DataContext;
import org.jumpmind.symmetric.io.stage.IStagedResource;
import org.jumpmind.symmetric.io.data.writer.IProtocolDataWriterListener;
import org.jumpmind.symmetric.io.stage.IStagedResource.State;
import org.jumpmind.util.Statistics;
import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.jumpmind.symmetric.model.ProcessInfo;
import org.jumpmind.symmetric.model.ProcessInfo.ProcessStatus;
import org.jumpmind.util.Statistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -60,6 +59,7 @@ public class SimpleStagingDataWriter {
protected ProcessInfo processInfo;
protected BufferedWriter writer;
protected Batch batch;
protected long invalidLineCount;

public SimpleStagingDataWriter(ProcessInfo processInfo, BufferedReader reader, IStagingManager stagingManager, String category, long memoryThresholdInBytes,
BatchType batchType, String targetNodeId, DataContext context, IProtocolDataWriterListener... listeners) {
Expand Down Expand Up @@ -154,13 +154,15 @@ public void process() throws IOException {
}
batchTableLines.clear();

batch.setStatistics(batchStats);
batchStats = null;
if (listeners != null) {
for (IProtocolDataWriterListener listener : listeners) {
listener.end(context, batch, resource);
if (batch != null) {
batch.setStatistics(batchStats);
if (listeners != null) {
for (IProtocolDataWriterListener listener : listeners) {
listener.end(context, batch, resource);
}
}
}
batchStats = null;
resource = null;
} else if (line.startsWith(CsvConstants.RETRY)) {
batch = new Batch(batchType, Long.parseLong(getArgLine(line)), getArgLine(channelLine), getBinaryEncoding(binaryLine),
Expand Down Expand Up @@ -204,10 +206,9 @@ public void process() throws IOException {
batchStatsLine = line;
batchStats = new Statistics();
putStats(batchStats, batchStatsColumnsLine, batchStatsLine);
} else if (writer == null) {
invalidLineCount++;
} else {
if (writer == null) {
throw new IllegalStateException("Invalid batch data was received: " + line);
}
TableLine batchLine = batchTableLines.get(tableLine);
if (batchLine == null || (batchLine != null && batchLine.columnsLine == null)) {
TableLine syncLine = syncTableLines.get(tableLine);
Expand Down Expand Up @@ -261,7 +262,7 @@ public void process() throws IOException {
}

processInfo.setStatus(ProcessStatus.OK);
} catch (IOException ex) {
} catch (Exception ex) {
if (resource != null) {
resource.delete();
}
Expand All @@ -271,7 +272,13 @@ public void process() throws IOException {
/*
* Just log an error here. We want batches that come before us to continue to process and to be acknowledged
*/
log.error("Failed to process batch. Context: " + context.getContext() , ex);

log.error("Failed to write batch into staging from {}. {}: {}", context.getContext().get(Constants.DATA_CONTEXT_SOURCE_NODE).toString(),
ex.getClass().getName(), ex.getMessage());
} finally {
if (invalidLineCount > 0) {
log.warn("Found {} invalid lines that could not be written to a batch", invalidLineCount);
}
}
}

Expand Down

0 comments on commit 0708693

Please sign in to comment.