From c0e567cefe7ad9bf9745ed41fb87abba60dbf1b9 Mon Sep 17 00:00:00 2001 From: Roman Kulyk Date: Fri, 29 Sep 2017 17:26:39 +0000 Subject: [PATCH] DRILL-5564: Added finally block for stopWait() to avoid all situations where Drill able to miss stopWait() in case of exceptions (it can lead to assertions). --- .../columnreaders/AsyncPageReader.java | 27 ++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java index 8c89e3a169e..4f1ac1218a5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java @@ -218,21 +218,24 @@ protected void nextInternal() throws IOException { try { Stopwatch timer = Stopwatch.createStarted(); parentColumnReader.parentReader.getOperatorContext().getStats().startWait(); - asyncPageRead.poll().get(); // get the result of execution - synchronized (pageQueueSyncronize) { - boolean pageQueueFull = pageQueue.remainingCapacity() == 0; - readStatus = pageQueue.take(); // get the data if no exception has been thrown - if (readStatus.pageData == null || readStatus == ReadStatus.EMPTY) { - throw new DrillRuntimeException("Unexpected end of data"); - } - //if the queue was full before we took a page out, then there would - // have been no new read tasks scheduled. In that case, schedule a new read. - if (!parentColumnReader.isShuttingDown && pageQueueFull) { - asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue))); + try { + asyncPageRead.poll().get(); // get the result of execution + synchronized (pageQueueSyncronize) { + boolean pageQueueFull = pageQueue.remainingCapacity() == 0; + readStatus = pageQueue.take(); // get the data if no exception has been thrown + if (readStatus.pageData == null || readStatus == ReadStatus.EMPTY) { + throw new DrillRuntimeException("Unexpected end of data"); + } + //if the queue was full before we took a page out, then there would + // have been no new read tasks scheduled. In that case, schedule a new read. + if (!parentColumnReader.isShuttingDown && pageQueueFull) { + asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue))); + } } + } finally { + parentColumnReader.parentReader.getOperatorContext().getStats().stopWait(); } long timeBlocked = timer.elapsed(TimeUnit.NANOSECONDS); - parentColumnReader.parentReader.getOperatorContext().getStats().stopWait(); stats.timeDiskScanWait.addAndGet(timeBlocked); stats.timeDiskScan.addAndGet(readStatus.getDiskScanTime()); if (readStatus.isDictionaryPage) {