[CARBONDATA-2978] Fixed JVM crash issue when insert into carbon table from other carbon table

Problem:
When data is inserted from one carbon table into another carbon table with unsafe load and unsafe query execution enabled, the JVM crashes.
Reason: An insert from one carbon table into another runs the query (scan) and the load in the same Spark task and thread, so both operations share the same task id. When the query side completes, the unsafe memory manager releases all memory acquired under that task id, even though the load is still running in that task.
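To make the failure mode concrete, here is a small illustrative sketch (plain Scala, not CarbonData code; ToyTaskMemoryManager and its method names are invented for illustration) of how freeing memory keyed by task id pulls memory out from under a load that shares the task:

import scala.collection.mutable

// Toy stand-in for a per-task unsafe memory manager; names are hypothetical.
object ToyTaskMemoryManager {
  private val allocationsByTask = mutable.Map[Long, mutable.Buffer[Array[Byte]]]()

  def allocate(taskId: Long, bytes: Int): Array[Byte] = {
    val block = new Array[Byte](bytes)
    allocationsByTask.getOrElseUpdate(taskId, mutable.Buffer.empty[Array[Byte]]) += block
    block
  }

  // Query-side cleanup frees EVERYTHING owned by the task id.
  def freeAllForTask(taskId: Long): Unit = allocationsByTask.remove(taskId)

  def isTracked(taskId: Long): Boolean = allocationsByTask.contains(taskId)
}

object InsertSelectCrashSketch {
  def main(args: Array[String]): Unit = {
    // In an insert-select between two carbon tables the scan and the load run
    // in the same Spark task, so they share one task id.
    val sharedTaskId = 7L
    val loadBuffer = ToyTaskMemoryManager.allocate(sharedTaskId, 1024) // writer still needs this
    ToyTaskMemoryManager.allocate(sharedTaskId, 1024)                  // reader's working memory

    // The query completion listener fires and frees everything for the task id ...
    ToyTaskMemoryManager.freeAllForTask(sharedTaskId)

    // ... so the load's memory is released while the load is still writing.
    // With real off-heap pointers this dangling access is what crashes the JVM;
    // the on-heap arrays here only illustrate the ownership problem.
    println(s"memory still tracked for task $sharedTaskId: " + ToyTaskMemoryManager.isTracked(sharedTaskId))
    println(s"load buffer the writer still holds: ${loadBuffer.length} bytes")
  }
}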

Solution:
Check the registered task completion listeners and, if a load listener is already attached to the task, skip clearing the unsafe memory cache from the query side.
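The diff below relies on marker listener traits from a new tasklisteners package (one of the 7 changed files, not shown in this excerpt). Below is a minimal sketch of what those markers could look like, inferred from how CarbonLoadTaskCompletionListenerImpl and CarbonQueryTaskCompletionListenerImpl are constructed in the diff; the method bodies, cleanup details, and the ObjectArrayWritable import path are assumptions, not the committed code:

package org.apache.spark.sql.carbondata.execution.datasources.tasklisteners

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}
import org.apache.spark.TaskContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.RecordReaderIterator
import org.apache.spark.util.TaskCompletionListener

import org.apache.carbondata.hadoop.ObjectArrayWritable // assumed import path

// Marker for a listener registered by a data load (insert) running in the task.
trait CarbonLoadTaskCompletionListener extends TaskCompletionListener

// Marker for a listener registered by a query (scan) running in the task.
trait CarbonQueryTaskCompletionListener extends TaskCompletionListener

// Closes the record writer when the task completes; registered on the write path.
case class CarbonLoadTaskCompletionListenerImpl(
    recordWriter: RecordWriter[NullWritable, ObjectArrayWritable],
    taskAttemptContext: TaskAttemptContext) extends CarbonLoadTaskCompletionListener {
  override def onTaskCompletion(context: TaskContext): Unit = {
    // Closing the writer flushes the load; the write path owns its own cleanup.
    recordWriter.close(taskAttemptContext)
  }
}

// Closes the reader iterator and frees task memory only when no load shares the task.
case class CarbonQueryTaskCompletionListenerImpl(
    iter: RecordReaderIterator[InternalRow],
    freeMemory: Boolean) extends CarbonQueryTaskCompletionListener {
  override def onTaskCompletion(context: TaskContext): Unit = {
    if (iter != null) {
      iter.close()
    }
    if (freeMemory) {
      // The real implementation would release this task's unsafe memory here
      // (e.g. via CarbonData's UnsafeMemoryManager); elided in this sketch.
    }
  }
}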

This closes #2773
ravipesala authored and kumarvishal09 committed Sep 28, 2018
1 parent c016361 commit 9ae91cc
Showing 7 changed files with 246 additions and 47 deletions.
@@ -424,6 +424,8 @@ public static class CarbonRecordWriter extends RecordWriter<NullWritable, Object

private Future future;

private boolean isClosed;

public CarbonRecordWriter(CarbonOutputIteratorWrapper iteratorWrapper,
DataLoadExecutor dataLoadExecutor, CarbonLoadModel loadModel, Future future,
ExecutorService executorService) {
@@ -442,22 +444,25 @@ public CarbonRecordWriter(CarbonOutputIteratorWrapper iteratorWrapper,
}

@Override public void close(TaskAttemptContext taskAttemptContext) throws InterruptedException {
if (iteratorWrapper != null) {
iteratorWrapper.closeWriter(false);
}
try {
future.get();
} catch (ExecutionException e) {
LOG.error("Error while loading data", e);
throw new InterruptedException(e.getMessage());
} finally {
executorService.shutdownNow();
dataLoadExecutor.close();
ThreadLocalSessionInfo.unsetAll();
// clean up the folders and files created locally for data load operation
TableProcessingOperations.deleteLocalDataLoadFolderLocation(loadModel, false, false);
if (!isClosed) {
isClosed = true;
if (iteratorWrapper != null) {
iteratorWrapper.closeWriter(false);
}
try {
future.get();
} catch (ExecutionException e) {
LOG.error("Error while loading data", e);
throw new InterruptedException(e.getMessage());
} finally {
executorService.shutdownNow();
dataLoadExecutor.close();
ThreadLocalSessionInfo.unsetAll();
// clean up the folders and files created locally for data load operation
TableProcessingOperations.deleteLocalDataLoadFolderLocation(loadModel, false, false);
}
LOG.info("Closed writer task " + taskAttemptContext.getTaskAttemptID());
}
LOG.info("Closed writer task " + taskAttemptContext.getTaskAttemptID());
}

public CarbonLoadModel getLoadModel() {
@@ -18,10 +18,13 @@
*/
package org.apache.carbondata.spark.testsuite.insertQuery

import org.apache.spark.sql.Row
import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties


class InsertIntoNonCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
override def beforeAll {
@@ -64,6 +67,8 @@ class InsertIntoNonCarbonTableTestCase extends QueryTest with BeforeAndAfterAll
"Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions," +
"Latest_operatorId,gamePointDescription,gamePointId,contractNumber', " +
"'bad_records_logger_enable'='false','bad_records_action'='FORCE')")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_IN_QUERY_EXECUTION, "true")
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "true")
}

test("insert into hive") {
@@ -102,7 +107,79 @@ class InsertIntoNonCarbonTableTestCase extends QueryTest with BeforeAndAfterAll
sql("drop table thive_cond")
}

test("jvm crash when insert data from datasource table to session table") {
val spark = sqlContext.sparkSession
import spark.implicits._

import scala.util.Random
val r = new Random()
val df = spark.sparkContext.parallelize(1 to 10)
.map(x => (r.nextInt(100000), "name" + x % 8, "city" + x % 50, BigDecimal.apply(x % 60)))
.toDF("ID", "name", "city", "age")
spark.sql("DROP TABLE IF EXISTS personTable")
spark.sql("DROP TABLE IF EXISTS test_table")

df.write.format("carbon").saveAsTable("personTable")
spark.sql("create table test_table(ID int, name string, city string, age decimal) stored by 'carbondata' tblproperties('sort_columns'='ID')")
spark.sql("insert into test_table select * from personTable")
spark.sql("insert into test_table select * from personTable limit 2")

assert(spark.sql("select * from test_table").count() == 12)
spark.sql("DROP TABLE IF EXISTS personTable")
spark.sql("DROP TABLE IF EXISTS test_table")
}

test("jvm crash when insert data from datasource table to datasource table") {
val spark = sqlContext.sparkSession
import spark.implicits._

import scala.util.Random
val r = new Random()
val df = spark.sparkContext.parallelize(1 to 10)
.map(x => (r.nextInt(100000), "name" + x % 8, "city" + x % 50, BigDecimal.apply(x % 60)))
.toDF("ID", "name", "city", "age")
spark.sql("DROP TABLE IF EXISTS personTable")
spark.sql("DROP TABLE IF EXISTS test_table")

df.write.format("carbon").saveAsTable("personTable")
spark.sql("create table test_table(ID int, name string, city string, age decimal) using carbon")
spark.sql("insert into test_table select * from personTable")
spark.sql("insert into test_table select * from personTable limit 2")

assert(spark.sql("select * from test_table").count() == 12)
spark.sql("DROP TABLE IF EXISTS personTable")
spark.sql("DROP TABLE IF EXISTS test_table")
}

test("jvm crash when insert data from session table to datasource table") {
val spark = sqlContext.sparkSession
import spark.implicits._

import scala.util.Random
val r = new Random()
val df = spark.sparkContext.parallelize(1 to 10)
.map(x => (r.nextInt(100000), "name" + x % 8, "city" + x % 50, BigDecimal.apply(x % 60)))
.toDF("ID", "name", "city", "age")
spark.sql("DROP TABLE IF EXISTS personTable")
spark.sql("DROP TABLE IF EXISTS test_table")

df.write
.format("carbondata")
.option("tableName", "personTable")
.mode(SaveMode.Overwrite)
.save()
spark.sql("create table test_table(ID int, name string, city string, age decimal) using carbon")
spark.sql("insert into test_table select * from personTable")
spark.sql("insert into test_table select * from personTable limit 2")

assert(spark.sql("select * from test_table").count() == 12)
spark.sql("DROP TABLE IF EXISTS personTable")
spark.sql("DROP TABLE IF EXISTS test_table")
}

override def afterAll {
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_IN_QUERY_EXECUTION, CarbonCommonConstants.ENABLE_UNSAFE_IN_QUERY_EXECUTION_DEFAULTVALUE)
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_DEFAULT)
sql("DROP TABLE IF EXISTS TCarbonSource")
}
}
@@ -35,6 +35,7 @@ import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.hive.DistributionUtil
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.carbondata.execution.datasources.tasklisteners.CarbonLoadTaskCompletionListener
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.profiler.{GetPartition, Profiler, QueryTaskEnd}
import org.apache.spark.sql.util.SparkSQLUtil.sessionState
@@ -470,39 +471,28 @@ class CarbonScanRDD[T: ClassTag](
val recorder = CarbonTimeStatisticsFactory.createExecutorRecorder(model.getQueryId())
model.setStatisticsRecorder(recorder)

// TODO: rewrite this logic to call free memory in FailureListener on failures. On success,
// TODO: no memory leak should be there, resources should be freed on success completion.
val onCompleteCallbacksField = context.getClass.getDeclaredField("onCompleteCallbacks")
onCompleteCallbacksField.setAccessible(true)
val listeners = onCompleteCallbacksField.get(context)
.asInstanceOf[ArrayBuffer[TaskCompletionListener]]

val isAdded = listeners.exists(p => p.isInstanceOf[InsertTaskCompletionListener])
model.setFreeUnsafeMemory(!isAdded)
// add task completion before calling initialize as initialize method will internally call
// for usage of unsafe method for processing of one blocklet and if there is any exception
// while doing that the unsafe memory occupied for that task will not get cleared
context.addTaskCompletionListener { new QueryTaskCompletionListener(!isAdded,
reader,
inputMetricsStats,
executionId,
taskId,
queryStartTime,
model.getStatisticsRecorder,
split,
queryId)
}
// initialize the reader
reader.initialize(inputSplit, attemptContext)

new Iterator[Any] {
private var havePair = false
private var finished = false
private var first = true

override def hasNext: Boolean = {
if (context.isInterrupted) {
throw new TaskKilledException
}
if (first) {
first = false
addTaskCompletionListener(
split,
context,
queryStartTime,
executionId,
taskId,
model,
reader)
// initialize the reader
reader.initialize(inputSplit, attemptContext)
}
if (!finished && !havePair) {
finished = !reader.nextKeyValue
havePair = !finished
@@ -534,6 +524,42 @@ class CarbonScanRDD[T: ClassTag](
iterator.asInstanceOf[Iterator[T]]
}

private def addTaskCompletionListener(split: Partition,
context: TaskContext,
queryStartTime: Long,
executionId: String,
taskId: Int,
model: QueryModel,
reader: RecordReader[Void, Object]) = {
// TODO: rewrite this logic to call free memory in FailureListener on failures.
// TODO: On success, no memory leak should be there, resources should be freed on
// success completion.
val onCompleteCallbacksField =
context.getClass.getDeclaredField("onCompleteCallbacks")
onCompleteCallbacksField.setAccessible(true)
val listeners = onCompleteCallbacksField.get(context)
.asInstanceOf[ArrayBuffer[TaskCompletionListener]]

val isAdded = listeners.exists(p => p.isInstanceOf[CarbonLoadTaskCompletionListener])
model.setFreeUnsafeMemory(!isAdded)
// add task completion before calling initialize as initialize method will internally
// call for usage of unsafe method for processing of one blocklet and if there is any
// exception while doing that the unsafe memory occupied for that task will not
// get cleared
context.addTaskCompletionListener {
new QueryTaskCompletionListener(!isAdded,
reader,
inputMetricsStats,
executionId,
taskId,
queryStartTime,
model.getStatisticsRecorder,
split,
queryId)
}
}

private def close() {
TaskMetricsMap.getInstance().updateReadBytes(Thread.currentThread().getId)
inputMetricsStats.updateAndClose()
@@ -18,16 +18,16 @@
package org.apache.carbondata.spark.rdd

import org.apache.spark.TaskContext
import org.apache.spark.sql.carbondata.execution.datasources.tasklisteners.CarbonLoadTaskCompletionListener
import org.apache.spark.sql.execution.command.ExecutionErrors
import org.apache.spark.util.TaskCompletionListener

import org.apache.carbondata.core.util.ThreadLocalTaskInfo
import org.apache.carbondata.processing.loading.{DataLoadExecutor, FailureCauses}
import org.apache.carbondata.spark.util.CommonUtil

class InsertTaskCompletionListener(dataLoadExecutor: DataLoadExecutor,
executorErrors: ExecutionErrors)
extends TaskCompletionListener {
extends CarbonLoadTaskCompletionListener {
override def onTaskCompletion(context: TaskContext): Unit = {
try {
dataLoadExecutor.close()
@@ -21,8 +21,8 @@ import scala.collection.JavaConverters._

import org.apache.hadoop.mapreduce.RecordReader
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.sql.carbondata.execution.datasources.tasklisteners.CarbonQueryTaskCompletionListener
import org.apache.spark.sql.profiler.{Profiler, QueryTaskEnd}
import org.apache.spark.util.TaskCompletionListener

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.memory.UnsafeMemoryManager
@@ -34,7 +34,7 @@ class QueryTaskCompletionListener(freeMemory: Boolean,
var reader: RecordReader[Void, Object],
inputMetricsStats: InitInputMetrics, executionId: String, taskId: Int, queryStartTime: Long,
queryStatisticsRecorder: QueryStatisticsRecorder, split: Partition, queryId: String)
extends TaskCompletionListener {
extends CarbonQueryTaskCompletionListener {
override def onTaskCompletion(context: TaskContext): Unit = {
if (reader != null) {
try {
@@ -18,6 +18,7 @@
package org.apache.spark.sql.carbondata.execution.datasources

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
@@ -29,6 +30,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.memory.MemoryMode
import org.apache.spark.sql._
import org.apache.spark.sql.carbondata.execution.datasources.readsupport.SparkUnsafeRowReadSuport
import org.apache.spark.sql.carbondata.execution.datasources.tasklisteners.{CarbonLoadTaskCompletionListener, CarbonLoadTaskCompletionListenerImpl, CarbonQueryTaskCompletionListener, CarbonQueryTaskCompletionListenerImpl}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.JoinedRow
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
@@ -37,7 +39,7 @@ import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.SparkTypeConverter
import org.apache.spark.util.SerializableConfiguration
import org.apache.spark.util.{SerializableConfiguration, TaskCompletionListener}

import org.apache.carbondata.common.annotations.{InterfaceAudience, InterfaceStability}
import org.apache.carbondata.common.logging.LogServiceFactory
@@ -174,6 +176,10 @@ class SparkCarbonFileFormat extends FileFormat
private val recordWriter: RecordWriter[NullWritable, ObjectArrayWritable] =
new CarbonTableOutputFormat().getRecordWriter(context)

Option(TaskContext.get()).foreach {c =>
c.addTaskCompletionListener(CarbonLoadTaskCompletionListenerImpl(recordWriter, context))
}

/**
* Write spark's internal row to carbondata record writer
*/
@@ -388,6 +394,15 @@ val model = format.createQueryModel(split, hadoopAttemptContext)
val model = format.createQueryModel(split, hadoopAttemptContext)
model.setConverter(new SparkDataTypeConverterImpl)
model.setPreFetchData(false)
var isAdded = false
Option(TaskContext.get()).foreach { context =>
val onCompleteCallbacksField = context.getClass.getDeclaredField("onCompleteCallbacks")
onCompleteCallbacksField.setAccessible(true)
val listeners = onCompleteCallbacksField.get(context)
.asInstanceOf[ArrayBuffer[TaskCompletionListener]]
isAdded = listeners.exists(p => p.isInstanceOf[CarbonLoadTaskCompletionListener])
model.setFreeUnsafeMemory(!isAdded)
}
val carbonReader = if (readVector) {
val vectorizedReader = new VectorizedCarbonRecordReader(model,
null,
@@ -404,7 +419,11 @@ class SparkCarbonFileFormat extends FileFormat
}

val iter = new RecordReaderIterator(carbonReader)
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
Option(TaskContext.get()).foreach{context =>
context.addTaskCompletionListener(
CarbonQueryTaskCompletionListenerImpl(
iter.asInstanceOf[RecordReaderIterator[InternalRow]], !isAdded))
}

if (carbonReader.isInstanceOf[VectorizedCarbonRecordReader] && readVector) {
iter.asInstanceOf[Iterator[InternalRow]]
