Parallelize datamap rebuild processing for segments
Currently in CarbonData, while rebuilding a datamap, one Spark job is
started for each segment and all the jobs are executed serially. If we
have many historical segments, the rebuild takes a lot of time.

Here we optimize the datamap rebuild procedure and start one task for
each segment, so all the tasks can run in parallel within a single
Spark job.
xuchuanyin committed Jul 10, 2018
1 parent f4a58c5 commit 3eb3f8c
Showing 6 changed files with 79 additions and 60 deletions.
@@ -434,7 +434,7 @@ public void clearDataMaps(String tableUniqName) {
}

/**
* Clear the datamap/datamaps of a table from memory
* Clear the datamap/datamaps of a table from memory and disk
*
* @param identifier Table identifier
*/
@@ -50,6 +50,12 @@ public class CarbonRecordReader<T> extends AbstractRecordReader<T> {
protected QueryExecutor queryExecutor;
private InputMetricsStats inputMetricsStats;

/**
* Whether to skip clearing the datamap cache when the reader is closed. In scenarios such as
* datamap rebuild, we set it to true and clear the datamap only after the rebuild finishes
*/
private boolean skipClearDataMapAtClose = false;

public CarbonRecordReader(QueryModel queryModel, CarbonReadSupport<T> readSupport,
InputMetricsStats inputMetricsStats) {
this(queryModel, readSupport);
@@ -122,9 +128,11 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext context)
CarbonUtil.clearDictionaryCache(entry.getValue());
}
}
// Clear the datamap cache
DataMapStoreManager.getInstance()
.clearDataMaps(queryModel.getTable().getAbsoluteTableIdentifier());
if (!skipClearDataMapAtClose) {
// Clear the datamap cache
DataMapStoreManager.getInstance().clearDataMaps(
queryModel.getTable().getAbsoluteTableIdentifier());
}
// close read support
readSupport.close();
try {
@@ -133,4 +141,8 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext context)
throw new IOException(e);
}
}

public void setSkipClearDataMapAtClose(boolean skipClearDataMapAtClose) {
this.skipClearDataMapAtClose = skipClearDataMapAtClose;
}
}
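The new skipClearDataMapAtClose flag lets each rebuild task keep the datamap cache alive while its reader is closed; the cache is then cleared once, after the whole rebuild. A minimal sketch of that usage pattern, assuming the CarbonData 1.x package layout (the wrapper object and helper methods are hypothetical):

```scala
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.hadoop.CarbonRecordReader

object SkipClearSketch {
  // executor side (hypothetical helper): read a segment without dropping the datamap cache
  def readSegment(reader: CarbonRecordReader[Array[Object]]): Unit = {
    reader.setSkipClearDataMapAtClose(true)
    try {
      while (reader.nextKeyValue()) {
        // feed reader.getCurrentValue into the DataMapBuilder (elided)
      }
    } finally {
      reader.close() // with the flag set, close() no longer clears the datamap cache
    }
  }

  // driver side: clear the cache once, after every segment has been rebuilt
  def clearAfterRebuild(tableIdentifier: AbsoluteTableIdentifier): Unit = {
    DataMapStoreManager.getInstance().clearDataMaps(tableIdentifier)
  }
}
```

In the commit itself, internalCompute sets the flag on its reader and the driver calls DataMapStoreManager.getInstance().clearDataMaps(tableIdentifier) after the RDD is collected.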
@@ -74,7 +74,6 @@ class C2DataMapFactory(
* delete datamap data if any
*/
override def deleteDatamapData(): Unit = {
???
}

/**
@@ -132,7 +132,7 @@ class FGDataMapFactory(carbonTable: CarbonTable,
* delete datamap data if any
*/
override def deleteDatamapData(): Unit = {
???

}

/**
@@ -141,9 +141,14 @@ class RestructureResultImpl extends RestructureResult[Int, Boolean] {
}

trait RefreshResult[K, V] extends Serializable {
def getKey(key: String, value: Boolean): (K, V)
/**
* Previously index datamap refresh is per segment, for CARBONDATA-2685 it will refresh
* all segments in a batch. The structure is taskNo -> (segmentNo, status)
*/
def getKey(key: String, value: (String, Boolean)): (K, V)
}

class RefreshResultImpl extends RefreshResult[String, Boolean] {
override def getKey(key: String, value: Boolean): (String, Boolean) = (key, value)
class RefreshResultImpl extends RefreshResult[String, (String, Boolean)] {
override def getKey(key: String,
value: (String, Boolean)): (String, (String, Boolean)) = (key, value)
}
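With the refresh result widened from taskNo -> status to taskNo -> (segmentNo, status), the driver can map a failed task back to its segment and clean up only that segment's datamap files. A toy illustration of consuming the collected statuses (the sample data is made up):

```scala
object RefreshStatusExample extends App {
  // tuples mirror what RefreshResultImpl.getKey now produces: taskNo -> (segmentNo, status)
  val status: Array[(String, (String, Boolean))] = Array(
    ("0", ("1", true)),  // task 0 rebuilt segment 1 successfully
    ("1", ("2", false)), // task 1 failed on segment 2
    ("2", ("3", true))
  )

  // segments whose partially written datamap files should be cleaned up
  val failedSegmentIds = status.collect { case (_, (segmentId, ok)) if !ok => segmentId }
  assert(failedSegmentIds.sameElements(Array("2")))
}
```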
@@ -59,6 +59,7 @@ import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl
* Helper object to rebuild the index DataMap
*/
object IndexDataMapRebuildRDD {
private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)

/**
* Rebuild the datamap for all existing data in the table
@@ -78,56 +79,56 @@ object IndexDataMapRebuildRDD {
tableIdentifier,
mutable.Seq[String](schema.getDataMapName))
OperationListenerBus.getInstance().fireEvent(buildDataMapPreExecutionEvent, operationContext)
// loop all segments to rebuild DataMap
validSegments.asScala.foreach { segment =>
// if lucene datamap folder is exists, not require to build lucene datamap again
refreshOneSegment(sparkSession, carbonTable, schema.getDataMapName,
indexedCarbonColumns, segment);
}
val buildDataMapPostExecutionEvent = new BuildDataMapPostExecutionEvent(sparkSession,
tableIdentifier)
OperationListenerBus.getInstance().fireEvent(buildDataMapPostExecutionEvent, operationContext)
}

private def refreshOneSegment(
sparkSession: SparkSession,
carbonTable: CarbonTable,
dataMapName: String,
indexColumns: java.util.List[CarbonColumn],
segment: Segment): Unit = {
val segments2DmStorePath = validSegments.asScala.map { segment =>
val dataMapStorePath = CarbonTablePath.getDataMapStorePath(carbonTable.getTablePath,
segment.getSegmentNo, schema.getDataMapName)
segment -> dataMapStorePath
}.filter(p => !FileFactory.isFileExist(p._2)).toMap

val dataMapStorePath = CarbonTablePath.getDataMapStorePath(carbonTable.getTablePath,
segment.getSegmentNo, dataMapName)
segments2DmStorePath.foreach { case (_, dmPath) =>
if (!FileFactory.mkdirs(dmPath, FileFactory.getFileType(dmPath))) {
throw new IOException(
s"Failed to create directory $dmPath for rebuilding datamap ${ schema.getDataMapName }")
}
}

if (!FileFactory.isFileExist(dataMapStorePath)) {
if (FileFactory.mkdirs(dataMapStorePath, FileFactory.getFileType(dataMapStorePath))) {
try {
val status = new IndexDataMapRebuildRDD[String, Boolean](
sparkSession,
new RefreshResultImpl(),
carbonTable.getTableInfo,
dataMapName,
indexColumns.asScala.toArray,
segment
).collect()

status.find(_._2 == false).foreach { task =>
throw new Exception(
s"Task Failed to rebuild datamap $dataMapName on segment_${segment.getSegmentNo}")
}
} catch {
case ex: Throwable =>
// process failure
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(dataMapStorePath))
throw new Exception(
s"Failed to refresh datamap $dataMapName on segment_${segment.getSegmentNo}", ex)
val status = new IndexDataMapRebuildRDD[String, (String, Boolean)](
sparkSession,
new RefreshResultImpl(),
carbonTable.getTableInfo,
schema.getDataMapName,
indexedCarbonColumns.asScala.toArray,
segments2DmStorePath.keySet
).collect

// for failed segments, clean the result
val failedSegments = status
.find { case (taskId, (segmentId, rebuildStatus)) =>
!rebuildStatus
}
.map { task =>
val segmentId = task._2._1
val dmPath = segments2DmStorePath.filter(p => p._1.getSegmentNo.equals(segmentId)).values
val cleanResult = dmPath.map(p =>
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(p)))
if (cleanResult.exists(!_)) {
LOGGER.error(s"Failed to clean up datamap store for segment_$segmentId")
false
} else {
true
}
} else {
throw new IOException(s"Failed to create directory $dataMapStorePath")
}

if (failedSegments.nonEmpty) {
throw new Exception(s"Failed to refresh datamap ${ schema.getDataMapName }")
}
}
DataMapStoreManager.getInstance().clearDataMaps(tableIdentifier)

val buildDataMapPostExecutionEvent = new BuildDataMapPostExecutionEvent(sparkSession,
tableIdentifier)
OperationListenerBus.getInstance().fireEvent(buildDataMapPostExecutionEvent, operationContext)
}
}
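The driver also skips segments whose datamap store directory already exists, so a rebuild only touches segments that still need the index. A toy version of that filtering (the path builder and exists check are stand-ins for CarbonTablePath.getDataMapStorePath and FileFactory.isFileExist):

```scala
object PendingSegmentsExample extends App {
  // stand-in for CarbonTablePath.getDataMapStorePath
  def dataMapStorePath(segmentNo: String, dataMapName: String): String =
    s"/table/dm_$dataMapName/$segmentNo"

  // stand-in for FileFactory.isFileExist: segment 1 already has its datamap built
  val alreadyBuilt = Set("/table/dm_lucene/1")

  val segments = Seq("1", "2", "3")
  val pending = segments
    .map(no => no -> dataMapStorePath(no, "lucene"))
    .filterNot { case (_, path) => alreadyBuilt.contains(path) }
    .toMap

  assert(pending.keySet == Set("2", "3"))
}
```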

class OriginalReadSupport(dataTypes: Array[DataType]) extends CarbonReadSupport[Array[Object]] {
@@ -254,7 +255,7 @@ class IndexDataMapRebuildRDD[K, V](
@transient tableInfo: TableInfo,
dataMapName: String,
indexColumns: Array[CarbonColumn],
segment: Segment
segments: Set[Segment]
) extends CarbonRDDWithTableInfo[(K, V)](
session.sparkContext, Nil, tableInfo.serialize()) {

@@ -274,11 +275,12 @@ class IndexDataMapRebuildRDD[K, V](
val inputMetrics = new CarbonInputMetrics
TaskMetricsMap.getInstance().registerThreadCallback()
val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
val segment = inputSplit.getAllSplits.get(0).getSegment
inputMetrics.initBytesReadCallback(context, inputSplit)

val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)
val format = createInputFormat(attemptContext)
val format = createInputFormat(segment, attemptContext)

val model = format.createQueryModel(inputSplit, attemptContext)
// one query id per table
@@ -290,8 +292,7 @@ class IndexDataMapRebuildRDD[K, V](
var refresher: DataMapBuilder = null
try {
val segmentPropertiesFetcher = DataMapStoreManager.getInstance().getDataMap(carbonTable,
BlockletDataMapFactory.DATA_MAP_SCHEMA)
.getDataMapFactory
BlockletDataMapFactory.DATA_MAP_SCHEMA).getDataMapFactory
.asInstanceOf[SegmentPropertiesFetcher]
val segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segment)

@@ -308,6 +309,8 @@ class IndexDataMapRebuildRDD[K, V](
}
reader = new CarbonRecordReader[Array[Object]](model, readSupport, inputMetrics)
reader.initialize(inputSplit, attemptContext)
// skip clearing the datamap here; we will do this after the rebuild
reader.setSkipClearDataMapAtClose(true)

var blockletId = 0
var firstRow = true
@@ -360,13 +363,13 @@ class IndexDataMapRebuildRDD[K, V](

override def next(): (K, V) = {
finished = true
result.getKey(split.index.toString, status)
result.getKey(split.index.toString, (segment.getSegmentNo, status))
}
}
}


private def createInputFormat(
private def createInputFormat(segment: Segment,
attemptContext: TaskAttemptContextImpl) = {
val format = new CarbonTableInputFormat[Object]
val tableInfo1 = getTableInfo
@@ -405,7 +408,7 @@ class IndexDataMapRebuildRDD[K, V](

CarbonInputFormat.setSegmentsToAccess(
job.getConfiguration,
Segment.toSegmentList(Array(segment.getSegmentNo), null))
Segment.toSegmentList(segments.map(_.getSegmentNo).toArray, null))

CarbonInputFormat.setTableInfo(
job.getConfiguration,
@@ -424,7 +427,7 @@ class IndexDataMapRebuildRDD[K, V](
.getSplits(job)
.asScala
.map(_.asInstanceOf[CarbonInputSplit])
.groupBy(_.taskId)
.groupBy(p => (p.getSegmentId, p.taskId))
.map { group =>
new CarbonMultiBlockSplit(
group._2.asJava,
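Grouping splits by (segmentId, taskId) rather than by taskId alone guarantees that no partition mixes splits from different segments, so each Spark task rebuilds the datamap of exactly one segment. A toy model of that grouping with plain collections (the Split case class is a stand-in for CarbonInputSplit):

```scala
object SplitGroupingExample extends App {
  // only the two fields that drive the grouping are modelled
  final case class Split(segmentId: String, taskId: String, block: String)

  val splits = Seq(
    Split("1", "0", "block-a"), Split("1", "0", "block-b"),
    Split("1", "1", "block-c"), Split("2", "0", "block-d")
  )

  // one multi-block partition per (segment, task) pair, never mixing segments
  val partitions = splits.groupBy(s => (s.segmentId, s.taskId))
  assert(partitions.size == 3)
}
```

In the commit, each such group is wrapped in a CarbonMultiBlockSplit and becomes one partition of the rebuild RDD.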
