Skip to content

Commit

Permalink
0003182: Resource is not closed after "out of disk space" error on a
Browse files Browse the repository at this point in the history
remote target node
  • Loading branch information
chenson42 committed Jul 3, 2017
1 parent dec4109 commit a6f234a
Showing 1 changed file with 132 additions and 118 deletions.
Expand Up @@ -79,141 +79,155 @@ public void process() throws IOException {
IStagedResource resource = null;
String line = null;
long startTime = System.currentTimeMillis(), ts = startTime, lineCount = 0;

while (reader.readRecord()) {
line = reader.getRawRecord();
if (line.startsWith(CsvConstants.CATALOG)) {
catalogLine = line;
writeLine(line);
} else if (line.startsWith(CsvConstants.SCHEMA)) {
schemaLine = line;
writeLine(line);
} else if (line.startsWith(CsvConstants.TABLE)) {
tableLine = new TableLine(catalogLine, schemaLine, line);
TableLine batchTableLine = batchTableLines.get(tableLine);

if (batchTableLine != null) {
tableLine = batchTableLine;
try {
while (reader.readRecord()) {
line = reader.getRawRecord();
if (line.startsWith(CsvConstants.CATALOG)) {
catalogLine = line;
writeLine(line);
} else {
TableLine syncTableLine = syncTableLines.get(tableLine);
if (syncTableLine != null) {
tableLine = syncTableLine;
writeLine(tableLine.catalogLine);
writeLine(tableLine.schemaLine);
} else if (line.startsWith(CsvConstants.SCHEMA)) {
schemaLine = line;
writeLine(line);
} else if (line.startsWith(CsvConstants.TABLE)) {
tableLine = new TableLine(catalogLine, schemaLine, line);
TableLine batchTableLine = batchTableLines.get(tableLine);

if (batchTableLine != null) {
tableLine = batchTableLine;
writeLine(line);
writeLine(tableLine.keysLine);
writeLine(tableLine.columnsLine);
} else {
syncTableLines.put(tableLine, tableLine);
batchTableLines.put(tableLine, tableLine);
writeLine(line);
TableLine syncTableLine = syncTableLines.get(tableLine);
if (syncTableLine != null) {
tableLine = syncTableLine;
writeLine(tableLine.catalogLine);
writeLine(tableLine.schemaLine);
writeLine(line);
writeLine(tableLine.keysLine);
writeLine(tableLine.columnsLine);
} else {
syncTableLines.put(tableLine, tableLine);
batchTableLines.put(tableLine, tableLine);
writeLine(line);
}
}
}
} else if (line.startsWith(CsvConstants.KEYS)) {
tableLine.keysLine = line;
writeLine(line);
} else if (line.startsWith(CsvConstants.COLUMNS)) {
tableLine.columnsLine = line;
writeLine(line);
} else if (line.startsWith(CsvConstants.BATCH)) {
batch = new Batch(batchType, Long.parseLong(getArgLine(line)), getArgLine(channelLine), getBinaryEncoding(binaryLine),
getArgLine(nodeLine), targetNodeId, false);
String location = batch.getStagedLocation();
resource = stagingManager.find(category, location,
batch.getBatchId());
if (resource == null || resource.getState() == State.DONE) {
log.debug("Creating staged resource for batch {}", batch.getNodeBatchId());
resource = stagingManager.create(category, location, batch.getBatchId());
}
writer = resource.getWriter(memoryThresholdInBytes);
writeLine(nodeLine);
writeLine(binaryLine);
writeLine(channelLine);
writeLine(line);

if (listeners != null) {
for (IProtocolDataWriterListener listener : listeners) {
listener.start(context, batch);
} else if (line.startsWith(CsvConstants.KEYS)) {
tableLine.keysLine = line;
writeLine(line);
} else if (line.startsWith(CsvConstants.COLUMNS)) {
tableLine.columnsLine = line;
writeLine(line);
} else if (line.startsWith(CsvConstants.BATCH)) {
batch = new Batch(batchType, Long.parseLong(getArgLine(line)), getArgLine(channelLine), getBinaryEncoding(binaryLine),
getArgLine(nodeLine), targetNodeId, false);
String location = batch.getStagedLocation();
resource = stagingManager.find(category, location, batch.getBatchId());
if (resource == null || resource.getState() != State.DONE) {
log.debug("Creating staged resource for batch {}", batch.getNodeBatchId());
resource = stagingManager.create(category, location, batch.getBatchId());
}
}
} else if (line.startsWith(CsvConstants.COMMIT)) {
if (writer != null) {
writer = resource.getWriter(memoryThresholdInBytes);
writeLine(nodeLine);
writeLine(binaryLine);
writeLine(channelLine);
writeLine(line);
resource.close();
resource.setState(State.DONE);
writer = null;
}
batchTableLines.clear();

if (listeners != null) {
for (IProtocolDataWriterListener listener : listeners) {
listener.end(context, batch, resource);
if (listeners != null) {
for (IProtocolDataWriterListener listener : listeners) {
listener.start(context, batch);
}
}
}
} else if (line.startsWith(CsvConstants.RETRY)) {
batch = new Batch(batchType, Long.parseLong(getArgLine(line)), getArgLine(channelLine), getBinaryEncoding(binaryLine),
getArgLine(nodeLine), targetNodeId, false);
String location = batch.getStagedLocation();
resource = stagingManager.find(category, location,
batch.getBatchId());
if (resource == null || resource.getState() == State.CREATE) {
resource = null;
writer = null;
}

if (listeners != null) {
for (IProtocolDataWriterListener listener : listeners) {
listener.start(context, batch);
} else if (line.startsWith(CsvConstants.COMMIT)) {
if (writer != null) {
writeLine(line);
resource.close();
resource.setState(State.DONE);
writer = null;
}
}
} else if (line.startsWith(CsvConstants.NODEID)) {
nodeLine = line;
} else if (line.startsWith(CsvConstants.BINARY)) {
binaryLine = line;
} else if (line.startsWith(CsvConstants.CHANNEL)) {
channelLine = line;
} else {
if (writer == null) {
throw new IllegalStateException("Invalid batch data was received: " + line);
}
TableLine batchLine = batchTableLines.get(tableLine);
if (batchLine == null || (batchLine != null && batchLine.columnsLine == null)) {
TableLine syncLine = syncTableLines.get(tableLine);
if (syncLine != null) {
log.debug("Injecting keys and columns to be backwards compatible");
if (batchLine == null) {
batchLine = syncLine;
batchTableLines.put(batchLine, batchLine);
writeLine(batchLine.tableLine);
batchTableLines.clear();

if (listeners != null) {
for (IProtocolDataWriterListener listener : listeners) {
listener.end(context, batch, resource);
}
batchLine.keysLine = syncLine.keysLine;
writeLine(syncLine.keysLine);
batchLine.columnsLine = syncLine.columnsLine;
writeLine(syncLine.columnsLine);
}
}
int size = line.length();
if (size > MAX_WRITE_LENGTH) {
log.debug("Exceeded max line length with {}", size);
for (int i = 0; i < size; i = i + MAX_WRITE_LENGTH) {
int end = i + MAX_WRITE_LENGTH;
writer.append(line, i, end < size ? end : size);
} else if (line.startsWith(CsvConstants.RETRY)) {
batch = new Batch(batchType, Long.parseLong(getArgLine(line)), getArgLine(channelLine), getBinaryEncoding(binaryLine),
getArgLine(nodeLine), targetNodeId, false);
String location = batch.getStagedLocation();
resource = stagingManager.find(category, location, batch.getBatchId());
if (resource == null || resource.getState() == State.CREATE) {
resource = null;
writer = null;
}

if (listeners != null) {
for (IProtocolDataWriterListener listener : listeners) {
listener.start(context, batch);
}
}
writer.append("\n");
} else if (line.startsWith(CsvConstants.NODEID)) {
nodeLine = line;
} else if (line.startsWith(CsvConstants.BINARY)) {
binaryLine = line;
} else if (line.startsWith(CsvConstants.CHANNEL)) {
channelLine = line;
} else {
writeLine(line);
if (writer == null) {
throw new IllegalStateException("Invalid batch data was received: " + line);
}
TableLine batchLine = batchTableLines.get(tableLine);
if (batchLine == null || (batchLine != null && batchLine.columnsLine == null)) {
TableLine syncLine = syncTableLines.get(tableLine);
if (syncLine != null) {
log.debug("Injecting keys and columns to be backwards compatible");
if (batchLine == null) {
batchLine = syncLine;
batchTableLines.put(batchLine, batchLine);
writeLine(batchLine.tableLine);
}
batchLine.keysLine = syncLine.keysLine;
writeLine(syncLine.keysLine);
batchLine.columnsLine = syncLine.columnsLine;
writeLine(syncLine.columnsLine);
}
}
int size = line.length();
if (size > MAX_WRITE_LENGTH) {
log.debug("Exceeded max line length with {}", size);
for (int i = 0; i < size; i = i + MAX_WRITE_LENGTH) {
int end = i + MAX_WRITE_LENGTH;
writer.append(line, i, end < size ? end : size);
}
writer.append("\n");
} else {
writeLine(line);
}
}

lineCount++;
if (System.currentTimeMillis() - ts > 60000) {
log.info(
"Batch '{}', for node '{}', for process 'transfer to stage' has been processing for {} seconds. The following stats have been gathered: {}",
new Object[] { (batch != null ? batch.getBatchId() : 0), (batch != null ? batch.getTargetNodeId() : ""),
(System.currentTimeMillis() - startTime) / 1000,
"LINES=" + lineCount + ", BYTES=" + ((resource == null) ? 0 : resource.getSize()) });
ts = System.currentTimeMillis();
}
}

} catch (Exception ex) {
if (resource != null) {
resource.close();
resource.delete();
}

lineCount++;
if (System.currentTimeMillis() - ts > 60000) {
log.info(
"Batch '{}', for node '{}', for process 'transfer to stage' has been processing for {} seconds. The following stats have been gathered: {}",
new Object[] { (batch != null ? batch.getBatchId() : 0), (batch != null ? batch.getTargetNodeId() : ""),
(System.currentTimeMillis() - startTime) / 1000,
"LINES=" + lineCount + ", BYTES=" + ((resource == null) ? 0 : resource.getSize()) });
ts = System.currentTimeMillis();
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
} else if (ex instanceof IOException) {
throw (IOException) ex;
} else {
throw new RuntimeException(ex);
}
}
}
Expand Down

0 comments on commit a6f234a

Please sign in to comment.