Commit 4939b1c

Merge fe445c5 into d9f1a81
kunal642 committed Dec 10, 2018
2 parents d9f1a81 + fe445c5 commit 4939b1c

Showing 6 changed files with 180 additions and 73 deletions.
@@ -23,16 +23,20 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.{CarbonSession, SQLContext}
import org.apache.spark.sql.execution.command.CompactionModel
import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
import org.apache.spark.sql.execution.command.preaaggregate.{AlterTableDropPartitionPostStatusListener, CommitPreAggregateListener, PreAggregateUtil}

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.OperationContext
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
import org.apache.carbondata.processing.util.CarbonLoaderUtil

/**
* Used to perform compaction on Aggregate data map.
@@ -79,20 +83,28 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
CarbonSession.threadSet(CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP,
"true")
loadCommand.processData(sqlContext.sparkSession)
val newLoadMetaDataDetails = SegmentStatusManager.readLoadMetadata(
val newMetadataDetails = SegmentStatusManager.readLoadMetadata(
carbonTable.getMetadataPath, uuid)
val updatedLoadMetaDataDetails = newLoadMetaDataDetails collect {
case load if loadMetaDataDetails.contains(load) =>
load.setMergedLoadName(mergedLoadName)
load.setSegmentStatus(SegmentStatus.COMPACTED)
load.setModificationOrdeletionTimesStamp(System.currentTimeMillis())
load
case other => other
}
val mergedContent = loadMetaDataDetails.asScala.map {
segment => segment.setSegmentStatus(SegmentStatus.COMPACTED)
segment.setMergedLoadName(mergedLoadName)
segment.setModificationOrdeletionTimesStamp(CarbonUpdateUtil.readCurrentTime)
segment
} ++ newMetadataDetails
SegmentStatusManager.writeLoadDetailsIntoFile(
CarbonTablePath.getTableStatusFilePathWithUUID(carbonTable.getTablePath, uuid),
updatedLoadMetaDataDetails)
carbonLoadModel.setLoadMetadataDetails(updatedLoadMetaDataDetails.toList.asJava)
mergedContent.toArray)
carbonLoadModel.setLoadMetadataDetails((carbonLoadModel.getLoadMetadataDetails.asScala ++
newMetadataDetails).asJava)
// If isCompaction is true then the compaction on the aggregate table was triggered by the
// main table, so there is no need to commit the tablestatus file here. But if the
// compaction was triggered directly on the aggregate table then the commit has to be fired
// here, as the commit listener will not be called.
val directAggregateCompactionCall = Option(operationContext
.getProperty("isCompaction")).getOrElse("false").toString.toBoolean
if (!directAggregateCompactionCall) {
commitAggregateTableStatus(carbonTable, uuid)
}
} finally {
// check if any other segments need compaction in case of MINOR_COMPACTION.
// For example: after 8.1 creation, 0.1, 4.1, 8.1 have to be merged to 0.2 if the threshold
@@ -112,12 +124,7 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
if (!compactionModel.compactionType.equals(CompactionType.MAJOR) &&
!compactionModel.compactionType.equals(CompactionType.CUSTOM)) {
if (!identifySegmentsToBeMerged().isEmpty) {
val uuidTableStaus = CarbonTablePath.getTableStatusFilePathWithUUID(
carbonTable.getTablePath, uuid)
val tableStatus = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
if (!uuidTableStaus.equalsIgnoreCase(tableStatus)) {
FileFactory.getCarbonFile(uuidTableStaus).renameForce(tableStatus)
}
commitAggregateTableStatus(carbonTable, uuid)
executeCompaction()
}
}
@@ -133,4 +140,30 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
}
}
}
private def commitAggregateTableStatus(carbonTable: CarbonTable, uuid: String) = {
val retryCount = CarbonLockUtil
.getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT)
val maxTimeout = CarbonLockUtil
.getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT)
val lockFile: ICarbonLock = new SegmentStatusManager(carbonTable
.getAbsoluteTableIdentifier)
.getTableStatusLock
if (lockFile.lockWithRetries(retryCount, maxTimeout)) {
try {
CommitPreAggregateListener.mergeTableStatusContents(CarbonTablePath
.getTableStatusFilePathWithUUID(carbonTable.getTablePath, uuid), CarbonTablePath
.getTableStatusFilePathWithUUID(carbonTable.getTablePath, ""))
} catch {
case e: Exception =>
LOGGER.error("Unable to write Table status file. ", e)
throw e
} finally {
lockFile.unlock()
}
} else {
throw new RuntimeException("Unable to acquire lock for table status updation")
}
}
}
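Both commitAggregateTableStatus above and the commit listener later in this diff follow the same shape: take the table status lock with retries, merge the UUID table status into the main one, and always release the lock. A minimal self-contained sketch of that shape in plain Scala (the Lock trait below is a hypothetical stand-in for CarbonData's ICarbonLock, and the retry/timeout arguments stand in for the CarbonLockUtil lock properties):

// Hypothetical stand-in for ICarbonLock, used only to illustrate the pattern.
trait Lock {
  def lockWithRetries(retries: Int, timeoutSeconds: Int): Boolean
  def unlock(): Boolean
}

// Acquire the lock with retries, run the commit action, and always release the
// lock even when the action throws; fail loudly if the lock cannot be taken.
def withTableStatusLock[T](lock: Lock, retries: Int, timeoutSeconds: Int)(commit: => T): T = {
  if (lock.lockWithRetries(retries, timeoutSeconds)) {
    try commit finally lock.unlock()
  } else {
    throw new RuntimeException("Unable to acquire lock for table status updation")
  }
}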
@@ -461,7 +461,8 @@ object CarbonDataRDDFactory {
}
return null
}
val uniqueTableStatusId = operationContext.getProperty("uuid").asInstanceOf[String]
val uniqueTableStatusId = Option(operationContext.getProperty("uuid")).getOrElse("")
.asInstanceOf[String]
if (loadStatus == SegmentStatus.LOAD_FAILURE) {
// update the load entry in table status file for changing the status to marked for delete
CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uniqueTableStatusId)
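The null-safe read of the "uuid" property above matters when no pre-aggregate child table is involved and the property was never set. A tiny illustration (getProperty here is a hypothetical stand-in that returns null for a missing key):

// Hypothetical property store: returns null when the key was never set.
def getProperty(key: String): AnyRef = null

// Option(null) is None, so a missing "uuid" falls back to the empty string and
// downstream table status handling sees "" rather than a null id.
val uniqueTableStatusId = Option(getProperty("uuid")).getOrElse("").asInstanceOf[String]
// uniqueTableStatusId == ""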
@@ -349,7 +349,7 @@ case class CarbonLoadDataCommand(
case ex: Exception =>
LOGGER.error(ex)
// update the load entry in table status file for changing the status to marked for delete
if (isUpdateTableStatusRequired) {
if (isUpdateTableStatusRequired && !table.isChildDataMap) {
CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
}
throw ex
@@ -31,9 +31,11 @@ import org.apache.spark.sql.parser.CarbonSpark2SqlParser

import org.apache.carbondata.common.exceptions.MetadataProcessException
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock}
import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable}
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.CarbonUtil
@@ -111,6 +113,29 @@ trait CommitHelper {
}
}

def mergeTableStatusContents(uuidTableStatusPath: String,
tableStatusPath: String): Boolean = {
try {
val tableStatusContents = SegmentStatusManager.readTableStatusFile(tableStatusPath)
val newLoadContent = SegmentStatusManager.readTableStatusFile(uuidTableStatusPath)
val mergedContent = tableStatusContents.collect {
case content =>
val contentIndex = newLoadContent.indexOf(content)
if (contentIndex == -1) {
content
} else {
newLoadContent(contentIndex)
}
}
SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, mergedContent)
true
} catch {
case ex: Exception =>
LOGGER.error("Exception occurred while merging files", ex)
false
}
}

/**
* Used to remove table status files with UUID and segment folders.
*/
@@ -156,20 +181,19 @@ object AlterTableDropPartitionPostStatusListener extends OperationEventListener
// Generate table status file name without UUID, forExample: tablestatus
val newTableSchemaPath = CarbonTablePath.getTableStatusFilePath(
childCarbonTable.getTablePath)
renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid)
mergeTableStatusContents(oldTableSchemaPath, newTableSchemaPath)
}
// if true then the commit for one of the child tables has failed
val commitFailed = renamedDataMaps.lengthCompare(childCommands.length) != 0
if (commitFailed) {
LOGGER.info("Reverting table status file to original state")
renamedDataMaps.foreach {
command =>
val carbonTable = command.table
// rename the backup tablestatus i.e tablestatus_backup_UUID to tablestatus
val backupTableSchemaPath =
CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + "_backup_" + uuid
val tableSchemaPath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
renameDataMapTableStatusFiles(backupTableSchemaPath, tableSchemaPath, "")
LOGGER.warn("Reverting table status file to original state")
childCommands.foreach {
childDropCommand =>
val tableStatusPath = CarbonTablePath.getTableStatusFilePath(
childDropCommand.table.getTablePath)
markInProgressSegmentAsDeleted(tableStatusPath,
operationContext,
childDropCommand.table)
}
}
commitFailed
@@ -200,7 +224,6 @@ object AlterTableDropPartitionMetaListener extends OperationEventListener{
val dropPartitionEvent = event.asInstanceOf[AlterTableDropPartitionMetaEvent]
val parentCarbonTable = dropPartitionEvent.parentCarbonTable
val partitionsToBeDropped = dropPartitionEvent.specs.flatMap(_.keys)
val sparkSession = SparkSession.getActiveSession.get
if (parentCarbonTable.hasAggregationDataMap) {
// used as a flag to block direct drop partition on aggregate tables fired by the user
operationContext.setProperty("isInternalDropCall", "true")
@@ -325,7 +348,7 @@ object CompactionProcessMetaListener extends OperationEventListener {
childDataFrame,
false,
sparkSession)
val uuid = Option(operationContext.getProperty("uuid")).getOrElse("").toString
val uuid = Option(operationContext.getProperty("uuid")).getOrElse(UUID.randomUUID()).toString
loadCommand.processMetadata(sparkSession)
operationContext.setProperty(table.getTableName + "_Compaction", loadCommand)
operationContext.setProperty("uuid", uuid)
@@ -460,7 +483,7 @@ object LoadPostAggregateListener extends OperationEventListener {
.asInstanceOf[mutable.ArrayBuffer[AggregationDataMapSchema]]
// sorting the datamap for timeseries rollup
val sortedList = aggregationDataMapList.sortBy(_.getOrdinal)
for (dataMapSchema: AggregationDataMapSchema <- sortedList) {
val successDataMaps = sortedList.takeWhile { dataMapSchema =>
val childLoadCommand = operationContext
.getProperty(dataMapSchema.getChildSchema.getTableName)
.asInstanceOf[CarbonLoadDataCommand]
@@ -493,9 +516,32 @@
isOverwrite,
sparkSession)
}
val loadFailed = successDataMaps.lengthCompare(sortedList.length) != 0
if (loadFailed) {
successDataMaps.foreach(dataMapSchema => markSuccessSegmentsAsFailed(operationContext
.getProperty(dataMapSchema.getChildSchema.getTableName)
.asInstanceOf[CarbonLoadDataCommand]))
throw new RuntimeException(
"Data Load failed for DataMap. Please check logs for the failure")
}
}
}
}

private def markSuccessSegmentsAsFailed(childLoadCommand: CarbonLoadDataCommand) {
val segmentToRevert = childLoadCommand.operationContext
.getProperty(childLoadCommand.table.getTableUniqueName + "_Segment")
val tableStatusPath = CarbonTablePath.getTableStatusFilePath(
childLoadCommand.table.getTablePath)
val tableStatusContents = SegmentStatusManager.readTableStatusFile(tableStatusPath)
val updatedLoadDetails = tableStatusContents.collect {
case content if content.getLoadName == segmentToRevert =>
content.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE)
content
case others => others
}
SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, updatedLoadDetails)
}
}

/**
@@ -540,47 +586,63 @@ object CommitPreAggregateListener extends OperationEventListener with CommitHelper
operationContext.getProperty(dataMapSchema.getChildSchema.getTableName + "_Compaction")
.asInstanceOf[CarbonLoadDataCommand]
}
}
}
var commitFailed = false
try {
if (dataMapSchemas.nonEmpty) {
val uuid = operationContext.getProperty("uuid").toString
// keep committing until one fails
val renamedDataMaps = childLoadCommands.takeWhile { childLoadCommand =>
val childCarbonTable = childLoadCommand.table
// Generate table status file name with UUID, forExample: tablestatus_1
val oldTableSchemaPath = CarbonTablePath.getTableStatusFilePathWithUUID(
childCarbonTable.getTablePath, uuid)
// Generate table status file name without UUID, forExample: tablestatus
val newTableSchemaPath = CarbonTablePath.getTableStatusFilePath(
childCarbonTable.getTablePath)
renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid)
}
// if true then the commit for one of the child tables has failed
commitFailed = renamedDataMaps.lengthCompare(dataMapSchemas.length) != 0
if (commitFailed) {
LOGGER.warn("Reverting table status file to original state")
renamedDataMaps.foreach {
loadCommand =>
val carbonTable = loadCommand.table
// rename the backup tablestatus i.e tablestatus_backup_UUID to tablestatus
val backupTableSchemaPath =
CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + "_backup_" +
uuid
val tableSchemaPath = CarbonTablePath
.getTableStatusFilePath(carbonTable.getTablePath)
markInProgressSegmentAsDeleted(backupTableSchemaPath, operationContext, carbonTable)
renameDataMapTableStatusFiles(backupTableSchemaPath, tableSchemaPath, "")
val retryCount = CarbonLockUtil
.getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT)
val maxTimeout = CarbonLockUtil
.getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT)
val lockFiles = new scala.collection.mutable.ListBuffer[ICarbonLock]()
try {
val uuid = operationContext.getProperty("uuid").toString
// keep committing until one fails
val renamedDataMaps = childLoadCommands.takeWhile { childLoadCommand =>
val childCarbonTable = childLoadCommand.table
val lockFile: ICarbonLock = new SegmentStatusManager(childCarbonTable
.getAbsoluteTableIdentifier)
.getTableStatusLock
if (lockFile.lockWithRetries(retryCount, maxTimeout)) {
lockFiles.+=(lockFile)
// Generate table status file name with UUID, forExample: tablestatus_1
val oldTableSchemaPath = CarbonTablePath.getTableStatusFilePathWithUUID(
childCarbonTable.getTablePath, uuid)
// Generate table status file name without UUID, forExample: tablestatus
val newTableSchemaPath = CarbonTablePath.getTableStatusFilePath(
childCarbonTable.getTablePath)
mergeTableStatusContents(oldTableSchemaPath, newTableSchemaPath)
} else {
false
}
}
}
// after success/failure of commit delete all tablestatus files with UUID in their names.
// if commit failed then remove the segment directory
cleanUpStaleTableStatusFiles(childLoadCommands.map(_.table),
operationContext,
uuid)
operationContext.setProperty("commitComplete", !commitFailed)
if (commitFailed) {
sys.error("Failed to update table status for pre-aggregate table")
// if true then the commit for one of the child tables has failed
commitFailed = renamedDataMaps.lengthCompare(dataMapSchemas.length) != 0
if (commitFailed) {
LOGGER.warn("Reverting table status file to original state")
childLoadCommands.foreach {
childLoadCommand =>
val tableStatusPath = CarbonTablePath.getTableStatusFilePath(
childLoadCommand.table.getTablePath)
markInProgressSegmentAsDeleted(tableStatusPath,
operationContext,
childLoadCommand.table)
}
}
// after success/failure of commit delete all tablestatus files with UUID in their names.
// if commit failed then remove the segment directory
cleanUpStaleTableStatusFiles(childLoadCommands.map(_.table),
operationContext,
uuid)
operationContext.setProperty("commitComplete", !commitFailed)
if (commitFailed) {
sys.error("Failed to update table status for pre-aggregate table")
}
} finally {
lockFiles.foreach(_.unlock())

}
}
} catch {
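mergeTableStatusContents, defined in CommitHelper above, keeps every entry of the base tablestatus file and swaps in the version from the UUID tablestatus whenever the UUID file contains a matching entry; entries that exist only in the UUID file are not picked up by this merge. A self-contained sketch of that behaviour, assuming purely for illustration that equality between load details is keyed on the load name:

// Simplified stand-in for LoadMetadataDetails; keying equality on the load name
// is an assumption made only so indexOf can find "the same load" below.
case class LoadDetail(loadName: String, status: String) {
  override def equals(other: Any): Boolean = other match {
    case that: LoadDetail => that.loadName == loadName
    case _ => false
  }
  override def hashCode: Int = loadName.hashCode
}

val base = Seq(LoadDetail("0", "Success"), LoadDetail("1", "Success"))        // tablestatus
val fromUuid = Seq(LoadDetail("1", "Compacted"), LoadDetail("2", "Success"))  // tablestatus_<uuid>

// Mirror of the merge: keep each base entry unless the UUID file carries an
// updated version of the same load; the UUID-only entry "2" is not added.
val merged = base.map { detail =>
  val idx = fromUuid.indexOf(detail)
  if (idx == -1) detail else fromUuid(idx)
}
// merged == Seq(LoadDetail("0", "Success"), LoadDetail("1", "Compacted"))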
@@ -587,7 +587,7 @@ object PreAggregateUtil {
validateSegments: Boolean,
loadCommand: CarbonLoadDataCommand,
isOverwrite: Boolean,
sparkSession: SparkSession): Unit = {
sparkSession: SparkSession): Boolean = {
CarbonSession.threadSet(
CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
parentTableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase) + "." +
@@ -601,6 +601,11 @@
"true")
try {
loadCommand.processData(sparkSession)
true
} catch {
case ex: Exception =>
LOGGER.error("Data Load failed for DataMap: ", ex)
false
} finally {
CarbonSession.threadUnset(
CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
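The Boolean return type introduced in PreAggregateUtil above is what lets LoadPostAggregateListener replace its for loop with takeWhile: child data maps are loaded in order until the first one reports failure, lengthCompare detects that something was skipped, and the segments already written by the successful children are reverted. A self-contained sketch of that control flow (loadChild and the revert action are hypothetical placeholders):

// Hypothetical child-table loader: returns true on success, false on failure.
def loadChild(tableName: String): Boolean = tableName != "agg_bad"

val children = Seq("agg_sum", "agg_bad", "agg_avg")

// Load children in order and stop at the first failure.
val succeeded = children.takeWhile(loadChild)              // Seq("agg_sum")

// lengthCompare != 0 means at least one child failed or was never attempted.
if (succeeded.lengthCompare(children.length) != 0) {
  // Revert the segments written by the children that did succeed, then fail the load.
  succeeded.foreach(name => println(s"marking segment of $name as MARKED_FOR_DELETE"))
  throw new RuntimeException("Data Load failed for DataMap. Please check logs for the failure")
}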