From a97ba812b321bc4ece83b4c3d3e003a53140a75e Mon Sep 17 00:00:00 2001
From: Salim Achouche
Date: Fri, 22 Dec 2017 11:50:56 -0800
Subject: [PATCH] DRILL-6079: Fixed memory leak in Parquet Reader

---
 .../columnreaders/AsyncPageReader.java             | 52 +++++++++++--------
 1 file changed, 30 insertions(+), 22 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
index 4f1ac1218a5..036b546f578 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
@@ -17,39 +17,40 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
-import com.google.common.base.Stopwatch;
-import io.netty.buffer.DrillBuf;
+import static org.apache.parquet.column.Encoding.valueOf;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.compress.DirectDecompressor;
-import org.apache.hadoop.io.compress.GzipCodec;
-import org.apache.parquet.hadoop.CodecFactory;
-import org.apache.parquet.hadoop.codec.SnappyCodec;
-import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.drill.exec.util.filereader.DirectBufInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DirectDecompressor;
+import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.format.PageHeader;
 import org.apache.parquet.format.PageType;
 import org.apache.parquet.format.Util;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.xerial.snappy.Snappy;
-import java.io.EOFException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
+import com.google.common.base.Stopwatch;
 
-import static org.apache.parquet.column.Encoding.valueOf;
+import io.netty.buffer.DrillBuf;
 
 /**
  * The AyncPageReader reads one page of data at a time asynchronously from the provided InputStream. The
 * first request to the page reader creates a Future Task (AsyncPageReaderTask) and submits it to the
@@ -219,7 +220,7 @@ protected void nextInternal() throws IOException {
     Stopwatch timer = Stopwatch.createStarted();
     parentColumnReader.parentReader.getOperatorContext().getStats().startWait();
     try {
-      asyncPageRead.poll().get(); // get the result of execution
+      waitForExecutionResult(); // get the result of execution
       synchronized (pageQueueSyncronize) {
         boolean pageQueueFull = pageQueue.remainingCapacity() == 0;
         readStatus = pageQueue.take(); // get the data if no exception has been thrown
@@ -253,7 +254,7 @@ protected void nextInternal() throws IOException {
       do {
         if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
           readDictionaryPageData(readStatus, parentColumnReader);
-          asyncPageRead.poll().get(); // get the result of execution
+          waitForExecutionResult(); // get the result of execution
           synchronized (pageQueueSyncronize) {
             boolean pageQueueFull = pageQueue.remainingCapacity() == 0;
             readStatus = pageQueue.take(); // get the data if no exception has been thrown
@@ -283,6 +284,14 @@ protected void nextInternal() throws IOException {
 
   }
 
+  private void waitForExecutionResult() throws InterruptedException, ExecutionException {
+    // Get the execution result but don't remove the Future object from the "asyncPageRead" queue yet;
+    // this will ensure that cleanup will happen properly in case of an exception being thrown
+    asyncPageRead.peek().get(); // get the result of execution
+    // Alright now remove the Future object
+    asyncPageRead.poll();
+  }
+
   @Override public void clear() {
     //Cancelling all existing AsyncPageReaderTasks
     while (asyncPageRead != null && !asyncPageRead.isEmpty()) {
@@ -395,7 +404,6 @@ public AsyncPageReaderTask(String name, LinkedBlockingQueue<ReadStatus> queue) {
 
     @Override public Void call() throws IOException {
       ReadStatus readStatus = new ReadStatus();
-      long bytesRead = 0;
       long valuesRead = 0;
      final long totalValuesRead = parent.totalPageValuesRead;
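
Note on the fix: with the old code, asyncPageRead.poll().get() removed the Future from the
queue before get() was called. If get() then threw (for example, an ExecutionException from a
failed page read), the Future was already gone, so the cleanup loop in clear() never saw it
and could not cancel the task or release what it had produced. waitForExecutionResult() keeps
the Future visible to clear() until get() has returned normally. Below is a minimal,
self-contained Java sketch of that ordering in isolation; the class, the inFlight queue,
submitTask(), and releaseEverything() are hypothetical stand-ins for Drill's asyncPageRead
queue, AsyncPageReaderTask submission, and the cleanup in AsyncPageReader.clear(), not the
actual Drill code.

    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    // Hypothetical sketch of the peek-before-poll pattern introduced by this patch.
    public class PeekBeforePollSketch {
      private final ExecutorService pool = Executors.newSingleThreadExecutor();
      private final ConcurrentLinkedQueue<Future<Void>> inFlight = new ConcurrentLinkedQueue<>();

      void submitTask() {
        // Stands in for submitting an AsyncPageReaderTask.
        inFlight.add(pool.submit(() -> null));
      }

      // Buggy shape (the old poll().get()): the Future leaves the queue before
      // get() can throw. If get() fails, releaseEverything() below never sees
      // this Future, so whatever the task produced is never cleaned up.
      void waitBuggy() throws InterruptedException, ExecutionException {
        inFlight.poll().get();
      }

      // Fixed shape (what waitForExecutionResult() does): peek first, so the
      // Future stays visible to the cleanup path if get() throws; remove it
      // only after the result has been obtained successfully.
      void waitFixed() throws InterruptedException, ExecutionException {
        inFlight.peek().get(); // may throw; the Future is still in the queue
        inFlight.poll();       // success: now it is safe to drop it
      }

      // Cleanup path, analogous to AsyncPageReader.clear(): every Future still
      // in the queue is cancelled (in Drill, any buffers it holds are released).
      void releaseEverything() {
        Future<Void> f;
        while ((f = inFlight.poll()) != null) {
          f.cancel(true);
        }
        pool.shutdown();
      }
    }

The invariant the patch establishes is that every submitted Future is reachable from either
the wait path or the cleanup path at all times, so a task that completed with work in flight
can never fall between the two and leak its resources.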