From 3642bfc696d283b929930b7e70a15157107e5193 Mon Sep 17 00:00:00 2001
From: mohammadshahidkhan
Date: Tue, 16 Jan 2018 11:49:54 +0530
Subject: [PATCH] [CARBONDATA-2049] CarbonCleanFilesCommand table path problem

---
 .../apache/carbondata/api/CarbonStore.scala   | 32 +++++++++++++------
 .../management/CarbonCleanFilesCommand.scala  | 25 ++++++++-------
 .../org/apache/spark/util/CleanFiles.scala    | 32 +++++++++++++------
 .../spark/util/CarbonCommandSuite.scala       |  3 +-
 4 files changed, 59 insertions(+), 33 deletions(-)

diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index d514f772314..c02ba0aa6e9 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -98,34 +98,46 @@ object CarbonStore {
     }
   }
 
+  /**
+   * Deletes all table data if forceTableClean is true, and cleans only garbage
+   * segments (MARKED_FOR_DELETE state) otherwise.
+   *
+   * @param dbName : Database name
+   * @param tableName : Table name
+   * @param tablePath : Table path
+   * @param carbonTable : CarbonTable object; may be null in case of force clean
+   * @param forceTableClean : if true, deletes all table data;
+   *                          if false, cleans garbage segments (MARKED_FOR_DELETE state)
+   * @param currentTablePartitions : Hive partition details
+   */
   def cleanFiles(
       dbName: String,
       tableName: String,
-      storePath: String,
+      tablePath: String,
       carbonTable: CarbonTable,
       forceTableClean: Boolean,
       currentTablePartitions: Option[Seq[String]] = None): Unit = {
     LOGGER.audit(s"The clean files request has been received for $dbName.$tableName")
     var carbonCleanFilesLock: ICarbonLock = null
-    var absoluteTableIdentifier: AbsoluteTableIdentifier = null
-    if (forceTableClean) {
-      absoluteTableIdentifier = AbsoluteTableIdentifier.from(storePath, dbName, tableName)
+    val absoluteTableIdentifier = if (forceTableClean) {
+      AbsoluteTableIdentifier.from(tablePath, dbName, tableName)
     } else {
-      absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+      carbonTable.getAbsoluteTableIdentifier
     }
     try {
       val errorMsg = "Clean files request is failed for " +
                      s"$dbName.$tableName" +
                      ". Not able to acquire the clean files lock due to another clean files " +
                      "operation is running in the background."
-      carbonCleanFilesLock =
-        CarbonLockUtil.getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
+      // in case of force clean the lock is not required
       if (forceTableClean) {
-        val absIdent = AbsoluteTableIdentifier.from(storePath, dbName, tableName)
         FileFactory.deleteAllCarbonFilesOfDir(
-          FileFactory.getCarbonFile(absIdent.getTablePath,
-            FileFactory.getFileType(absIdent.getTablePath)))
+          FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath,
+            FileFactory.getFileType(absoluteTableIdentifier.getTablePath)))
       } else {
+        carbonCleanFilesLock =
+          CarbonLockUtil
+            .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
         DataLoadingUtil.deleteLoadsAndUpdateMetadata(
           isForceDeletion = true, carbonTable)
         CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
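Note on the reworked API: cleanFiles is now keyed on the table path instead of the store
path, and the force-clean branch no longer takes the CLEAN_FILES lock. A caller-side
sketch under the patched signature (database/table names and the explicit path are
illustrative, not taken from this patch):

    // Normal clean: the loaded CarbonTable supplies both path and metadata.
    CarbonStore.cleanFiles(
      dbName = "db1",
      tableName = "t1",
      tablePath = carbonTable.getTablePath,
      carbonTable = carbonTable,
      forceTableClean = false)

    // Force clean: only the resolved table path is needed; carbonTable may be null.
    CarbonStore.cleanFiles(
      dbName = "db1",
      tableName = "t1",
      tablePath = "/user/warehouse/carbon.store/db1/t1",  // hypothetical path
      carbonTable = null,
      forceTableClean = true)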
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index 303c3efc345..4b68bd022a2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.optimizer.CarbonFilters
 
 import org.apache.carbondata.api.CarbonStore
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.events.{CleanFilesPostEvent, CleanFilesPreEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.spark.util.CommonUtil
 
@@ -70,12 +70,13 @@ case class CarbonCleanFilesCommand(
       databaseNameOp: Option[String],
       tableName: String): Unit = {
     val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
     val databaseLocation = CarbonEnv.getDatabaseLocation(dbName, sparkSession)
+    val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
     CarbonStore.cleanFiles(
-      dbName,
-      tableName,
-      databaseLocation,
-      null,
-      forceTableClean)
+      dbName = dbName,
+      tableName = tableName,
+      tablePath = tablePath,
+      carbonTable = null, // carbonTable is not required when force-deleting all data
+      forceTableClean = forceTableClean)
   }
 
   private def cleanGarbageData(sparkSession: SparkSession,
@@ -90,12 +91,12 @@ case class CarbonCleanFilesCommand(
       None
     }
     CarbonStore.cleanFiles(
-      CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
-      tableName,
-      CarbonProperties.getStorePath,
-      carbonTable,
-      forceTableClean,
-      partitions)
+      dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
+      tableName = tableName,
+      tablePath = carbonTable.getTablePath,
+      carbonTable = carbonTable,
+      forceTableClean = forceTableClean,
+      currentTablePartitions = partitions)
   }
 
   // Clean garbage data in all tables in all databases
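This command is what backs CarbonData's CLEAN FILES SQL syntax, so the garbage-segment
path above is reached from a plain SQL call; a minimal usage sketch (session variable and
table name are illustrative):

    // Garbage-segment clean: goes through cleanGarbageData, and tablePath now
    // comes from carbonTable.getTablePath rather than the store root.
    sparkSession.sql("CLEAN FILES FOR TABLE db1.t1")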
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
index eba7dcd378c..d4d9a8486a0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
@@ -29,19 +29,30 @@ import org.apache.carbondata.api.CarbonStore
 
 object CleanFiles {
 
   /**
-   * Clean the stale segments from table
-   * @param spark
-   * @param dbName
-   * @param tableName
-   * @param storePath
-   * @param forceTableClean if true, it deletes the table and its contents with force.It does not
+   * Deletes all table data if forceTableClean is true, and cleans only garbage
+   * segments (MARKED_FOR_DELETE state) otherwise.
+   *
+   * @param spark : Spark session
+   * @param dbName : Database name
+   * @param tableName : Table name
+   * @param forceTableClean : if true, it deletes the table and its contents with force. It does not
    * drop table from hive metastore so should be very careful to use it.
    */
   def cleanFiles(spark: SparkSession, dbName: String, tableName: String,
-      storePath: String, forceTableClean: Boolean = false): Unit = {
+      forceTableClean: Boolean = false): Unit = {
     TableAPIUtil.validateTableExists(spark, dbName, tableName)
-    val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(spark)
-    CarbonStore.cleanFiles(dbName, tableName, storePath, carbonTable, forceTableClean)
+    val tablePath = CarbonEnv.getTablePath(Some(dbName), tableName)(spark)
+    val carbonTable = if (!forceTableClean) {
+      CarbonEnv.getCarbonTable(Some(dbName), tableName)(spark)
+    } else {
+      null
+    }
+    CarbonStore.cleanFiles(
+      dbName = dbName,
+      tableName = tableName,
+      tablePath = tablePath,
+      carbonTable = carbonTable,
+      forceTableClean = forceTableClean)
   }
 
   def main(args: Array[String]): Unit = {
@@ -60,6 +71,7 @@ object CleanFiles {
     val spark = TableAPIUtil.spark(storePath, s"CleanFiles: $dbName.$tableName")
     CarbonEnv.getInstance(spark).carbonMetastore.
       checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
-    cleanFiles(spark, dbName, tableName, storePath, forceTableClean)
+
+    cleanFiles(spark, dbName, tableName, forceTableClean)
   }
 }
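CleanFiles also doubles as a standalone entry point; judging from main above and the test
below, a driver-side invocation looks roughly like this (store path and table name are
illustrative):

    // args: <store path> <table name> [forceTableClean]
    // The store path now only seeds the SparkSession; the table path itself is
    // resolved via CarbonEnv.getTablePath, which is the point of this patch.
    CleanFiles.main(Array("/tmp/carbon.store", "t1", "true"))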
diff --git a/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala b/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
index e4931797fdc..8ff6cab7291 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/util/CarbonCommandSuite.scala
@@ -142,7 +142,8 @@ class CarbonCommandSuite extends Spark2QueryTest with BeforeAndAfterAll {
     dropTable(table)
     createAndLoadTestTable(table, "csv_table")
     CleanFiles.main(Array(s"${location}", table, "true"))
-    val tablePath = s"${location}${File.separator}default${File.separator}$table"
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", table)
+    val tablePath = carbonTable.getTablePath
     val f = new File(tablePath)
     assert(!f.exists())
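The test change closes the same gap at assertion time: a hand-assembled path can diverge
from the path the clean actually operated on. A sketch of the failure mode the old
assertion allowed (paths hypothetical):

    // Old assumption: the table lives at <location>/default/<table>.
    val assumedPath = s"${location}${File.separator}default${File.separator}$table"
    // Reality: the table path is whatever the metadata says, e.g. a custom
    // location, so only this path is guaranteed to be the one that was deleted.
    val actualPath = CarbonMetadata.getInstance().getCarbonTable("default", table).getTablePath
    // assert(!new File(assumedPath).exists()) could pass while actualPath still exists.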