Skip to content

Commit

Permalink
[CARBONDATA-3236] Fix for JVM Crash for insert into new table from ol…
Browse files Browse the repository at this point in the history
…d table

Problem: Insert into new table from old table fails with JVM crash for file format(Using carbondata).
This happened because both the query and load flow were assigned the same taskId and once query finished
it freed the unsafe memory while the insert still in progress.

Solution: As the flow for file format is direct flow and uses on-heap(safe) so no need to free the unsafe memory in query.

This closes #3056
  • Loading branch information
manishnalla1994 authored and kumarvishal09 committed Jan 9, 2019
1 parent dd2fff2 commit 3a5572e
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 11 deletions.
Expand Up @@ -410,15 +410,8 @@ class SparkCarbonFileFormat extends FileFormat
val model = format.createQueryModel(split, hadoopAttemptContext)
model.setConverter(new SparkDataTypeConverterImpl)
model.setPreFetchData(false)
var isAdded = false
Option(TaskContext.get()).foreach { context =>
val onCompleteCallbacksField = context.getClass.getDeclaredField("onCompleteCallbacks")
onCompleteCallbacksField.setAccessible(true)
val listeners = onCompleteCallbacksField.get(context)
.asInstanceOf[ArrayBuffer[TaskCompletionListener]]
isAdded = listeners.exists(p => p.isInstanceOf[CarbonLoadTaskCompletionListener])
model.setFreeUnsafeMemory(!isAdded)
}
// As file format uses on heap, no need to free unsafe memory
model.setFreeUnsafeMemory(false)
val carbonReader = if (readVector) {
model.setDirectVectorFill(true)
val vectorizedReader = new VectorizedCarbonRecordReader(model,
Expand All @@ -439,7 +432,7 @@ class SparkCarbonFileFormat extends FileFormat
Option(TaskContext.get()).foreach{context =>
context.addTaskCompletionListener(
CarbonQueryTaskCompletionListenerImpl(
iter.asInstanceOf[RecordReaderIterator[InternalRow]], !isAdded))
iter.asInstanceOf[RecordReaderIterator[InternalRow]]))
}

if (carbonReader.isInstanceOf[VectorizedCarbonRecordReader] && readVector) {
Expand Down
Expand Up @@ -40,7 +40,7 @@ trait CarbonQueryTaskCompletionListener extends TaskCompletionListener
trait CarbonLoadTaskCompletionListener extends TaskCompletionListener

case class CarbonQueryTaskCompletionListenerImpl(iter: RecordReaderIterator[InternalRow],
freeMemory: Boolean) extends CarbonQueryTaskCompletionListener {
freeMemory: Boolean = false) extends CarbonQueryTaskCompletionListener {
override def onTaskCompletion(context: TaskContext): Unit = {
if (iter != null) {
try {
Expand Down

0 comments on commit 3a5572e

Please sign in to comment.