From 2dbe03ae0bc19a1912847192118c9fc3eabf0f7d Mon Sep 17 00:00:00 2001 From: kushalsaha Date: Mon, 11 Dec 2017 21:11:21 +0530 Subject: [PATCH] SDV test case failure fix --- .../sdv/generated/DataLoadingTestCase.scala | 64 ++++++--- .../resources/overwriteTable1_noRecord.csv | 1 + .../resources/overwriteTable1_someRecord.csv | 6 + .../resources/overwriteTable2_noRecord.csv | 1 + .../resources/overwriteTable2_someRecord.csv | 4 + .../InsertIntoCarbonTableTestCase.scala | 122 ++++++++++++++++++ .../spark/sql/test/util/QueryTest.scala | 1 + .../spark/rdd/CarbonDataRDDFactory.scala | 38 ++++-- .../management/CarbonLoadDataCommand.scala | 8 -- .../processing/util/CarbonLoaderUtil.java | 23 +++- 10 files changed, 223 insertions(+), 45 deletions(-) create mode 100644 integration/spark-common-test/src/test/resources/overwriteTable1_noRecord.csv create mode 100644 integration/spark-common-test/src/test/resources/overwriteTable1_someRecord.csv create mode 100644 integration/spark-common-test/src/test/resources/overwriteTable2_noRecord.csv create mode 100644 integration/spark-common-test/src/test/resources/overwriteTable2_someRecord.csv diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/DataLoadingTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/DataLoadingTestCase.scala index c8c88e2e0e3..4065f1af049 100644 --- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/DataLoadingTestCase.scala +++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/DataLoadingTestCase.scala @@ -21,6 +21,8 @@ package org.apache.carbondata.cluster.sdv.generated import java.text.SimpleDateFormat import java.util.Date +import scala.collection.script.Include + import org.apache.spark.sql.Row import org.apache.spark.sql.common.util._ import org.apache.spark.sql.test.TestQueryExecutor @@ -227,38 +229,60 @@ class DataLoadingTestCase extends QueryTest with BeforeAndAfterAll { //Data load-->Extra_Column_incsv test("BadRecord_Dataload_019", Include) { - sql(s"""CREATE TABLE exceed_column_in_Csv (CUST_NAME String,date timestamp) STORED BY 'org.apache.carbondata.format'""").collect - intercept[Exception] { - sql(s"""LOAD DATA INPATH '$resourcesPath/Data/InsertData/extra_column.csv' into table exceed_column_in_Csv OPTIONS('DELIMITER'=',' , 'QUOTECHAR'='"','BAD_RECORDS_LOGGER_ENABLE'='TRUE', 'BAD_RECORDS_ACTION'='REDIRECT','FILEHEADER'='CUST_NAME,date')""").collect - checkAnswer( - s"""select count(*) from exceed_column_in_Csv """, - Seq(Row(0)), "DataLoadingTestCase-BadRecord_Dataload_019") - } - sql(s"""drop table exceed_column_in_Csv """).collect + sql( + s"""CREATE TABLE exceed_column_in_Csv (CUST_NAME String,date timestamp) STORED BY 'org + |.apache.carbondata.format'""".stripMargin) + .collect + sql( + s"""LOAD DATA INPATH '$resourcesPath/Data/InsertData/extra_column.csv' into table + |exceed_column_in_Csv OPTIONS('DELIMITER'=',' , 'QUOTECHAR'='"', + |'BAD_RECORDS_LOGGER_ENABLE'='TRUE', 'BAD_RECORDS_ACTION'='REDIRECT', + |'FILEHEADER'='CUST_NAME,date')""".stripMargin) + .collect + checkAnswer( + s"""select count(*) from exceed_column_in_Csv """, + Seq(Row(0)), "DataLoadingTestCase-BadRecord_Dataload_019") + + sql(s"""drop table exceed_column_in_Csv """).collect } //Data load-->Timestamp Exceed Range test("BadRecord_Dataload_020", Include) { - sql(s"""CREATE TABLE timestamp_range (date timestamp) STORED BY 
'org.apache.carbondata.format'""").collect - intercept[Exception] { - sql(s"""LOAD DATA INPATH '$resourcesPath/Data/InsertData/timetsmap.csv' into table timestamp_range OPTIONS('DELIMITER'=',' , 'QUOTECHAR'='"','BAD_RECORDS_LOGGER_ENABLE'='TRUE', 'BAD_RECORDS_ACTION'='REDIRECT','FILEHEADER'='date')""").collect - } - checkAnswer(s"""select count(*) from timestamp_range""", + sql( + s"""CREATE TABLE timestamp_range (date timestamp) STORED BY 'org.apache.carbondata + |.format'""".stripMargin) + .collect + sql( + s"""LOAD DATA INPATH '$resourcesPath/Data/InsertData/timetsmap.csv' into table + |timestamp_range OPTIONS('DELIMITER'=',' , 'QUOTECHAR'='"', + |'BAD_RECORDS_LOGGER_ENABLE'='TRUE', 'BAD_RECORDS_ACTION'='REDIRECT', + |'FILEHEADER'='date')""".stripMargin) + .collect + checkAnswer( + s"""select count(*) from timestamp_range""", Seq(Row(0)), "DataLoadingTestCase-BadRecord_Dataload_020") - sql(s"""drop table timestamp_range""").collect + sql(s"""drop table timestamp_range""").collect } //Show loads-->Delimeter_check test("BadRecord_Dataload_021", Include) { - sql(s"""CREATE TABLE bad_records_test5 (String_col string,integer_col int,decimal_column decimal,date timestamp,double_col double) STORED BY 'org.apache.carbondata.format'""").collect - intercept[Exception] { - sql(s"""LOAD DATA INPATH '$resourcesPath/Data/InsertData/badrecords_test5.csv' into table bad_records_test5 OPTIONS('DELIMITER'='*' , 'QUOTECHAR'='"','BAD_RECORDS_LOGGER_ENABLE'='FALSE', 'BAD_RECORDS_ACTION'='IGNORE','FILEHEADER'='String_col,integer_col,decimal_column,date,double_col') """).collect - } - checkAnswer(s"""select count(*) from bad_records_test5""", + sql( + s"""CREATE TABLE bad_records_test5 (String_col string,integer_col int,decimal_column + |decimal,date timestamp,double_col double) STORED BY 'org.apache.carbondata.format'""" + .stripMargin) + .collect + sql( + s"""LOAD DATA INPATH '$resourcesPath/Data/InsertData/badrecords_test5.csv' into table + |bad_records_test5 OPTIONS('DELIMITER'='*' , 'QUOTECHAR'='"', + |'BAD_RECORDS_LOGGER_ENABLE'='FALSE', 'BAD_RECORDS_ACTION'='IGNORE', + |'FILEHEADER'='String_col,integer_col,decimal_column,date,double_col') """.stripMargin) + .collect + checkAnswer( + s"""select count(*) from bad_records_test5""", Seq(Row(0)), "DataLoadingTestCase-BadRecord_Dataload_021") - sql(s"""drop table bad_records_test5 """).collect + sql(s"""drop table bad_records_test5 """).collect } diff --git a/integration/spark-common-test/src/test/resources/overwriteTable1_noRecord.csv b/integration/spark-common-test/src/test/resources/overwriteTable1_noRecord.csv new file mode 100644 index 00000000000..5aa5c9496ec --- /dev/null +++ b/integration/spark-common-test/src/test/resources/overwriteTable1_noRecord.csv @@ -0,0 +1 @@ +id,name,salary diff --git a/integration/spark-common-test/src/test/resources/overwriteTable1_someRecord.csv b/integration/spark-common-test/src/test/resources/overwriteTable1_someRecord.csv new file mode 100644 index 00000000000..4f026890579 --- /dev/null +++ b/integration/spark-common-test/src/test/resources/overwriteTable1_someRecord.csv @@ -0,0 +1,6 @@ +id,name,salary +1,hello,2300 +2,hi,2500 +3,xyz,4000 +4,xyz1,5000 +5,xyz2,6000 \ No newline at end of file diff --git a/integration/spark-common-test/src/test/resources/overwriteTable2_noRecord.csv b/integration/spark-common-test/src/test/resources/overwriteTable2_noRecord.csv new file mode 100644 index 00000000000..a5da1cf70bf --- /dev/null +++ b/integration/spark-common-test/src/test/resources/overwriteTable2_noRecord.csv @@ -0,0 +1 
@@ +id,name,salary,age \ No newline at end of file diff --git a/integration/spark-common-test/src/test/resources/overwriteTable2_someRecord.csv b/integration/spark-common-test/src/test/resources/overwriteTable2_someRecord.csv new file mode 100644 index 00000000000..3798b2a8e5f --- /dev/null +++ b/integration/spark-common-test/src/test/resources/overwriteTable2_someRecord.csv @@ -0,0 +1,4 @@ +id,name,salary,age +9,abc,48,20 +10,abc1,90,21 +11,abc2,99,22 \ No newline at end of file diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala index 137e1cbc513..c9c8a590db3 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala @@ -18,12 +18,15 @@ package org.apache.carbondata.spark.testsuite.allqueries import java.io.File +import org.apache.spark.sql.Row import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties import org.apache.spark.sql.test.util.QueryTest +import org.apache.carbondata.core.datastore.filesystem.CarbonFile +import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.metadata.CarbonMetadata import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.util.path.CarbonStorePath @@ -38,6 +41,8 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { sql("drop table if exists THive") sql("create table THive (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber BigInt) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','") sql(s"LOAD DATA local INPATH 
'$resourcesPath/100_olap.csv' INTO TABLE THive") + sql("drop table if exists OverwriteTable_t1") + sql("drop table if exists OverwriteTable_t2") } test("insert from hive") { @@ -276,6 +281,121 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { } sql("LOAD DATA INPATH '" + resourcesPath + "/100_olap.csv' overwrite INTO table TCarbonSourceOverwrite options ('DELIMITER'=',', 'QUOTECHAR'='\', 'FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointDescription,gamePointId,contractNumber')") assert(rowCount == sql("select imei from TCarbonSourceOverwrite").count()) + + } + + test("insert overwrite in group by scenario with t1 no record and t2 no record") { + queryExecution("overwriteTable1_noRecord.csv","overwriteTable2_noRecord.csv") + sql ("insert overwrite table OverwriteTable_t2 select id,name,sum(salary) as TotalSalary,'98' as age from OverwriteTable_t1 group by id,name,salary") + val exists_t1 = checkSegment("OverwriteTable_t1") + val exists_t2 = checkSegment("OverwriteTable_t2") + assert(!exists_t1) + assert(!exists_t2) + assert(sql("select * from OverwriteTable_t1").count() == sql("select * from OverwriteTable_t2").count()) + checkAnswer(sql("select * from OverwriteTable_t2"), + Seq()) + checkAnswer(sql("select * from OverwriteTable_t1"), + sql("select * from OverwriteTable_t2")) + } + + + test("insert overwrite in group by scenario with t1 no record and t2 some record") { + queryExecution("overwriteTable1_noRecord.csv","overwriteTable2_someRecord.csv") + sql ("insert overwrite table OverwriteTable_t2 select id,name,sum(salary) as TotalSalary,'98' as age from OverwriteTable_t1 group by id,name,salary") + val exists_t1 = checkSegment("OverwriteTable_t1") + val exists_t2 = checkSegment("OverwriteTable_t2") + assert(!exists_t1) + assert(!exists_t2) + assert(sql("select * from OverwriteTable_t1").count() == sql("select * from OverwriteTable_t2").count()) + checkAnswer(sql("select * from OverwriteTable_t2"), + Seq()) + checkAnswer(sql("select * from OverwriteTable_t1"), + sql("select * from OverwriteTable_t2")) + } + + test("insert overwrite in group by scenario having record in both table") { + queryExecution("overwriteTable1_someRecord.csv","overwriteTable2_someRecord.csv") + sql ("insert overwrite table OverwriteTable_t2 select id,name,sum(salary) as TotalSalary,'98' as age from OverwriteTable_t1 group by id,name,salary") + val exists_t1 = checkSegment("OverwriteTable_t1") + val exists_t2 = checkSegment("OverwriteTable_t2") + assert(exists_t1) + assert(exists_t2) + assert(sql("select 
* from OverwriteTable_t1").count() == sql("select * from OverwriteTable_t2").count()) + checkAnswer(sql("select count(*) from OverwriteTable_t1"), sql("select count(*) from OverwriteTable_t2")) + } + + test("insert overwrite in group by scenario t1 some record and t2 no record") { + queryExecution("overwriteTable1_someRecord.csv","overwriteTable2_noRecord.csv") + sql("insert overwrite table OverwriteTable_t2 select id,name,sum(salary) as TotalSalary,'98' as age from OverwriteTable_t1 group by id,name,salary") + val exists_t1 = checkSegment("OverwriteTable_t1") + val exists_t2 = checkSegment("OverwriteTable_t2") + assert(exists_t1) + assert(exists_t2) + assert(sql("select * from OverwriteTable_t1").count() == sql("select * from OverwriteTable_t2").count()) + checkAnswer(sql("select count(*) from OverwriteTable_t1"), sql("select count(*) from OverwriteTable_t2")) + } + + test("insert overwrite without group by scenario t1 no record and t2 no record") { + queryExecution("overwriteTable1_noRecord.csv","overwriteTable2_noRecord.csv") + sql ("insert overwrite table OverwriteTable_t2 select id,name,salary as TotalSalary,'98' as age from OverwriteTable_t1") + val exists_t1 = checkSegment("OverwriteTable_t1") + val exists_t2 = checkSegment("OverwriteTable_t2") + assert(!exists_t1) + assert(!exists_t2) + assert(sql("select * from OverwriteTable_t1").count() == sql("select * from OverwriteTable_t2").count()) + checkAnswer(sql("select * from OverwriteTable_t2"), + Seq()) + checkAnswer(sql("select * from OverwriteTable_t1"), + sql("select * from OverwriteTable_t2")) + } + + + test("insert overwrite without group by scenario with t1 no record and t2 some record") { + queryExecution("overwriteTable1_noRecord.csv","overwriteTable2_someRecord.csv") + sql ("insert overwrite table OverwriteTable_t2 select id,name,salary as TotalSalary,'98' as age from OverwriteTable_t1") + val exists_t1 = checkSegment("OverwriteTable_t1") + val exists_t2 = checkSegment("OverwriteTable_t2") + assert(!exists_t1) + assert(!exists_t2) + assert(sql("select * from OverwriteTable_t1").count() == sql("select * from OverwriteTable_t2").count()) + checkAnswer(sql("select * from OverwriteTable_t2"), + Seq()) + checkAnswer(sql("select * from OverwriteTable_t1"), + sql("select * from OverwriteTable_t2")) + } + + test("insert overwrite without group by scenario having record in both table") { + queryExecution("overwriteTable1_someRecord.csv","overwriteTable2_someRecord.csv") + sql ("insert overwrite table OverwriteTable_t2 select id,name,salary as TotalSalary,'98' as age from OverwriteTable_t1") + val exists_t1 = checkSegment("OverwriteTable_t1") + val exists_t2 = checkSegment("OverwriteTable_t2") + assert(exists_t1) + assert(exists_t2) + assert(sql("select * from OverwriteTable_t1").count() == sql("select * from OverwriteTable_t2").count()) + checkAnswer(sql("select count(*) from OverwriteTable_t1"), sql("select count(*) from OverwriteTable_t2")) + } + + private def queryExecution(csvFileName1: String , csvFileName2: String) : Unit ={ + sql("drop table if exists OverwriteTable_t1") + sql("drop table if exists OverwriteTable_t2") + sql("create table OverwriteTable_t1(id int,name String,salary int) stored by 'carbondata' ") + sql("LOAD DATA INPATH '" + resourcesPath + s"/$csvFileName1' INTO table OverwriteTable_t1") + sql("create table OverwriteTable_t2(id int,name String,salary int,age String) stored by 'carbondata'") + sql("LOAD DATA INPATH '" + resourcesPath + s"/$csvFileName2' INTO table OverwriteTable_t2") + } + + + + private def 
checkSegment(tableName: String) : Boolean ={ + val storePath_t1 = metastoredb + s"/warehouse/${tableName.toLowerCase()}/Fact/Part0" + val carbonFile_t1: CarbonFile = FileFactory + .getCarbonFile(storePath_t1, FileFactory.getFileType(storePath_t1)) + var exists: Boolean = carbonFile_t1.exists() + if (exists) { + val listFiles: Array[CarbonFile] = carbonFile_t1.listFiles() + exists = listFiles.size > 0 + } + exists } test("test show segments after clean files for insert overwrite") { @@ -309,6 +429,8 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { sql("DROP TABLE IF EXISTS student") sql("DROP TABLE IF EXISTS uniqdata") sql("DROP TABLE IF EXISTS show_insert") + sql("drop table if exists OverwriteTable_t1") + sql("drop table if exists OverwriteTable_t2") if (timeStampPropOrig != null) { CarbonProperties.getInstance() diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala index c01cfef6a78..0079d1e4b3e 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala @@ -104,6 +104,7 @@ class QueryTest extends PlanTest { lazy val storeLocation = CarbonProperties.getInstance(). getProperty(CarbonCommonConstants.STORE_LOCATION) val resourcesPath = TestQueryExecutor.resourcesPath + val metastoredb = TestQueryExecutor.metastoredb val integrationPath = TestQueryExecutor.integrationPath val dblocation = TestQueryExecutor.location } diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 7955e71832f..80cc898080a 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -375,7 +375,17 @@ object CarbonDataRDDFactory { } } } else { - loadStatus = SegmentStatus.LOAD_FAILURE + // if no value is there in data load, make load status Success + // and data load flow executes + if (dataFrame.isDefined && updateModel.isEmpty) { + val rdd = dataFrame.get.rdd + if (rdd.partitions == null || rdd.partitions.length == 0) { + LOGGER.warn("DataLoading finished. 
No data was loaded.") + loadStatus = SegmentStatus.SUCCESS + } + } else { + loadStatus = SegmentStatus.LOAD_FAILURE + } } if (loadStatus != SegmentStatus.LOAD_FAILURE && @@ -483,20 +493,21 @@ object CarbonDataRDDFactory { s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") throw new Exception(status(0)._2._2.errorMsg) } - // if segment is empty then fail the data load + // as no record loaded in new segment, new segment should be deleted + var newEntryLoadStatus = if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap && !CarbonLoaderUtil.isValidSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)) { - // update the load entry in table status file for changing the status to marked for delete - CommonUtil.updateTableStatusForFailure(carbonLoadModel) - LOGGER.info("********starting clean up**********") - CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt) - LOGGER.info("********clean up done**********") + LOGGER.audit(s"Data load is failed for " + s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" + " as there is no data to load") LOGGER.warn("Cannot write load metadata file as data load failed") - throw new Exception("No Data to load") + + SegmentStatus.MARKED_FOR_DELETE + } else { + loadStatus } + writeDictionary(carbonLoadModel, result, writeAll = false) val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent = LoadTablePreStatusUpdateEvent( @@ -505,7 +516,11 @@ object CarbonDataRDDFactory { carbonLoadModel) operationContext.setProperty("isOverwrite", overwriteTable) OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext) - val done = updateTableStatus(status, carbonLoadModel, loadStatus, overwriteTable) + val done = updateTableStatus(status, + carbonLoadModel, + loadStatus, + newEntryLoadStatus, + overwriteTable) if (!done) { CommonUtil.updateTableStatusForFailure(carbonLoadModel) LOGGER.info("********starting clean up**********") @@ -766,17 +781,18 @@ object CarbonDataRDDFactory { status: Array[(String, (LoadMetadataDetails, ExecutionErrors))], carbonLoadModel: CarbonLoadModel, loadStatus: SegmentStatus, + newEntryLoadStatus: SegmentStatus, overwriteTable: Boolean ): Boolean = { val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable - val metadataDetails = if (status != null && status(0) != null) { + val metadataDetails = if (status != null && status.size > 0 && status(0) != null) { status(0)._2._1 } else { new LoadMetadataDetails } CarbonLoaderUtil.populateNewLoadMetaEntry( metadataDetails, - loadStatus, + newEntryLoadStatus, carbonLoadModel.getFactTimeStamp, true) CarbonUtil diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index 38c34dd0b0c..0a9869989a2 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -64,14 +64,6 @@ case class CarbonLoadDataCommand( override def processData(sparkSession: SparkSession): Seq[Row] = { val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - if (dataFrame.isDefined && updateModel.isEmpty) { - val rdd = dataFrame.get.rdd - if (rdd.partitions == null || rdd.partitions.length 
== 0) {
-        LOGGER.warn("DataLoading finished. No data was loaded.")
-        return Seq.empty
-      }
-    }
-
     val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
     carbonProperty.addProperty("zookeeper.enable.lock", "false")
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 9e6a73e4406..4649cee3a1e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -224,17 +224,18 @@ public static boolean recordNewLoadMetadata(LoadMetadataDetails newMetaEntry,
             entry.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
             // For insert overwrite, we will delete the old segment folder immediately
             // So collect the old segments here
-            String path = carbonTablePath.getCarbonDataDirectoryPath("0", entry.getLoadName());
-            // add to the deletion list only if file exist else HDFS file system will throw
-            // exception while deleting the file if file path does not exist
-            if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
-              staleFolders.add(FileFactory.getCarbonFile(path));
-            }
+            addToStaleFolders(carbonTablePath, staleFolders, entry);
           }
         }
       }
       listOfLoadFolderDetails.set(indexToOverwriteNewMetaEntry, newMetaEntry);
     }
+    // when no records are inserted then newSegmentEntry will be SegmentStatus.MARKED_FOR_DELETE
+    // so empty segment folder should be deleted
+    if (newMetaEntry.getSegmentStatus() == SegmentStatus.MARKED_FOR_DELETE) {
+      addToStaleFolders(carbonTablePath, staleFolders, newMetaEntry);
+    }
+
     SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetails
         .toArray(new LoadMetadataDetails[listOfLoadFolderDetails.size()]));
     // Delete all old stale segment folders
@@ -266,6 +267,16 @@ public static boolean recordNewLoadMetadata(LoadMetadataDetails newMetaEntry,
     return status;
   }

+  private static void addToStaleFolders(CarbonTablePath carbonTablePath,
+      List<CarbonFile> staleFolders, LoadMetadataDetails entry) throws IOException {
+    String path = carbonTablePath.getCarbonDataDirectoryPath("0", entry.getLoadName());
+    // add to the deletion list only if file exist else HDFS file system will throw
+    // exception while deleting the file if file path does not exist
+    if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
+      staleFolders.add(FileFactory.getCarbonFile(path));
+    }
+  }
+
   /**
    * Method to create new entry for load in table status file
   *
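
Reviewer note, not part of the patch: the rule the CarbonDataRDDFactory and CarbonLoaderUtil changes implement can be read in isolation with the minimal Scala sketch below. The types and method names (statusForEmptyInput, statusForNewEntry) are simplified stand-ins invented for illustration, not CarbonData APIs: an insert/load fed by an empty DataFrame is now recorded as a successful load instead of a failure, and a new segment that ends up with no data files is recorded as MARKED_FOR_DELETE (so its folder gets cleaned up) instead of throwing "No Data to load".

object EmptyLoadStatusSketch {

  sealed trait SegmentStatus
  case object Success extends SegmentStatus
  case object LoadFailure extends SegmentStatus
  case object MarkedForDelete extends SegmentStatus

  // Empty DataFrame input (no partitions) on a non-update load is treated as a successful,
  // zero-row load so the overwrite flow still runs and can clear the target table.
  def statusForEmptyInput(partitionCount: Int, isDataFrameLoad: Boolean, isUpdate: Boolean): SegmentStatus =
    if (isDataFrameLoad && !isUpdate && partitionCount == 0) Success else LoadFailure

  // A segment with no data files gets a MARKED_FOR_DELETE table-status entry; otherwise the
  // original load status is kept for the new entry.
  def statusForNewEntry(segmentHasFiles: Boolean, loadStatus: SegmentStatus): SegmentStatus =
    if (!segmentHasFiles) MarkedForDelete else loadStatus

  def main(args: Array[String]): Unit = {
    println(statusForEmptyInput(partitionCount = 0, isDataFrameLoad = true, isUpdate = false)) // Success
    println(statusForNewEntry(segmentHasFiles = false, loadStatus = Success))                  // MarkedForDelete
  }
}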
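
A second illustrative sketch, again with assumed names and plain java.io instead of CarbonFile/FileFactory, shows what the new checkSegment test helper verifies: a table is considered to have a segment only if its Fact/Part0 folder exists under the metastore warehouse and contains at least one file.

import java.io.File

object SegmentFolderCheckSketch {

  // Simplified version of the checkSegment helper added to InsertIntoCarbonTableTestCase.
  def hasSegmentData(metastorePath: String, tableName: String): Boolean = {
    val factDir = new File(s"$metastorePath/warehouse/${tableName.toLowerCase}/Fact/Part0")
    factDir.exists() && Option(factDir.listFiles()).exists(_.nonEmpty)
  }

  def main(args: Array[String]): Unit = {
    // The path below is only an example; the tests resolve it from TestQueryExecutor.metastoredb.
    println(hasSegmentData("/tmp/carbondata-metastore", "overwritetable_t2"))
  }
}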