Fix compaction failure issue for SI table and metadata mismatch in concurrency
akashrn5 committed Jul 20, 2020
1 parent 33c2a57 commit fb107e0
Showing 5 changed files with 128 additions and 11 deletions.
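The heart of the change is a per-segment lock protocol around compaction: before the merge job is submitted, a lock is taken on every segment being compacted, and the locks are released in a finally block once the merge or its failure handling completes. Condensed into one hypothetical helper (the name withSegmentLocks and its parameter list are illustrative, not part of the commit), the pattern added to CarbonTableCompactor below looks like this:

import java.util.{List => JList}

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
import org.apache.carbondata.core.util.path.CarbonTablePath

// Sketch of the locking pattern this commit adds around the merge job:
// lock every segment being compacted, run the job, always unlock.
def withSegmentLocks(table: CarbonTable,
    loadsToMerge: JList[LoadMetadataDetails])(mergeJob: => Unit): Unit = {
  val segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
  loadsToMerge.asScala.foreach { load =>
    val lock = CarbonLockFactory.getCarbonLockObj(
      table.getAbsoluteTableIdentifier,
      CarbonTablePath.addSegmentPrefix(load.getLoadName) + LockUsage.LOCK)
    lock.lockWithRetries() // block competing compactions and SI repairs
    segmentLocks += lock
  }
  try {
    mergeJob // scanSegmentsAndSubmitJob in the real code
  } finally {
    segmentLocks.foreach(_.unlock())
  }
}

Holding these locks for the whole merge is what lets the SI listeners further down treat a failed lockWithRetries() as "compaction still in progress".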
CarbonTableCompactor.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.ExecutorService

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ListBuffer

import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.{InputSplit, Job}
@@ -37,6 +38,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.constants.SortScopeOptions.SortScope
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.index.{IndexStoreManager, Segment}
import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
@@ -93,6 +95,15 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
val lastSegment = sortedSegments.get(sortedSegments.size() - 1)
deletePartialLoadsInCompaction()
val compactedLoad = CarbonDataMergerUtil.getMergedLoadName(loadsToMerge)
val segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
loadsToMerge.asScala.foreach { segmentId =>
val segmentLock = CarbonLockFactory
.getCarbonLockObj(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
.getAbsoluteTableIdentifier,
CarbonTablePath.addSegmentPrefix(segmentId.getLoadName) + LockUsage.LOCK)
segmentLock.lockWithRetries()
segmentLocks += segmentLock
}
try {
scanSegmentsAndSubmitJob(loadsToMerge, compactedSegments, compactedLoad)
} catch {
@@ -117,6 +128,10 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
Array(compactedLoadToClear))
}
throw e
} finally {
segmentLocks.foreach { segmentLock =>
segmentLock.unlock()
}
}

// scan again and determine if anything is there to merge again.
CleanFilesPostEventListener.scala
@@ -24,11 +24,17 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.index.CarbonIndexUtil
import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.spark.sql.secondaryindex.load.CarbonInternalLoaderUtil

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{CleanFilesPostEvent, Event, OperationContext, OperationEventListener}

class CleanFilesPostEventListener extends OperationEventListener with Logging {
@@ -54,7 +60,70 @@ class CleanFilesPostEventListener extends OperationEventListener with Logging {
SegmentStatusManager.deleteLoadsAndUpdateMetadata(
indexTable, true, partitions.map(_.asJava).orNull)
CarbonUpdateUtil.cleanUpDeltaFiles(indexTable, true)
cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable)
}
}
}

/**
* This method is added to clean up segments that are SUCCESS in the SI table but are
* COMPACTED or MARKED_FOR_DELETE in the main table, which can happen in concurrent scenarios.
*/
def cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable: CarbonTable,
mainTable: CarbonTable): Unit = {
val mainTableStatusLock: ICarbonLock = CarbonLockFactory
.getCarbonLockObj(mainTable.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
val indexTableStatusLock: ICarbonLock = CarbonLockFactory
.getCarbonLockObj(indexTable.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
var mainTableLocked = false
var indexTableLocked = false
try {
mainTableLocked = mainTableStatusLock.lockWithRetries()
indexTableLocked = indexTableStatusLock.lockWithRetries()
if (mainTableLocked && indexTableLocked) {
val mainTableMetadataDetails =
SegmentStatusManager.readLoadMetadata(mainTable.getMetadataPath).toSet ++
SegmentStatusManager.readLoadHistoryMetadata(mainTable.getMetadataPath).toSet
val indexTableMetadataDetails =
SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath).toSet
val segToStatusMap = mainTableMetadataDetails
.map(detail => detail.getLoadName -> detail.getSegmentStatus).toMap

val unnecessarySegmentsOfSI = indexTableMetadataDetails.filter { indexDetail =>
indexDetail.getSegmentStatus.equals(SegmentStatus.SUCCESS) &&
segToStatusMap.contains(indexDetail.getLoadName) &&
(segToStatusMap(indexDetail.getLoadName).equals(SegmentStatus.COMPACTED) ||
segToStatusMap(indexDetail.getLoadName).equals(SegmentStatus.MARKED_FOR_DELETE))
}
LOGGER.info(s"Unwanted SI segments are: $unnecessarySegmentsOfSI")
unnecessarySegmentsOfSI.foreach { detail =>
val carbonFile = FileFactory
.getCarbonFile(CarbonTablePath
.getSegmentPath(indexTable.getTablePath, detail.getLoadName))
CarbonUtil.deleteFoldersAndFiles(carbonFile)
}
unnecessarySegmentsOfSI.foreach { detail =>
detail.setSegmentStatus(segToStatusMap(detail.getLoadName))
detail.setVisibility("false")
}
indexTableStatusLock.unlock()
mainTableStatusLock.unlock()
CarbonInternalLoaderUtil.recordLoadMetadata(
unnecessarySegmentsOfSI.toList.asJava,
unnecessarySegmentsOfSI.map(_.getLoadName).toList.asJava,
indexTable,
List(indexTable).asJava,
indexTable.getDatabaseName,
indexTable.getTableName
)
}
} catch {
case ex: Exception =>
LOGGER.error("clean up of unwanted SI segments failed", ex)
// ignore the exception
} finally {
indexTableStatusLock.unlock()
mainTableStatusLock.unlock()
}
}
}
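Restated, the cleanup above treats an SI segment as stale when it still reads SUCCESS on the index side while its main-table counterpart was compacted away or marked for delete. As a standalone predicate (the helper name is hypothetical; the statuses are exactly the ones checked above):

import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}

// Hypothetical equivalent of the filter in the listener: an SI segment
// is stale when it is SUCCESS but its main-table segment has moved on
// to COMPACTED or MARKED_FOR_DELETE.
def isStaleSiSegment(siDetail: LoadMetadataDetails,
    mainStatusBySegment: Map[String, SegmentStatus]): Boolean = {
  siDetail.getSegmentStatus == SegmentStatus.SUCCESS &&
    mainStatusBySegment.get(siDetail.getLoadName).exists { mainStatus =>
      mainStatus == SegmentStatus.COMPACTED ||
        mainStatus == SegmentStatus.MARKED_FOR_DELETE
    }
}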
SILoadEventListenerForFailedSegments.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.secondaryindex.events
import java.util

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

import org.apache.log4j.Logger
import org.apache.spark.internal.Logging
@@ -33,7 +34,7 @@ import org.apache.spark.sql.secondaryindex.load.CarbonInternalLoaderUtil
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.index.IndexType
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
@@ -91,6 +92,7 @@ class SILoadEventListenerForFailedSegments extends OperationEventListener with L
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
val segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
if (!isLoadSIForFailedSegments
|| !CarbonInternalLoaderUtil.checkMainTableSegEqualToSISeg(
mainTblLoadMetadataDetails,
@@ -166,8 +168,19 @@ class SILoadEventListenerForFailedSegments extends OperationEventListener with L
if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName)
failedLoadMetadataDetails.add(detail(0))
// in a concurrent scenario, if compaction is running on the table, the SI
// segments are updated first in the table status and then the main table
// segment, so a load running in parallel shouldn't accidentally consider
// those segments in this listener. So try to take the segment lock.
val segmentLockOfProbableOngngCompactionSeg = CarbonLockFactory
.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
LockUsage.LOCK)
if (segmentLockOfProbableOngngCompactionSeg.lockWithRetries()) {
segmentLocks += segmentLockOfProbableOngngCompactionSeg
LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName)
failedLoadMetadataDetails.add(detail(0))
}
}
}
})
@@ -221,6 +234,10 @@ class SILoadEventListenerForFailedSegments extends OperationEventListener with L
LOGGER.error(s"Load to SI table to $indexTableName is failed " +
s"or SI table ENABLE is failed. ", ex)
return
} finally {
segmentLocks.foreach(_.unlock())
}
}
}
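The notable part of this listener change is the try-lock probe: if the segment lock cannot be acquired, a concurrent compaction presumably still owns that segment, so this round of repair skips it and a later load retries. A minimal sketch of the idiom (the helper name and parameters are illustrative):

import scala.collection.mutable.ListBuffer

import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.path.CarbonTablePath

// Returns true (and retains the lock) only when no concurrent compaction
// holds the segment; the caller releases heldLocks in its finally block
// once the repair load completes.
def tryClaimSegment(mainTable: CarbonTable, segmentName: String,
    heldLocks: ListBuffer[ICarbonLock]): Boolean = {
  val lock = CarbonLockFactory.getCarbonLockObj(
    mainTable.getAbsoluteTableIdentifier,
    CarbonTablePath.addSegmentPrefix(segmentName) + LockUsage.LOCK)
  val claimed = lock.lockWithRetries()
  if (claimed) heldLocks += lock // keep it until the repair is done
  claimed
}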
Compactor.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.secondaryindex.load
import java.util

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

import org.apache.spark.rdd.CarbonMergeFilesRDD
import org.apache.spark.sql.SQLContext
@@ -29,6 +30,7 @@ import org.apache.spark.sql.secondaryindex.util.SecondaryIndexUtil

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.locks.ICarbonLock
import org.apache.carbondata.core.metadata.index.IndexType
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
@@ -60,6 +62,7 @@ object Compactor {
} else {
java.util.Collections.emptyIterator()
}
val allSegmentsLock: ListBuffer[ICarbonLock] = ListBuffer.empty
while (iterator.hasNext) {
val index = iterator.next()
val indexColumns = index.getValue.get(CarbonCommonConstants.INDEX_COLUMNS).split(",").toList
@@ -76,11 +79,12 @@ try {
try {
val segmentToSegmentTimestampMap: util.Map[String, String] = new java.util
.HashMap[String, String]()
val indexCarbonTable = SecondaryIndexCreator
val (indexCarbonTable, segmentLocks) = SecondaryIndexCreator
.createSecondaryIndex(secondaryIndexModel,
segmentToSegmentTimestampMap, null,
forceAccessSegment, isCompactionCall = true,
isLoadToFailedSISegments = false)
allSegmentsLock ++= segmentLocks
CarbonInternalLoaderUtil.updateLoadMetadataWithMergeStatus(
indexCarbonTable,
loadsToMerge,
@@ -138,6 +142,11 @@ object Compactor {
""".stripMargin).collect()
}
throw ex
} finally {
// whether compaction succeeded or failed, release the segment locks
allSegmentsLock.foreach { segmentLock =>
segmentLock.unlock()
}
}
}
}
SecondaryIndexCreator.scala
@@ -59,7 +59,7 @@ object SecondaryIndexCreator {
indexTable: CarbonTable,
forceAccessSegment: Boolean = false,
isCompactionCall: Boolean,
isLoadToFailedSISegments: Boolean): CarbonTable = {
isLoadToFailedSISegments: Boolean): (CarbonTable, ListBuffer[ICarbonLock]) = {
var indexCarbonTable = indexTable
val sc = secondaryIndexModel.sqlContext
// get the thread pool size for secondary index creation
@@ -302,7 +302,11 @@ object SecondaryIndexCreator {
OperationListenerBus.getInstance
.fireEvent(loadTableSIPostExecutionEvent, operationContext)

indexCarbonTable
if (isCompactionCall) {
(indexCarbonTable, segmentLocks)
} else {
(indexCarbonTable, ListBuffer.empty)
}
} catch {
case ex: Exception =>
FileInternalUtil
@@ -319,10 +323,6 @@ object SecondaryIndexCreator {
LOGGER.error(ex)
throw ex
} finally {
// release the segment locks
segmentLocks.foreach(segmentLock => {
segmentLock.unlock()
})
// if some segments are skipped, disable the SI table so that
// SILoadEventListenerForFailedSegments takes care of loading these segments
// in the next load to the main table.
@@ -351,6 +351,13 @@ object SecondaryIndexCreator {
if (null != executorService) {
executorService.shutdownNow()
}

// release the segment locks only in the load flow; in the compaction
// flow the caller (Compactor) releases them
if (!isCompactionCall) {
segmentLocks.foreach(segmentLock => {
segmentLock.unlock()
})
}
}
}

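Together with the Compactor change above, this establishes a simple ownership rule: in the load flow SecondaryIndexCreator releases its own segment locks, while in the compaction flow the locks are handed back and the caller releases them. A condensed sketch of the caller side, where createIndex() stands in for the full createSecondaryIndex(..., isCompactionCall = true, ...) call:

import scala.collection.mutable.ListBuffer

import org.apache.carbondata.core.locks.ICarbonLock
import org.apache.carbondata.core.metadata.schema.table.CarbonTable

// Stand-in for SecondaryIndexCreator.createSecondaryIndex(...) in the
// compaction flow; it returns the SI table plus the locks it acquired.
def createIndex(): (CarbonTable, ListBuffer[ICarbonLock]) = ???

val allSegmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
try {
  val (indexCarbonTable, segmentLocks) = createIndex()
  allSegmentLocks ++= segmentLocks
  // ... update merge status on indexCarbonTable ...
} finally {
  // caller-owned release: runs on success and on failure alike
  allSegmentLocks.foreach(_.unlock())
}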
