[CARBONDATA-2990] Queries slow down after some time due to broadcast issue

Problem
During consecutive query runs, queries gradually slow down, degrading query performance over time.
No exception is thrown in the driver or executor logs, but the logs show that the time taken to broadcast the Hadoop configuration increases after every query run.

Analysis
CarbonData overrides Spark's SerializableConfiguration class. Spark registers its own class with the Kryo serializer, so computation using Kryo is fast; the overridden CarbonData class does not get that benefit.
Spark's internal SizeEstimator computes the size of the broadcast object, and the overridden CarbonData class carries a few extra objects, which increases the computation time.
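As a rough, hypothetical illustration (not part of this commit; it assumes Spark's public SizeEstimator developer API and a default Hadoop Configuration), the estimator walks the whole object graph of the value being broadcast, so extra references held by a custom wrapper add to that walk:

import org.apache.hadoop.conf.Configuration
import org.apache.spark.util.SizeEstimator

// Hypothetical standalone sketch: ask Spark how large it considers a Hadoop
// Configuration to be. SizeEstimator traverses the full object graph, so a
// wrapper class carrying additional fields increases this work on every
// broadcast.
object ConfSizeSketch {
  def main(args: Array[String]): Unit = {
    val hadoopConf = new Configuration()
    val estimatedBytes = SizeEstimator.estimate(hadoopConf)
    println(s"Estimated in-memory size of the Hadoop conf: $estimatedBytes bytes")
  }
}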
Solution
Use Spark's SerializableConfiguration class instead of overriding it in CarbonData.
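A minimal sketch of the resulting pattern (assuming Spark 2.x, where org.apache.spark.util.SerializableConfiguration is visible only inside the org.apache.spark namespace, which is why the commit places its helper under org.apache.spark.sql.util in SparkSQLUtil below; the object and method names here are illustrative only):

package org.apache.spark.sql.util

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.util.SerializableConfiguration

// Wrap the Hadoop conf in Spark's own SerializableConfiguration (the class
// Spark registers with Kryo, per the analysis above) and broadcast it once;
// executor-side code unwraps it via broadcast.value.value.
object HadoopConfBroadcastSketch {
  def broadcastHadoopConf(sc: SparkContext,
      hadoopConf: Configuration): Broadcast[SerializableConfiguration] = {
    sc.broadcast(new SerializableConfiguration(hadoopConf))
  }
}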

This closes #2803
manishgupta88 authored and ravipesala committed Oct 9, 2018
1 parent 19097f2 commit 3c7b339
Showing 11 changed files with 46 additions and 62 deletions.
@@ -34,14 +34,14 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile}
import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.spark.sql.util.SparkSQLUtil.sessionState

import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.ThreadLocalSessionInfo
import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.spark.rdd.SerializableConfiguration
import org.apache.carbondata.spark.util.CommonUtil

object CsvRDDHelper {
@@ -110,7 +110,7 @@ object CsvRDDHelper {
closePartition()

// 2. read function
val serializableConfiguration = new SerializableConfiguration(hadoopConf)
val serializableConfiguration = SparkSQLUtil.getSerializableConfigurableInstance(hadoopConf)
val readFunction = new (PartitionedFile => Iterator[InternalRow]) with Serializable {
override def apply(file: PartitionedFile): Iterator[InternalRow] = {
new Iterator[InternalRow] {
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.TaskContext
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.execution.command.ExecutionErrors
import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.spark.storage.StorageLevel

import org.apache.carbondata.common.logging.LogServiceFactory
@@ -35,7 +36,6 @@ import org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, Failure
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters}
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
import org.apache.carbondata.spark.rdd.SerializableConfiguration

/**
* Use sortBy operator in spark to load the data
@@ -66,7 +66,7 @@ object DataLoadProcessBuilderOnSpark
val sortStepRowCounter = sc.accumulator(0, "Sort Processor Accumulator")
val writeStepRowCounter = sc.accumulator(0, "Write Processor Accumulator")

val conf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
val conf = SparkSQLUtil.broadCastHadoopConf(sc, hadoopConf)
// 1. Input
val inputRDD = originRDD
.mapPartitions(rows => DataLoadProcessorStepOnSpark.toRDDIterator(rows, modelBroadcast))
@@ -121,7 +121,7 @@ object DataLoadProcessBuilderOnSpark
// 4. Write
sc.runJob(sortRDD, (context: TaskContext, rows: Iterator[CarbonRow]) =>
DataLoadProcessorStepOnSpark.writeFunc(rows, context.partitionId, modelBroadcast,
writeStepRowCounter, conf))
writeStepRowCounter, conf.value.value))

// clean cache only if persisted and keeping unpersist non-blocking as non-blocking call will
// not have any functional impact as spark automatically monitors the cache usage on each node
@@ -42,7 +42,7 @@ import org.apache.carbondata.processing.loading.steps.DataWriterProcessorStepImp
import org.apache.carbondata.processing.sort.sortdata.SortParameters
import org.apache.carbondata.processing.store.{CarbonFactHandler, CarbonFactHandlerFactory}
import org.apache.carbondata.processing.util.{CarbonBadRecordUtil, CarbonDataProcessorUtil}
import org.apache.carbondata.spark.rdd.{NewRddIterator, SerializableConfiguration, StringArrayRow}
import org.apache.carbondata.spark.rdd.{NewRddIterator, StringArrayRow}
import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util}

object DataLoadProcessorStepOnSpark {
@@ -230,8 +230,8 @@ object DataLoadProcessorStepOnSpark
index: Int,
modelBroadcast: Broadcast[CarbonLoadModel],
rowCounter: Accumulator[Int],
conf: Broadcast[SerializableConfiguration]) {
ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
conf: Configuration) {
ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf)
var model: CarbonLoadModel = null
var tableName: String = null
var rowConverter: RowConverterImpl = null
@@ -22,7 +22,6 @@ import scala.reflect.ClassTag

import org.apache.hadoop.conf.Configuration
import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.util.SparkSQLUtil
@@ -49,8 +48,7 @@ abstract class CarbonRDD[T: ClassTag](

@transient val hadoopConf = SparkSQLUtil.sessionState(ss).newHadoopConf()

val config: Broadcast[SerializableConfiguration] = sparkContext
.broadcast(new SerializableConfiguration(hadoopConf))
val config = SparkSQLUtil.broadCastHadoopConf(sparkContext, hadoopConf)

/** Construct an RDD with just a one-to-one dependency on one parent */
def this(@transient sparkSession: SparkSession, @transient oneParent: RDD[_]) =
@@ -54,39 +54,6 @@ import org.apache.carbondata.processing.util.CarbonQueryUtil
import org.apache.carbondata.spark.DataLoadResult
import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}

class SerializableConfiguration(@transient var value: Configuration) extends Serializable {

@transient
private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

private def writeObject(out: ObjectOutputStream): Unit =
try {
out.defaultWriteObject()
value.write(out)
} catch {
case e: IOException =>
LOGGER.error(e, "Exception encountered")
throw e
case NonFatal(e) =>
LOGGER.error(e, "Exception encountered")
throw new IOException(e)
}


private def readObject(in: ObjectInputStream): Unit =
try {
value = new Configuration(false)
value.readFields(in)
} catch {
case e: IOException =>
LOGGER.error(e, "Exception encountered")
throw e
case NonFatal(e) =>
LOGGER.error(e, "Exception encountered")
throw new IOException(e)
}
}

/**
* This partition class use to split by Host
*
@@ -19,11 +19,14 @@ package org.apache.spark.sql.util

import java.lang.reflect.Method

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.{SessionState, SQLConf}
import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil}
import org.apache.spark.util.{CarbonReflectionUtils, SerializableConfiguration, SparkUtil}

object SparkSQLUtil {
def sessionState(sparkSession: SparkSession): SessionState = sparkSession.sessionState
@@ -99,4 +102,20 @@ object SparkSQLUtil {
throw new UnsupportedOperationException("Spark version not supported")
}
}

/**
* Method to broadcast the Hadoop configuration using Spark's SerializableConfiguration class
*
* @param sparkContext spark context used to create the broadcast variable
* @param hadoopConf   hadoop configuration to be broadcast
* @return broadcast variable wrapping the serializable configuration
*/
def broadCastHadoopConf(sparkContext: SparkContext,
hadoopConf: Configuration): Broadcast[SerializableConfiguration] = {
sparkContext.broadcast(getSerializableConfigurableInstance(hadoopConf))
}

def getSerializableConfigurableInstance(hadoopConf: Configuration): SerializableConfiguration = {
new SerializableConfiguration(hadoopConf)
}
}
@@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.{CompactionModel, ExecutionErrors, UpdateTableModel}
import org.apache.spark.sql.hive.DistributionUtil
import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.spark.sql.util.CarbonException
import org.apache.spark.sql.util.{CarbonException, SparkSQLUtil}

import org.apache.carbondata.common.constants.LoggerAction
import org.apache.carbondata.common.logging.LogServiceFactory
@@ -728,7 +728,7 @@ object CarbonDataRDDFactory {

// because partitionId=segmentIdIndex*parallelism+RandomPart and RandomPart<parallelism,
// so segmentIdIndex=partitionId/parallelism, this has been verified.
val conf = sqlContext.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
val conf = SparkSQLUtil.broadCastHadoopConf(sqlContext.sparkSession.sparkContext, hadoopConf)
partitionByRdd.map(_._2).mapPartitions { partition =>
ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
val partitionId = TaskContext.getPartitionId()
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{CodegenSupport, SparkPlan, UnaryExecNode}
import org.apache.spark.sql.optimizer.CarbonDecoderRelation
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.SparkTypeConverter
import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter}

import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
@@ -44,7 +44,7 @@ import org.apache.carbondata.core.scan.executor.util.QueryUtil
import org.apache.carbondata.core.util.{DataTypeUtil, ThreadLocalSessionInfo}
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.spark.CarbonAliasDecoderRelation
import org.apache.carbondata.spark.rdd.{CarbonRDDWithTableInfo, SerializableConfiguration}
import org.apache.carbondata.spark.rdd.CarbonRDDWithTableInfo

/**
* It decodes the data.
@@ -76,8 +76,8 @@ case class CarbonDictionaryDecoder(
(carbonTable.getTableName, carbonTable)
}.toMap

val conf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(sparkSession
.sessionState.newHadoopConf()))
val conf = SparkSQLUtil
.broadCastHadoopConf(sparkSession.sparkContext, sparkSession.sessionState.newHadoopConf())
if (CarbonDictionaryDecoder.isRequiredToDecode(getDictionaryColumnIds)) {
val dataTypes = child.output.map { attr => attr.dataType }
child.execute().mapPartitions { iter =>
@@ -42,6 +42,7 @@ import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, FindDataSou
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.spark.storage.StorageLevel
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils}
@@ -78,7 +79,7 @@ import org.apache.carbondata.processing.util.{CarbonBadRecordUtil, CarbonDataPro
import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
import org.apache.carbondata.spark.load.{CsvRDDHelper, DataLoadProcessorStepOnSpark}
import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, SerializableConfiguration}
import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
import org.apache.carbondata.spark.util.{CarbonScalaUtil, GlobalDictionaryUtil}

case class CarbonLoadDataCommand(
@@ -986,8 +987,8 @@ case class CarbonLoadDataCommand(
array
}
}
val conf = sparkSession.sparkContext
.broadcast(new SerializableConfiguration(sparkSession.sessionState.newHadoopConf()))
val conf = SparkSQLUtil
.broadCastHadoopConf(sparkSession.sparkContext, sparkSession.sessionState.newHadoopConf())
val finalRDD = convertRDD.mapPartitionsWithIndex { case(index, rows) =>
DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
@@ -32,6 +32,7 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.ExecutionErrors
import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.spark.sql.util.SparkSQLUtil

import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -49,7 +50,6 @@ import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.processing.exception.MultipleMatchingException
import org.apache.carbondata.processing.loading.FailureCauses
import org.apache.carbondata.spark.DeleteDelataResultImpl
import org.apache.carbondata.spark.rdd.SerializableConfiguration

object DeleteExecution {
val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
@@ -120,9 +120,8 @@ object DeleteExecution {
blockMappingVO.getCompleteBlockRowDetailVO.asScala.toSeq,
keyRdd.partitions.length)

val conf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(sparkSession
.sessionState.newHadoopConf()))

val conf = SparkSQLUtil
.broadCastHadoopConf(sparkSession.sparkContext, sparkSession.sessionState.newHadoopConf())
val rdd = rowContRdd.join(keyRdd)
res = rdd.mapPartitionsWithIndex(
(index: Int, records: Iterator[((String), (RowCountDetailsVO, Iterable[Row]))]) =>
@@ -26,16 +26,16 @@ import org.apache.spark.sql._
import org.apache.spark.sql.execution.command.AlterTableModel
import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.util.SparkSQLUtil

import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
import org.apache.carbondata.core.util.{ThreadLocalSessionInfo}
import org.apache.carbondata.core.util.ThreadLocalSessionInfo
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CarbonDataMergerUtilResult, CompactionType}
import org.apache.carbondata.spark.rdd.SerializableConfiguration

object HorizontalCompaction {

@@ -191,8 +191,8 @@ object HorizontalCompaction {

val timestamp = factTimeStamp
val updateStatusDetails = segmentUpdateStatusManager.getUpdateStatusDetails
val conf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(sparkSession
.sessionState.newHadoopConf()))
val conf = SparkSQLUtil
.broadCastHadoopConf(sparkSession.sparkContext, sparkSession.sessionState.newHadoopConf())
val result = rdd1.mapPartitions(iter =>
new Iterator[Seq[CarbonDataMergerUtilResult]] {
ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
