Skip to content

Commit

Permalink
ORC-978: Fix NPE in TestFlinkOrcReaderWriter (#891)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

This PR aims to fix NPE in TestFlinkOrcReaderWriter.

```java
@OverRide
  public void close() throws IOException {
    if (!isClosed) {
      try {
        if (batch.size > 0) {
          writer.addRowBatch(batch);
          batch.reset();
        }
      } finally {
        writer.close();
        this.isClosed = true;
      }
    }
  }
```

writer.addRowBatch(batch);

```java
    .......
      checkMemory();
    } catch (Throwable t) {
      try {
        close();
      } catch (Throwable ignore) {
        // ignore
      }
      if (t instanceof IOException) {
        throw (IOException) t;
      } else {
        throw new IOException("Problem adding row to " + path, t);
      }
    }
```

addRowBatch method throws java.lang.OutOfMemoryError causing writerImpl to close twice in TestFlinkOrcReaderWriter case.
The first close already set the rawWriter to null, so the second time throw a NullPointerException.

I added status variables to ensure that close will only do the necessary action the first time.

### Why are the changes needed?

Fix NPE in TestFlinkOrcReaderWriter.

### How was this patch tested?

Add UT for this bug.
  • Loading branch information
guiyanakuang committed Sep 2, 2021
1 parent 4ff6115 commit 8a4e24a
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 8 deletions.
23 changes: 15 additions & 8 deletions java/core/src/java/org/apache/orc/impl/WriterImpl.java
Expand Up @@ -140,6 +140,7 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
// information
private boolean needKeyFlush;
private final boolean useProlepticGregorian;
private boolean isClose = false;

public WriterImpl(FileSystem fs,
Path path,
Expand Down Expand Up @@ -725,15 +726,21 @@ public void addRowBatch(VectorizedRowBatch batch) throws IOException {

@Override
public void close() throws IOException {
if (callback != null) {
callback.preFooterWrite(callbackContext);
if (!isClose) {
try {
if (callback != null) {
callback.preFooterWrite(callbackContext);
}
// remove us from the memory manager so that we don't get any callbacks
memoryManager.removeWriter(path);
// actually close the file
flushStripe();
lastFlushOffset = writeFooter();
physicalWriter.close();
} finally {
isClose = true;
}
}
// remove us from the memory manager so that we don't get any callbacks
memoryManager.removeWriter(path);
// actually close the file
flushStripe();
lastFlushOffset = writeFooter();
physicalWriter.close();
}

/**
Expand Down
8 changes: 8 additions & 0 deletions java/core/src/test/org/apache/orc/impl/TestWriterImpl.java
Expand Up @@ -118,4 +118,12 @@ public void testStripes() throws Exception {
Reader r = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf));
assertEquals(r.getStripes(), w.getStripes());
}

@Test
public void testCloseIsIdempotent() throws IOException {
conf.set(OrcConf.OVERWRITE_OUTPUT_FILE.getAttribute(), "true");
Writer w = OrcFile.createWriter(testFilePath, OrcFile.writerOptions(conf).setSchema(schema));
w.close();
w.close();
}
}

0 comments on commit 8a4e24a

Please sign in to comment.