From def7cbd92917a78a1d9135cba20e53d82c2f0430 Mon Sep 17 00:00:00 2001
From: m00258959
Date: Wed, 7 Feb 2018 12:07:33 +0530
Subject: [PATCH] Fixed memory leak issue. In case of any task failure, unsafe
 memory for that task is not getting cleared from the executor if the task
 fails during initialization of the record reader

---
 .../executor/impl/AbstractQueryExecutor.java | 14 ++++++-
 .../hadoop/AbstractRecordReader.java         |  8 ++--
 .../carbondata/spark/rdd/CarbonScanRDD.scala | 38 +++++++++++--------
 3 files changed, 40 insertions(+), 20 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 6875f355599..64906948dc4 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -586,9 +586,17 @@ private Set<CarbonDimension> getCurrentBlockFilterDimensions(
    */
   @Override public void finish() throws QueryExecutionException {
     CarbonUtil.clearBlockCache(queryProperties.dataBlocks);
+    Throwable exceptionOccurred = null;
     if (null != queryIterator) {
-      queryIterator.close();
+      // catch any exception here so that it can be rethrown after all the resources are
+      // cleared, else an exception thrown from this point would leave the executor service running
+      try {
+        queryIterator.close();
+      } catch (Throwable e) {
+        exceptionOccurred = e;
+      }
     }
+    // clear all the unsafe memory used for the given task ID
     UnsafeMemoryManager.INSTANCE.freeMemoryAll(ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId());
     if (null != queryProperties.executorService) {
       // In case of limit query when number of limit records is already found so executors
@@ -596,6 +604,10 @@ private Set<CarbonDimension> getCurrentBlockFilterDimensions(
       // the query performance.
       queryProperties.executorService.shutdownNow();
     }
+    // rethrow any exception caught while closing the query iterator
+    if (null != exceptionOccurred) {
+      throw new QueryExecutionException(exceptionOccurred);
+    }
   }
 }
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java
index 62a97f9e174..bd4bbce26d7 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java
@@ -36,8 +36,10 @@ public abstract class AbstractRecordReader<T> extends RecordReader<Void, T> {
    */
   public void logStatistics(int recordCount, QueryStatisticsRecorder recorder) {
     // result size
-    QueryStatistic queryStatistic = new QueryStatistic();
-    queryStatistic.addCountStatistic(QueryStatisticsConstants.RESULT_SIZE, recordCount);
-    recorder.recordStatistics(queryStatistic);
+    if (null != recorder) {
+      QueryStatistic queryStatistic = new QueryStatistic();
+      queryStatistic.addCountStatistic(QueryStatisticsConstants.RESULT_SIZE, recordCount);
+      recorder.recordStatistics(queryStatistic);
+    }
   }
 }
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 49c02250842..102c6c8b731 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -362,18 +362,21 @@ class CarbonScanRDD(
         }
       }
 
+      // register the task completion listener before calling initialize, because initialize
+      // internally uses unsafe memory while processing a blocklet; if an exception is thrown
+      // there, the unsafe memory occupied by this task would otherwise never get cleared
+      context.addTaskCompletionListener { _ =>
+        reader.close()
+        close()
+        logStatistics(queryStartTime, model.getStatisticsRecorder)
+      }
+      // initialize the reader
       reader.initialize(inputSplit, attemptContext)
 
       new Iterator[Any] {
         private var havePair = false
         private var finished = false
 
-        context.addTaskCompletionListener { _ =>
-          reader.close()
-          close()
-          logStatistics(queryStartTime, model.getStatisticsRecorder)
-        }
-
         override def hasNext: Boolean = {
           if (context.isInterrupted) {
             throw new TaskKilledException
@@ -394,10 +397,6 @@ class CarbonScanRDD(
           value
         }
 
-        private def close() {
-          TaskMetricsMap.getInstance().updateReadBytes(Thread.currentThread().getId)
-          inputMetricsStats.updateAndClose()
-        }
       }
     } else {
       new Iterator[Any] {
@@ -411,6 +410,11 @@ class CarbonScanRDD(
     iterator.asInstanceOf[Iterator[InternalRow]]
   }
 
+  private def close() {
+    TaskMetricsMap.getInstance().updateReadBytes(Thread.currentThread().getId)
+    inputMetricsStats.updateAndClose()
+  }
+
   def prepareInputFormatForDriver(conf: Configuration): CarbonTableInputFormat[Object] = {
     CarbonTableInputFormat.setTableInfo(conf, tableInfo)
     CarbonTableInputFormat.setDatabaseName(conf, tableInfo.getDatabaseName)
@@ -456,12 +460,14 @@ class CarbonScanRDD(
   }
 
   def logStatistics(queryStartTime: Long, recorder: QueryStatisticsRecorder): Unit = {
-    val queryStatistic = new QueryStatistic()
-    queryStatistic.addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART,
-      System.currentTimeMillis - queryStartTime)
-    recorder.recordStatistics(queryStatistic)
-    // print executor query statistics for each task_id
-    recorder.logStatisticsAsTableExecutor()
+    if (null != recorder) {
+      val queryStatistic = new QueryStatistic()
+      queryStatistic.addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART,
+        System.currentTimeMillis - queryStartTime)
+      recorder.recordStatistics(queryStatistic)
+      // print executor query statistics for each task_id
+      recorder.logStatisticsAsTableExecutor()
+    }
   }
 
   /**
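
A note on the pattern behind the finish() change: the failure from queryIterator.close() is captured, the task's unsafe memory is freed unconditionally, and only then is the failure rethrown. Below is a minimal, self-contained Java sketch of that capture-then-rethrow cleanup pattern; ResultIterator and freeTaskMemory are illustrative stand-ins, not CarbonData APIs.

// Capture-then-rethrow cleanup: a failing close() must not skip freeing
// task memory. All names here are hypothetical, not CarbonData code.
public final class CleanupExample {

  interface ResultIterator extends AutoCloseable {
    // close() may throw, mirroring queryIterator.close()
    @Override void close() throws Exception;
  }

  static void finish(ResultIterator iterator, Runnable freeTaskMemory) throws Exception {
    Throwable exceptionOccurred = null;
    if (null != iterator) {
      try {
        iterator.close();
      } catch (Throwable e) {
        // remember the failure instead of propagating it immediately
        exceptionOccurred = e;
      }
    }
    // cleanup always runs, whether or not close() threw
    freeTaskMemory.run();
    if (null != exceptionOccurred) {
      throw new Exception("failed to close query iterator", exceptionOccurred);
    }
  }

  public static void main(String[] args) {
    ResultIterator failing = () -> {
      throw new IllegalStateException("close failed");
    };
    try {
      finish(failing, () -> System.out.println("task memory freed"));
    } catch (Exception e) {
      // "task memory freed" has already printed: cleanup ran before the rethrow
      System.out.println("rethrown: " + e.getCause().getMessage());
    }
  }
}

Running main prints "task memory freed" before the rethrown failure, which is the guarantee the patch needs: cleanup happens even when close() fails.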
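
The CarbonScanRDD change relies on a related guarantee: Spark runs task completion listeners whether the task succeeds or fails, so registering the listener before reader.initialize(...) means the unsafe memory is freed even when initialization throws. The following toy Java sketch illustrates the ordering; TaskContext here is a tiny hypothetical stand-in, not Spark's API.

import java.util.ArrayList;
import java.util.List;

// A cleanup hook registered BEFORE a risky init step still fires when
// that step fails. TaskContext below is a simplified stand-in for
// Spark's TaskContext, used only to illustrate the ordering.
public final class ListenerOrderingExample {

  static final class TaskContext {
    private final List<Runnable> completionListeners = new ArrayList<>();

    void addTaskCompletionListener(Runnable listener) {
      completionListeners.add(listener);
    }

    // Spark invokes completion listeners on success and on failure;
    // this simulation does the same.
    void markTaskCompleted() {
      completionListeners.forEach(Runnable::run);
    }
  }

  public static void main(String[] args) {
    TaskContext context = new TaskContext();
    // register cleanup first, as the patch does in CarbonScanRDD
    context.addTaskCompletionListener(() -> System.out.println("unsafe memory freed"));
    try {
      // stand-in for reader.initialize(inputSplit, attemptContext) failing
      throw new RuntimeException("initialize failed");
    } catch (RuntimeException e) {
      context.markTaskCompleted();
      System.out.println("task failed: " + e.getMessage());
    }
  }
}

Had the listener been registered only after initialize succeeded, as in the removed code, a failure inside initialize would mean no cleanup hook was ever attached and the task's unsafe memory would leak, which is exactly the bug this patch fixes.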