From 78b2e69a37473d65e41613909fbc2db3c386a076 Mon Sep 17 00:00:00 2001
From: akashrn5
Date: Thu, 8 Feb 2018 23:10:43 +0530
Subject: [PATCH] unwanted updatetablestatus files are being created for
 delete operation where no records are selected

---
 .../iud/DeleteCarbonTableTestCase.scala       | 22 ++++++++++++++++++-
 .../events/CreateDatabaseEvents.scala         |  6 ++++-
 .../org/apache/spark/util/FileUtils.scala     |  5 +++--
 .../command/mutation/DeleteExecution.scala    |  2 +-
 .../sql/execution/strategy/DDLStrategy.scala  |  2 +-
 5 files changed, 31 insertions(+), 6 deletions(-)

diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
index 6521657a6b1..22aa38559ab 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
@@ -17,9 +17,11 @@
 package org.apache.carbondata.spark.testsuite.iud
 
 import org.apache.spark.sql.test.util.QueryTest
-import org.apache.spark.sql.{Row, SaveMode}
+import org.apache.spark.sql.{CarbonEnv, Row, SaveMode}
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.core.datastore.impl.FileFactory
+
 class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
 
   override def beforeAll {
@@ -180,6 +182,24 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
       Seq(Row(1, "abc"), Row(3, "uhj"), Row(4, "frg")))
   }
 
+  test("test number of update table status files after delete query where no records are deleted") {
+    sql("drop table if exists update_status_files")
+    sql("create table update_status_files(name string,age int) stored by 'carbondata'")
+    sql("insert into update_status_files select 'abc',1")
+    sql("insert into update_status_files select 'def',2")
+    sql("insert into update_status_files select 'xyz',4")
+    sql("insert into update_status_files select 'abc',6")
+    sql("alter table update_status_files compact 'minor'")
+    sql("delete from update_status_files where age=3").show()
+    sql("delete from update_status_files where age=5").show()
+    val carbonTable = CarbonEnv
+      .getCarbonTable(Some("iud_db"), "update_status_files")(sqlContext.sparkSession)
+    val metaPath = carbonTable.getMetaDataFilepath
+    val files = FileFactory.getCarbonFile(metaPath)
+    assert(files.listFiles().length == 2)
+    sql("drop table update_status_files")
+  }
+
   override def afterAll {
     sql("use default")
 
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/CreateDatabaseEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/CreateDatabaseEvents.scala
index dae22b10b0a..c1d79dbf8e6 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/CreateDatabaseEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/CreateDatabaseEvents.scala
@@ -17,11 +17,15 @@
 
 package org.apache.carbondata.events
 
+import org.apache.spark.SparkContext
+
 case class CreateDatabasePreExecutionEvent(databaseName: String)
   extends Event with DatabaseEventInfo
 
-case class CreateDatabasePostExecutionEvent(databaseName: String, dataBasePath: String)
+case class CreateDatabasePostExecutionEvent(databaseName: String,
+    dataBasePath: String,
+    sparkContext: SparkContext)
   extends Event with DatabaseEventInfo
 
 case class CreateDatabaseAbortExecutionEvent(databaseName: String)
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/FileUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/FileUtils.scala
index 95ba318acd3..12ce17f72a6 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/FileUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/FileUtils.scala
@@ -20,6 +20,7 @@ package org.apache.spark.util
 import java.io.{File, IOException}
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.spark.SparkContext
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -106,13 +107,13 @@ object FileUtils {
     }
   }
 
-  def createDatabaseDirectory(dbName: String, storePath: String) {
+  def createDatabaseDirectory(dbName: String, storePath: String, sparkContext: SparkContext) {
     val databasePath: String = storePath + File.separator + dbName.toLowerCase
     val fileType = FileFactory.getFileType(databasePath)
     FileFactory.mkdirs(databasePath, fileType)
     val operationContext = new OperationContext
     val createDatabasePostExecutionEvent = new CreateDatabasePostExecutionEvent(dbName,
-      databasePath)
+      databasePath, sparkContext)
     OperationListenerBus.getInstance.fireEvent(createDatabasePostExecutionEvent, operationContext)
   }
 
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index a8efb84129f..1ac0b3475fd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -134,7 +134,7 @@ object DeleteExecution {
       ).collect()
 
     // if no loads are present then no need to do anything.
-    if (res.isEmpty) {
+    if (res.flatten.isEmpty) {
       return segmentsTobeDeleted
     }
 
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 83831e39654..0178716f822 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -91,7 +91,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
           case e: NoSuchDatabaseException => CarbonProperties.getStorePath
         }
-        FileUtils.createDatabaseDirectory(dbName, dbLocation)
+        FileUtils.createDatabaseDirectory(dbName, dbLocation, sparkSession.sparkContext)
         ExecutedCommandExec(createDb) :: Nil
       case drop@DropDatabaseCommand(dbName, ifExists, isCascade) =>
         ExecutedCommandExec(CarbonDropDatabaseCommand(drop)) :: Nil
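
Note on the core change (editor's sketch, not part of the patch): the delete flow in
DeleteExecution.scala collects per-task result lists into res. When the delete predicate
matches no rows, res still contains one empty list per task, so the old res.isEmpty check
never fired and a fresh update table status file was written anyway. The standalone Scala
sketch below is illustrative only; the element type is a placeholder, not the actual
CarbonData result type.

object FlattenEmptyCheckSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical shape: one result list per delete task; both lists are empty
    // because the delete predicate selected no rows.
    val res: Array[List[String]] = Array(List.empty, List.empty)

    // Old check: the outer array still has an entry per task, so it is non-empty
    // even though nothing was deleted.
    println(res.isEmpty)          // false

    // New check: flattening exposes that no rows were actually deleted, so the
    // caller can return early and skip writing a new update table status file.
    println(res.flatten.isEmpty)  // true
  }
}

The new test in DeleteCarbonTableTestCase relies on this early return: after two delete
statements that match no rows, the table metadata directory should still hold only the
original status files, which is what the listFiles().length assertion verifies.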