Skip to content

Commit

Permalink
0002948: Columns mismatched with multiline clob and varchar columns with
Browse files Browse the repository at this point in the history
matching protocol data that happens to be in a new line
  • Loading branch information
chenson42 committed Dec 24, 2016
1 parent c0fecee commit ec54c54
Showing 1 changed file with 38 additions and 42 deletions.
Expand Up @@ -22,12 +22,15 @@

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.util.BinaryEncoding;
import org.jumpmind.symmetric.csv.CsvReader;
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.Batch.BatchType;
import org.jumpmind.symmetric.io.data.CsvConstants;
Expand All @@ -44,7 +47,7 @@ public class SimpleStagingDataWriter {

protected final Logger log = LoggerFactory.getLogger(getClass());

protected BufferedReader reader;
protected CsvReader reader;
protected IStagingManager stagingManager;
protected IProtocolDataWriterListener[] listeners;
protected long memoryThresholdInBytes;
Expand All @@ -55,10 +58,12 @@ public class SimpleStagingDataWriter {

protected BufferedWriter writer;
protected Batch batch;

public SimpleStagingDataWriter(BufferedReader reader, IStagingManager stagingManager, String category, long memoryThresholdInBytes,
BatchType batchType, String targetNodeId, DataContext context, IProtocolDataWriterListener... listeners) {
this.reader = reader;
this.reader = new CsvReader(reader);
this.reader.setEscapeMode(CsvReader.ESCAPE_MODE_BACKSLASH);
this.reader.setSafetySwitch(false);
this.stagingManager = stagingManager;
this.memoryThresholdInBytes = memoryThresholdInBytes;
this.category = category;
Expand All @@ -72,12 +77,14 @@ public void process() throws IOException {
String catalogLine = null, schemaLine = null, nodeLine = null, binaryLine = null, channelLine = null;
TableLine tableLine = null;
Map<TableLine, TableLine> syncTableLines = new HashMap<TableLine, TableLine>();
Map<TableLine, TableLine> batchTableLines = new HashMap<TableLine,TableLine>();
Map<TableLine, TableLine> batchTableLines = new HashMap<TableLine, TableLine>();
IStagedResource resource = null;
String line = null;
long startTime = System.currentTimeMillis(), ts = startTime, lineCount = 0;

while ((line = readLine()) != null) {

while (reader.readRecord()) {
line = reader.getRawRecord();
if (line.startsWith(CsvConstants.CATALOG)) {
catalogLine = line;
writeLine(line);
Expand All @@ -87,7 +94,7 @@ public void process() throws IOException {
} else if (line.startsWith(CsvConstants.TABLE)) {
tableLine = new TableLine(catalogLine, schemaLine, line);
TableLine batchTableLine = batchTableLines.get(tableLine);

if (batchTableLine != null) {
tableLine = batchTableLine;
writeLine(line);
Expand All @@ -113,15 +120,16 @@ public void process() throws IOException {
tableLine.columnsLine = line;
writeLine(line);
} else if (line.startsWith(CsvConstants.BATCH)) {
batch = new Batch(batchType, Long.parseLong(getArgLine(line)), getArgLine(channelLine),
getBinaryEncoding(binaryLine), getArgLine(nodeLine), targetNodeId, false);
batch = new Batch(batchType, Long.parseLong(getArgLine(line)), getArgLine(channelLine), getBinaryEncoding(binaryLine),
getArgLine(nodeLine), targetNodeId, false);
String location = batch.getStagedLocation();
resource = stagingManager.find(category, location, batch.getBatchId());
resource = stagingManager.find(category, location,
batch.getBatchId());
if (resource == null || resource.getState() == State.DONE) {
log.debug("Creating staged resource for batch {}", batch.getNodeBatchId());
resource = stagingManager.create(memoryThresholdInBytes, category, location, batch.getBatchId());
}
writer = resource.getWriter();
writer = new BufferedWriter(new FileWriter(new File("/Users/cshenso/Downloads/test.text")));
writeLine(nodeLine);
writeLine(binaryLine);
writeLine(channelLine);
Expand All @@ -140,17 +148,18 @@ public void process() throws IOException {
writer = null;
}
batchTableLines.clear();

if (listeners != null) {
for (IProtocolDataWriterListener listener : listeners) {
listener.end(context, batch, resource);
}
}
} else if (line.startsWith(CsvConstants.RETRY)) {
batch = new Batch(batchType, Long.parseLong(getArgLine(line)), getArgLine(channelLine),
getBinaryEncoding(binaryLine), getArgLine(nodeLine), targetNodeId, false);
batch = new Batch(batchType, Long.parseLong(getArgLine(line)), getArgLine(channelLine), getBinaryEncoding(binaryLine),
getArgLine(nodeLine), targetNodeId, false);
String location = batch.getStagedLocation();
resource = stagingManager.find(category, location, batch.getBatchId());
resource = stagingManager.find(category, location,
batch.getBatchId());
if (resource == null || resource.getState() == State.CREATE) {
resource = null;
writer = null;
Expand All @@ -168,29 +177,31 @@ public void process() throws IOException {
} else if (line.startsWith(CsvConstants.CHANNEL)) {
channelLine = line;
} else {
int size = line.length();
int size = line.length();
if (size > MAX_WRITE_LENGTH) {
log.debug("Exceeded max line length with {}", size);
for (int i = 0; i < size; i = i + MAX_WRITE_LENGTH) {
int end = i + MAX_WRITE_LENGTH;
writer.append(line, i, end < size ? end : size);
}
writer.append("\n");
} else {
writeLine(line);
}
}

lineCount++;
if (System.currentTimeMillis() - ts > 60000) {
log.info("Batch '{}', for node '{}', for process 'transfer to stage' has been processing for {} seconds. The following stats have been gathered: {}",
new Object[] { (batch != null ? batch.getBatchId() : 0),
(batch != null ? batch.getTargetNodeId() : ""), (System.currentTimeMillis() - startTime) / 1000,
"LINES=" + lineCount + ", BYTES=" + ((resource == null) ? 0 : resource.getSize()) });
log.info(
"Batch '{}', for node '{}', for process 'transfer to stage' has been processing for {} seconds. The following stats have been gathered: {}",
new Object[] { (batch != null ? batch.getBatchId() : 0), (batch != null ? batch.getTargetNodeId() : ""),
(System.currentTimeMillis() - startTime) / 1000,
"LINES=" + lineCount + ", BYTES=" + ((resource == null) ? 0 : resource.getSize()) });
ts = System.currentTimeMillis();
}
}
}

protected String getArgLine(String line) throws IOException {
if (line != null) {
int i = line.indexOf(",");
Expand All @@ -201,53 +212,38 @@ protected String getArgLine(String line) throws IOException {
}
return null;
}

protected BinaryEncoding getBinaryEncoding(String line) throws IOException {
String value = getArgLine(line);
if (value != null) {
return BinaryEncoding.valueOf(value);
}
return null;
}

protected void writeLine(String line) throws IOException {
if (line != null) {
if (log.isDebugEnabled()) {
log.debug("Writing staging data: {}", line);
}
writer.write(line);
writer.write("\n");
}
}

protected String readLine() throws IOException {
StringBuilder sb = new StringBuilder();
int ch;
while ((ch = reader.read()) != -1) {
sb.append((char) ch);
if (ch == '\n') {
break;
}
}
String str = sb.toString();
if (str.length() == 0) {
return null;
}
return str;
}

class TableLine {
String catalogLine;
String schemaLine;
String tableLine;
String keysLine;
String columnsLine;

public TableLine(String catalogLine, String schemaLine, String tableLine) {
this.catalogLine = catalogLine;
this.schemaLine = schemaLine;
this.tableLine = tableLine;
}

@Override
public boolean equals(Object o) {
if (o == null || !(o instanceof TableLine)) {
Expand All @@ -257,7 +253,7 @@ public boolean equals(Object o) {
return StringUtils.equals(catalogLine, t.catalogLine) && StringUtils.equals(schemaLine, t.schemaLine)
&& StringUtils.equals(tableLine, t.tableLine);
}

@Override
public int hashCode() {
return (catalogLine + "." + schemaLine + "." + tableLine).hashCode();
Expand Down

0 comments on commit ec54c54

Please sign in to comment.