diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
index 8cb2ca43d0a..f725de37b35 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
@@ -410,15 +410,8 @@ class SparkCarbonFileFormat extends FileFormat
       val model = format.createQueryModel(split, hadoopAttemptContext)
       model.setConverter(new SparkDataTypeConverterImpl)
       model.setPreFetchData(false)
-      var isAdded = false
-      Option(TaskContext.get()).foreach { context =>
-        val onCompleteCallbacksField = context.getClass.getDeclaredField("onCompleteCallbacks")
-        onCompleteCallbacksField.setAccessible(true)
-        val listeners = onCompleteCallbacksField.get(context)
-          .asInstanceOf[ArrayBuffer[TaskCompletionListener]]
-        isAdded = listeners.exists(p => p.isInstanceOf[CarbonLoadTaskCompletionListener])
-        model.setFreeUnsafeMemory(!isAdded)
-      }
+      // The file format uses on-heap memory, so there is no need to free unsafe memory
+      model.setFreeUnsafeMemory(false)
       val carbonReader = if (readVector) {
         model.setDirectVectorFill(true)
         val vectorizedReader = new VectorizedCarbonRecordReader(model,
@@ -439,7 +432,7 @@ class SparkCarbonFileFormat extends FileFormat
       Option(TaskContext.get()).foreach{context =>
         context.addTaskCompletionListener(
           CarbonQueryTaskCompletionListenerImpl(
-            iter.asInstanceOf[RecordReaderIterator[InternalRow]], !isAdded))
+            iter.asInstanceOf[RecordReaderIterator[InternalRow]]))
       }
 
       if (carbonReader.isInstanceOf[VectorizedCarbonRecordReader] && readVector) {
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/tasklisteners/CarbonTaskCompletionListener.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/tasklisteners/CarbonTaskCompletionListener.scala
index eb3e42a5cb3..5547228ba90 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/tasklisteners/CarbonTaskCompletionListener.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/tasklisteners/CarbonTaskCompletionListener.scala
@@ -40,7 +40,7 @@ trait CarbonQueryTaskCompletionListener extends TaskCompletionListener
 trait CarbonLoadTaskCompletionListener extends TaskCompletionListener
 
 case class CarbonQueryTaskCompletionListenerImpl(iter: RecordReaderIterator[InternalRow],
-    freeMemory: Boolean) extends CarbonQueryTaskCompletionListener {
+    freeMemory: Boolean = false) extends CarbonQueryTaskCompletionListener {
   override def onTaskCompletion(context: TaskContext): Unit = {
     if (iter != null) {
       try {
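
For context on the `freeMemory: Boolean = false` change: giving the new default keeps existing call sites that pass the flag explicitly source-compatible, while the file-format path can now drop the argument entirely (matching `model.setFreeUnsafeMemory(false)` above). A minimal, self-contained sketch of that default-parameter behavior — the class and call sites below are hypothetical stand-ins, not CarbonData classes:

```scala
// Hypothetical stand-in for a listener with an optional freeMemory flag.
case class QueryListenerSketch(taskId: String, freeMemory: Boolean = false)

object DefaultParamSketch {
  def main(args: Array[String]): Unit = {
    // Old-style call site: still compiles and behaves as before.
    val explicit = QueryListenerSketch("task-1", freeMemory = true)
    // New-style call site: omits the flag and gets freeMemory = false.
    val defaulted = QueryListenerSketch("task-2")
    println(s"$explicit, $defaulted")
  }
}
```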