
Commit c2619d9

Merge be5c74d into 176d6a7

kunal642 committed Mar 27, 2019
2 parents: 176d6a7 + be5c74d
Showing 4 changed files with 100 additions and 83 deletions.
LockUsage.java
@@ -36,5 +36,6 @@ public class LockUsage {
   public static final String STREAMING_LOCK = "streaming.lock";
   public static final String DATAMAP_STATUS_LOCK = "datamapstatus.lock";
   public static final String CONCURRENT_LOAD_LOCK = "concurrentload.lock";
+  public static final String UPDATE_LOCK = "update.lock";

 }
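
Note: the new UPDATE_LOCK constant names the lock file (update.lock) that the compaction paths below acquire before scheduling work, so an in-flight UPDATE and a compaction cannot overlap. A minimal sketch of how such a constant is consumed, assuming a resolved CarbonTable handle named carbonTable (the factory and lock calls are the ones visible in the diffs below):

    // Sketch only: typical acquire/release cycle for a LockUsage constant.
    val updateLock: ICarbonLock = CarbonLockFactory.getCarbonLockObj(
      carbonTable.getAbsoluteTableIdentifier, LockUsage.UPDATE_LOCK)
    if (updateLock.lockWithRetries()) {
      try {
        // ... work that must exclude concurrent updates ...
      } finally {
        updateLock.unlock()  // always release, even if the work fails
      }
    }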
CarbonDataRDDFactory.scala
@@ -54,6 +54,7 @@ import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
+import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarFormatVersion, SegmentFileStore}
 import org.apache.carbondata.core.metadata.datatype.DataTypes
@@ -867,27 +868,34 @@ object CarbonDataRDDFactory {
     val lock = CarbonLockFactory.getCarbonLockObj(
       carbonTable.getAbsoluteTableIdentifier,
       LockUsage.COMPACTION_LOCK)
-
-    if (lock.lockWithRetries()) {
-      LOGGER.info("Acquired the compaction lock.")
-      try {
-        startCompactionThreads(sqlContext,
-          carbonLoadModel,
-          storeLocation,
-          compactionModel,
-          lock,
-          compactedSegments,
-          operationContext
-        )
-      } catch {
-        case e: Exception =>
-          LOGGER.error(s"Exception in start compaction thread. ${ e.getMessage }")
-          lock.unlock()
-          throw e
-      }
-    } else {
-      LOGGER.error("Not able to acquire the compaction lock for table " +
-        s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName}")
+    val updateLock = CarbonLockFactory.getCarbonLockObj(carbonTable
+      .getAbsoluteTableIdentifier, LockUsage.UPDATE_LOCK)
+    try {
+      if (updateLock.lockWithRetries()) {
+        if (lock.lockWithRetries()) {
+          LOGGER.info("Acquired the compaction lock.")
+          startCompactionThreads(sqlContext,
+            carbonLoadModel,
+            storeLocation,
+            compactionModel,
+            lock,
+            compactedSegments,
+            operationContext
+          )
+        } else {
+          LOGGER.error("Not able to acquire the compaction lock for table " +
+            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName}")
+        }
+      } else {
+        throw new ConcurrentOperationException(carbonTable, "update", "compaction")
+      }
+    } catch {
+      case e: Exception =>
+        LOGGER.error(s"Exception in start compaction thread.", e)
+        lock.unlock()
+        throw e
+    } finally {
+      updateLock.unlock()
     }
   }
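
Note: the rewritten block acquires the locks in a fixed order (update lock first, then compaction lock), releases the update lock in the finally, and leaves the compaction lock to the compaction threads on success or to the catch block on failure. The same acquire-in-order, release-in-reverse discipline in generic form, where withLocks is a hypothetical helper rather than CarbonData API:

    // Hypothetical helper, not part of the commit; illustrates the discipline only.
    def withLocks[T](locks: ICarbonLock*)(body: => T): T = {
      val held = scala.collection.mutable.ListBuffer.empty[ICarbonLock]
      try {
        for (l <- locks) {
          if (!l.lockWithRetries()) {
            throw new RuntimeException("could not acquire lock")  // map to a domain exception in real code
          }
          held += l
        }
        body
      } finally {
        held.reverseIterator.foreach(_.unlock())  // release in reverse acquisition order
      }
    }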
CarbonAlterTableCompactionCommand.scala
@@ -288,31 +288,38 @@ case class CarbonAlterTableCompactionCommand(
     val lock = CarbonLockFactory.getCarbonLockObj(
       carbonTable.getAbsoluteTableIdentifier,
       LockUsage.COMPACTION_LOCK)
-
-    if (lock.lockWithRetries()) {
-      LOGGER.info("Acquired the compaction lock for table" +
-        s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-      try {
-        CarbonDataRDDFactory.startCompactionThreads(
-          sqlContext,
-          carbonLoadModel,
-          storeLocation,
-          compactionModel,
-          lock,
-          compactedSegments,
-          operationContext
-        )
-      } catch {
-        case e: Exception =>
-          LOGGER.error(s"Exception in start compaction thread. ${ e.getMessage }")
-          lock.unlock()
-          throw e
-      }
-    } else {
-      LOGGER.error(s"Not able to acquire the compaction lock for table" +
-        s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-      CarbonException.analysisException(
-        "Table is already locked for compaction. Please try after some time.")
+    val updateLock = CarbonLockFactory.getCarbonLockObj(carbonTable
+      .getAbsoluteTableIdentifier, LockUsage.UPDATE_LOCK)
+    try {
+      if (updateLock.lockWithRetries()) {
+        if (lock.lockWithRetries()) {
+          LOGGER.info("Acquired the compaction lock for table" +
+            s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+          CarbonDataRDDFactory.startCompactionThreads(
+            sqlContext,
+            carbonLoadModel,
+            storeLocation,
+            compactionModel,
+            lock,
+            compactedSegments,
+            operationContext
+          )
+        } else {
+          LOGGER.error(s"Not able to acquire the compaction lock for table" +
+            s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+          CarbonException.analysisException(
+            "Table is already locked for compaction. Please try after some time.")
+        }
+      } else {
+        throw new ConcurrentOperationException(carbonTable, "update", "compaction")
+      }
+    } catch {
+      case e: Exception =>
+        LOGGER.error(s"Exception in start compaction thread.", e)
+        lock.unlock()
+        throw e
+    } finally {
+      updateLock.unlock()
     }
   }
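
Note: this command path mirrors the CarbonDataRDDFactory change, but reports a busy compaction lock to the SQL user through CarbonException.analysisException, while a held update lock still surfaces as ConcurrentOperationException. The intended behaviour in a ScalaTest-style sketch, with table name and assertion illustrative rather than taken from the commit:

    // Illustrative only; assumes a Spark/Carbon test harness providing sql().
    val ex = intercept[ConcurrentOperationException] {
      // pre-condition: another session currently holds update.lock on table t
      sql("ALTER TABLE t COMPACT 'minor'").collect()
    }
    assert(ex.getMessage.contains("compaction"))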
CarbonProjectForUpdateCommand.scala
@@ -79,9 +79,6 @@ private[sql] case class CarbonProjectForUpdateCommand(
     if (!carbonTable.getTableInfo.isTransactionalTable) {
       throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
     }
-    if (SegmentStatusManager.isCompactionInProgress(carbonTable)) {
-      throw new ConcurrentOperationException(carbonTable, "compaction", "data update")
-    }
     if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {
       throw new ConcurrentOperationException(carbonTable, "loading", "data update")
     }
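
Note: dropping the SegmentStatusManager.isCompactionInProgress check removes a check-then-act race: a compaction could begin in the window between the status check and the update itself. The update now holds COMPACTION_LOCK for its whole duration instead, as shown in the hunks below. The contrast, schematically, where runUpdate() is a hypothetical stand-in for the update body:

    // Schematic only; carbonTable and compactionLock as in the command below.
    def racyGuard(): Unit = {
      if (!SegmentStatusManager.isCompactionInProgress(carbonTable)) {
        runUpdate()  // a compaction may start between the check and this call
      }
    }

    def lockedGuard(): Unit = {
      if (compactionLock.lockWithRetries()) {
        try runUpdate() finally compactionLock.unlock()
      } else {
        throw new ConcurrentOperationException(carbonTable, "compaction", "update")
      }
    }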
@@ -99,6 +96,8 @@ private[sql] case class CarbonProjectForUpdateCommand(
     val metadataLock = CarbonLockFactory
       .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
         LockUsage.METADATA_LOCK)
+    val compactionLock = CarbonLockFactory.getCarbonLockObj(carbonTable
+      .getAbsoluteTableIdentifier, LockUsage.COMPACTION_LOCK)
     var lockStatus = false
     // get the current time stamp which should be same for delete and update.
     val currentTime = CarbonUpdateUtil.readCurrentTime
@@ -113,45 +112,47 @@ private[sql] case class CarbonProjectForUpdateCommand(
       else {
         throw new Exception("Table is locked for updation. Please try after some time")
       }
-      // Get RDD.
-
-      dataSet = if (isPersistEnabled) {
-        Dataset.ofRows(sparkSession, plan).persist(StorageLevel.fromString(
-          CarbonProperties.getInstance.getUpdateDatasetStorageLevel()))
-      }
-      else {
-        Dataset.ofRows(sparkSession, plan)
-      }
       val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
-
-      // handle the clean up of IUD.
-      CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
-
-      // do delete operation.
-      val segmentsToBeDeleted = DeleteExecution.deleteDeltaExecution(
-        databaseNameOp,
-        tableName,
-        sparkSession,
-        dataSet.rdd,
-        currentTime + "",
-        isUpdateOperation = true,
-        executionErrors)
-
-      if (executionErrors.failureCauses != FailureCauses.NONE) {
-        throw new Exception(executionErrors.errorMsg)
-      }
-
-      // do update operation.
-      performUpdate(dataSet,
-        databaseNameOp,
-        tableName,
-        plan,
-        sparkSession,
-        currentTime,
-        executionErrors,
-        segmentsToBeDeleted)
+      if (compactionLock.lockWithRetries()) {
+        // Get RDD.
+        dataSet = if (isPersistEnabled) {
+          Dataset.ofRows(sparkSession, plan).persist(StorageLevel.fromString(
+            CarbonProperties.getInstance.getUpdateDatasetStorageLevel()))
+        }
+        else {
+          Dataset.ofRows(sparkSession, plan)
+        }
+
+        // handle the clean up of IUD.
+        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
+
+        // do delete operation.
+        val segmentsToBeDeleted = DeleteExecution.deleteDeltaExecution(
+          databaseNameOp,
+          tableName,
+          sparkSession,
+          dataSet.rdd,
+          currentTime + "",
+          isUpdateOperation = true,
+          executionErrors)
+
+        if (executionErrors.failureCauses != FailureCauses.NONE) {
+          throw new Exception(executionErrors.errorMsg)
+        }
+
+        // do update operation.
+        performUpdate(dataSet,
+          databaseNameOp,
+          tableName,
+          plan,
+          sparkSession,
+          currentTime,
+          executionErrors,
+          segmentsToBeDeleted)
+      } else {
+        throw new ConcurrentOperationException(carbonTable, "compaction", "update")
+      }

       if (executionErrors.failureCauses != FailureCauses.NONE) {
         throw new Exception(executionErrors.errorMsg)
       }
@@ -185,11 +186,11 @@ private[sql] case class CarbonProjectForUpdateCommand(
           sys.error("Update operation failed. " + e.getCause.getMessage)
         }
         sys.error("Update operation failed. please check logs.")
-    }
-    finally {
+    } finally {
       if (null != dataSet && isPersistEnabled) {
         dataSet.unpersist()
       }
+      compactionLock.unlock()
       if (lockStatus) {
         CarbonLockUtil.fileUnlock(metadataLock, LockUsage.METADATA_LOCK)
       }
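
Note: taken together, the four files set up a symmetric handshake: UPDATE holds COMPACTION_LOCK while it runs, and compaction holds UPDATE_LOCK while it schedules, so whichever operation arrives second fails fast with ConcurrentOperationException rather than interleaving with the first. The shared fail-fast shape as a hypothetical helper (guardAgainst is not CarbonData API; a CarbonTable import is assumed):

    // Hypothetical fail-fast guard capturing the pattern used on both sides.
    def guardAgainst[T](peerLock: ICarbonLock, table: CarbonTable,
        runningOp: String, blockedOp: String)(body: => T): T = {
      if (!peerLock.lockWithRetries()) {
        // the peer operation is in flight; refuse instead of waiting indefinitely
        throw new ConcurrentOperationException(table, runningOp, blockedOp)
      }
      try body finally peerLock.unlock()
    }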
