Skip to content

Commit

Permalink
0004019: Skipping a batch that contains a CREATE event gets a
Browse files Browse the repository at this point in the history
NullPointerException
  • Loading branch information
erilong committed Jun 19, 2019
1 parent d7f20e8 commit 6832351
Showing 1 changed file with 8 additions and 6 deletions.
Expand Up @@ -97,7 +97,7 @@ public void process(DataContext context) {

boolean processBatch = listener == null ? true : listener
.beforeBatchStarted(context);

if (processBatch) {
dataWriter = chooseDataWriter(currentBatch);
processBatch &= dataWriter != null;
Expand All @@ -118,7 +118,7 @@ public void process(DataContext context) {

// pull and process any data events that are not wrapped
// in a table
forEachDataInTable(context, processBatch, currentBatch);
forEachDataInTable(context, processBatch, false, currentBatch);

// pull and process all data events wrapped in tables
forEachTableInBatch(context, processBatch, currentBatch);
Expand Down Expand Up @@ -175,7 +175,7 @@ protected int forEachTableInBatch(DataContext context, boolean processBatch, Bat
if (processBatch) {
processTable = context.getWriter().start(currentTable);
}
dataRow += forEachDataInTable(context, processTable, batch);
dataRow += forEachDataInTable(context, processBatch, processTable, batch);
} catch (IgnoreBatchException ex) {
processBatch = false;
}
Expand All @@ -189,7 +189,7 @@ protected int forEachTableInBatch(DataContext context, boolean processBatch, Bat
return dataRow;
}

protected int forEachDataInTable(DataContext context, boolean processTable, Batch batch) {
protected int forEachDataInTable(DataContext context, boolean processBatch, boolean processTable, Batch batch) {
int dataRow = 0;
IgnoreBatchException ignore = null;
long startTime = System.currentTimeMillis();
Expand All @@ -206,9 +206,11 @@ protected int forEachDataInTable(DataContext context, boolean processTable, Batc
batch.startTimer(STAT_WRITE_DATA);
batch.incrementLineCount();
if (context.getWriter() == null) {
context.setWriter(chooseDataWriter(batch));
context.setWriter(chooseDataWriter(batch));
}
if (processBatch) {
context.getWriter().write(currentData);
}
context.getWriter().write(currentData);
batch.incrementDataWriteMillis(batch.endTimer(STAT_WRITE_DATA));
} catch (IgnoreBatchException ex) {
ignore = ex;
Expand Down

0 comments on commit 6832351

Please sign in to comment.