From 3a5572ee4d0b472e0a37aebf7c6d38e779c8eacb Mon Sep 17 00:00:00 2001
From: manishnalla1994
Date: Tue, 8 Jan 2019 16:12:55 +0530
Subject: [PATCH] [CARBONDATA-3236] Fix JVM crash on insert into new table from
 old table

Problem: Insert into a new table from an old table fails with a JVM crash
for file format tables (USING carbondata). This happened because the query
and the load flow were assigned the same taskId; once the query finished, it
freed the unsafe memory while the insert was still in progress.

Solution: The file format path is a direct flow and uses on-heap (safe)
memory, so the query does not need to free unsafe memory.

This closes #3056

---
 .../datasources/SparkCarbonFileFormat.scala    | 13 +++----------
 .../CarbonTaskCompletionListener.scala         |  2 +-
 2 files changed, 4 insertions(+), 11 deletions(-)

diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
index 8cb2ca43d0a..f725de37b35 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
@@ -410,15 +410,8 @@ class SparkCarbonFileFormat extends FileFormat
       val model = format.createQueryModel(split, hadoopAttemptContext)
       model.setConverter(new SparkDataTypeConverterImpl)
       model.setPreFetchData(false)
-      var isAdded = false
-      Option(TaskContext.get()).foreach { context =>
-        val onCompleteCallbacksField = context.getClass.getDeclaredField("onCompleteCallbacks")
-        onCompleteCallbacksField.setAccessible(true)
-        val listeners = onCompleteCallbacksField.get(context)
-          .asInstanceOf[ArrayBuffer[TaskCompletionListener]]
-        isAdded = listeners.exists(p => p.isInstanceOf[CarbonLoadTaskCompletionListener])
-        model.setFreeUnsafeMemory(!isAdded)
-      }
+      // As file format uses on-heap memory, no need to free unsafe memory
+      model.setFreeUnsafeMemory(false)
       val carbonReader = if (readVector) {
         model.setDirectVectorFill(true)
         val vectorizedReader = new VectorizedCarbonRecordReader(model,
@@ -439,7 +432,7 @@ class SparkCarbonFileFormat extends FileFormat
       Option(TaskContext.get()).foreach{context =>
         context.addTaskCompletionListener(
           CarbonQueryTaskCompletionListenerImpl(
-            iter.asInstanceOf[RecordReaderIterator[InternalRow]], !isAdded))
+            iter.asInstanceOf[RecordReaderIterator[InternalRow]]))
       }
 
       if (carbonReader.isInstanceOf[VectorizedCarbonRecordReader] && readVector) {
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/tasklisteners/CarbonTaskCompletionListener.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/tasklisteners/CarbonTaskCompletionListener.scala
index eb3e42a5cb3..5547228ba90 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/tasklisteners/CarbonTaskCompletionListener.scala
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/tasklisteners/CarbonTaskCompletionListener.scala
@@ -40,7 +40,7 @@ trait CarbonQueryTaskCompletionListener extends TaskCompletionListener
 trait CarbonLoadTaskCompletionListener extends TaskCompletionListener
 
 case class CarbonQueryTaskCompletionListenerImpl(iter: RecordReaderIterator[InternalRow],
-    freeMemory: Boolean) extends CarbonQueryTaskCompletionListener {
+    freeMemory: Boolean = false) extends CarbonQueryTaskCompletionListener {
   override def onTaskCompletion(context: TaskContext): Unit = {
     if (iter != null) {
       try {
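
A minimal, self-contained Scala sketch of the race described above. It models
the per-task memory bookkeeping with plain on-heap arrays so it runs without
Spark or Carbon on the classpath; MemoryManager, allocate, freeAll and the
task id value are illustrative stand-ins, not CarbonData or Spark APIs:

import scala.collection.mutable

object TaskMemorySketch {

  // Illustrative model of per-task memory bookkeeping: blocks are tracked
  // and freed per task id, with no notion of which flow allocated them.
  object MemoryManager {
    private val blocksByTask = mutable.Map[Long, mutable.Buffer[Array[Byte]]]()

    def allocate(taskId: Long, size: Int): Unit =
      blocksByTask.getOrElseUpdate(taskId, mutable.Buffer.empty) += new Array[Byte](size)

    // Frees everything registered under the task id, regardless of owner;
    // this is the dangerous part when two flows share one id.
    def freeAll(taskId: Long): Unit = blocksByTask.remove(taskId)

    def liveBlocks(taskId: Long): Int =
      blocksByTask.get(taskId).map(_.size).getOrElse(0)
  }

  def main(args: Array[String]): Unit = {
    val sharedTaskId = 42L // the bug: query and load flow got the same task id

    MemoryManager.allocate(sharedTaskId, 1024) // query-side scratch buffers
    MemoryManager.allocate(sharedTaskId, 4096) // load-side write buffers

    // The query finishes first and its completion hook frees the task's
    // memory, taking the insert's still-in-use buffers with it. With real
    // off-heap (unsafe) memory this surfaces as a JVM crash; here it just
    // leaves the load flow with zero live blocks.
    MemoryManager.freeAll(sharedTaskId)

    println(s"blocks left for the load flow: ${MemoryManager.liveBlocks(sharedTaskId)}")
  }
}

The patch sidesteps this race on the file format read path by never freeing
unsafe memory from the query side (model.setFreeUnsafeMemory(false), and
freeMemory now defaults to false on the query listener), which is safe
because that path reads through on-heap memory.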