SDV test case failure fix
kushalsaha authored and rahulk2 committed Dec 15, 2017
1 parent 4158f3d commit 2dbe03a
Showing 10 changed files with 223 additions and 45 deletions.
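
In short: the SDV bad-record tests (BadRecord_Dataload_019/020/021) no longer wrap their LOAD statements in intercept[Exception] — after this fix, a load whose rows are all redirected or ignored completes successfully and leaves an empty table. New insert-overwrite tests and four CSV fixtures cover the empty/non-empty source-table combinations, and CarbonDataRDDFactory now treats a load from an empty input as successful, while a segment left with no valid data files is marked for delete instead of throwing "No Data to load".

A minimal sketch of the two new status decisions, paraphrased from the CarbonDataRDDFactory hunks below — the names here are simplified placeholders, not the project's API:

object LoadStatusSketch {
  sealed trait SegmentStatus
  case object Success extends SegmentStatus
  case object LoadFailure extends SegmentStatus
  case object MarkedForDelete extends SegmentStatus

  // New: when the load produced no per-partition status because the input
  // RDD had no partitions, treat it as a successful (empty) load rather
  // than a failure, so the data load flow completes.
  def statusForEmptyOutput(inputRddIsEmpty: Boolean): SegmentStatus =
    if (inputRddIsEmpty) Success else LoadFailure

  // New: the table-status entry for a segment holding no valid data files
  // is marked for delete; previously this path threw "No Data to load".
  def newEntryStatus(segmentIsValid: Boolean, loadStatus: SegmentStatus): SegmentStatus =
    if (segmentIsValid) loadStatus else MarkedForDelete
}

updateTableStatus accordingly now receives both loadStatus and the new newEntryLoadStatus, writing the latter into the new load-metadata entry (see its changed signature near the end of the diff).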
@@ -21,6 +21,8 @@ package org.apache.carbondata.cluster.sdv.generated
import java.text.SimpleDateFormat
import java.util.Date

import scala.collection.script.Include

import org.apache.spark.sql.Row
import org.apache.spark.sql.common.util._
import org.apache.spark.sql.test.TestQueryExecutor
@@ -227,38 +229,60 @@ class DataLoadingTestCase extends QueryTest with BeforeAndAfterAll {

//Data load-->Extra_Column_incsv
test("BadRecord_Dataload_019", Include) {
sql(s"""CREATE TABLE exceed_column_in_Csv (CUST_NAME String,date timestamp) STORED BY 'org.apache.carbondata.format'""").collect
intercept[Exception] {
sql(s"""LOAD DATA INPATH '$resourcesPath/Data/InsertData/extra_column.csv' into table exceed_column_in_Csv OPTIONS('DELIMITER'=',' , 'QUOTECHAR'='"','BAD_RECORDS_LOGGER_ENABLE'='TRUE', 'BAD_RECORDS_ACTION'='REDIRECT','FILEHEADER'='CUST_NAME,date')""").collect
checkAnswer(
s"""select count(*) from exceed_column_in_Csv """,
Seq(Row(0)), "DataLoadingTestCase-BadRecord_Dataload_019")
}
sql(s"""drop table exceed_column_in_Csv """).collect
sql(
s"""CREATE TABLE exceed_column_in_Csv (CUST_NAME String,date timestamp) STORED BY 'org
|.apache.carbondata.format'""".stripMargin)
.collect
sql(
s"""LOAD DATA INPATH '$resourcesPath/Data/InsertData/extra_column.csv' into table
|exceed_column_in_Csv OPTIONS('DELIMITER'=',' , 'QUOTECHAR'='"',
|'BAD_RECORDS_LOGGER_ENABLE'='TRUE', 'BAD_RECORDS_ACTION'='REDIRECT',
|'FILEHEADER'='CUST_NAME,date')""".stripMargin)
.collect
checkAnswer(
s"""select count(*) from exceed_column_in_Csv """,
Seq(Row(0)), "DataLoadingTestCase-BadRecord_Dataload_019")

sql(s"""drop table exceed_column_in_Csv """).collect
}


//Data load-->Timestamp Exceed Range
test("BadRecord_Dataload_020", Include) {
sql(s"""CREATE TABLE timestamp_range (date timestamp) STORED BY 'org.apache.carbondata.format'""").collect
intercept[Exception] {
sql(s"""LOAD DATA INPATH '$resourcesPath/Data/InsertData/timetsmap.csv' into table timestamp_range OPTIONS('DELIMITER'=',' , 'QUOTECHAR'='"','BAD_RECORDS_LOGGER_ENABLE'='TRUE', 'BAD_RECORDS_ACTION'='REDIRECT','FILEHEADER'='date')""").collect
}
checkAnswer(s"""select count(*) from timestamp_range""",
sql(
s"""CREATE TABLE timestamp_range (date timestamp) STORED BY 'org.apache.carbondata
|.format'""".stripMargin)
.collect
sql(
s"""LOAD DATA INPATH '$resourcesPath/Data/InsertData/timetsmap.csv' into table
|timestamp_range OPTIONS('DELIMITER'=',' , 'QUOTECHAR'='"',
|'BAD_RECORDS_LOGGER_ENABLE'='TRUE', 'BAD_RECORDS_ACTION'='REDIRECT',
|'FILEHEADER'='date')""".stripMargin)
.collect
checkAnswer(
s"""select count(*) from timestamp_range""",
Seq(Row(0)), "DataLoadingTestCase-BadRecord_Dataload_020")
sql(s"""drop table timestamp_range""").collect
sql(s"""drop table timestamp_range""").collect
}


//Show loads-->Delimiter_check
test("BadRecord_Dataload_021", Include) {
sql(s"""CREATE TABLE bad_records_test5 (String_col string,integer_col int,decimal_column decimal,date timestamp,double_col double) STORED BY 'org.apache.carbondata.format'""").collect
intercept[Exception] {
sql(s"""LOAD DATA INPATH '$resourcesPath/Data/InsertData/badrecords_test5.csv' into table bad_records_test5 OPTIONS('DELIMITER'='*' , 'QUOTECHAR'='"','BAD_RECORDS_LOGGER_ENABLE'='FALSE', 'BAD_RECORDS_ACTION'='IGNORE','FILEHEADER'='String_col,integer_col,decimal_column,date,double_col') """).collect
}
checkAnswer(s"""select count(*) from bad_records_test5""",
sql(
s"""CREATE TABLE bad_records_test5 (String_col string,integer_col int,decimal_column
|decimal,date timestamp,double_col double) STORED BY 'org.apache.carbondata.format'"""
.stripMargin)
.collect
sql(
s"""LOAD DATA INPATH '$resourcesPath/Data/InsertData/badrecords_test5.csv' into table
|bad_records_test5 OPTIONS('DELIMITER'='*' , 'QUOTECHAR'='"',
|'BAD_RECORDS_LOGGER_ENABLE'='FALSE', 'BAD_RECORDS_ACTION'='IGNORE',
|'FILEHEADER'='String_col,integer_col,decimal_column,date,double_col') """.stripMargin)
.collect
checkAnswer(
s"""select count(*) from bad_records_test5""",
Seq(Row(0)), "DataLoadingTestCase-BadRecord_Dataload_021")
sql(s"""drop table bad_records_test5 """).collect
sql(s"""drop table bad_records_test5 """).collect
}


@@ -0,0 +1 @@ (new fixture; from queryExecution below this is presumably overwriteTable1_noRecord.csv)
id,name,salary
@@ -0,0 +1,6 @@ (presumably overwriteTable1_someRecord.csv)
id,name,salary
1,hello,2300
2,hi,2500
3,xyz,4000
4,xyz1,5000
5,xyz2,6000
@@ -0,0 +1 @@ (presumably overwriteTable2_noRecord.csv)
id,name,salary,age
@@ -0,0 +1,4 @@ (presumably overwriteTable2_someRecord.csv)
id,name,salary,age
9,abc,48,20
10,abc1,90,21
11,abc2,99,22
@@ -18,12 +18,15 @@ package org.apache.carbondata.spark.testsuite.allqueries

import java.io.File

import org.apache.spark.sql.Row
import org.scalatest.BeforeAndAfterAll

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.spark.sql.test.util.QueryTest

import org.apache.carbondata.core.datastore.filesystem.CarbonFile
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.CarbonMetadata
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.path.CarbonStorePath
@@ -38,6 +41,8 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
sql("drop table if exists THive")
sql("create table THive (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber BigInt) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','")
sql(s"LOAD DATA local INPATH '$resourcesPath/100_olap.csv' INTO TABLE THive")
sql("drop table if exists OverwriteTable_t1")
sql("drop table if exists OverwriteTable_t2")
}

test("insert from hive") {
@@ -276,6 +281,121 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
}
sql("LOAD DATA INPATH '" + resourcesPath + "/100_olap.csv' overwrite INTO table TCarbonSourceOverwrite options ('DELIMITER'=',', 'QUOTECHAR'='\', 'FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointDescription,gamePointId,contractNumber')")
assert(rowCount == sql("select imei from TCarbonSourceOverwrite").count())

}

test("insert overwrite in group by scenario with t1 no record and t2 no record") {
queryExecution("overwriteTable1_noRecord.csv","overwriteTable2_noRecord.csv")
sql ("insert overwrite table OverwriteTable_t2 select id,name,sum(salary) as TotalSalary,'98' as age from OverwriteTable_t1 group by id,name,salary")
val exists_t1 = checkSegment("OverwriteTable_t1")
val exists_t2 = checkSegment("OverwriteTable_t2")
assert(!exists_t1)
assert(!exists_t2)
assert(sql("select * from OverwriteTable_t1").count() == sql("select * from OverwriteTable_t2").count())
checkAnswer(sql("select * from OverwriteTable_t2"),
Seq())
checkAnswer(sql("select * from OverwriteTable_t1"),
sql("select * from OverwriteTable_t2"))
}


test("insert overwrite in group by scenario with t1 no record and t2 some record") {
queryExecution("overwriteTable1_noRecord.csv","overwriteTable2_someRecord.csv")
sql ("insert overwrite table OverwriteTable_t2 select id,name,sum(salary) as TotalSalary,'98' as age from OverwriteTable_t1 group by id,name,salary")
val exists_t1 = checkSegment("OverwriteTable_t1")
val exists_t2 = checkSegment("OverwriteTable_t2")
assert(!exists_t1)
assert(!exists_t2)
assert(sql("select * from OverwriteTable_t1").count() == sql("select * from OverwriteTable_t2").count())
checkAnswer(sql("select * from OverwriteTable_t2"),
Seq())
checkAnswer(sql("select * from OverwriteTable_t1"),
sql("select * from OverwriteTable_t2"))
}

test("insert overwrite in group by scenario having record in both table") {
queryExecution("overwriteTable1_someRecord.csv","overwriteTable2_someRecord.csv")
sql ("insert overwrite table OverwriteTable_t2 select id,name,sum(salary) as TotalSalary,'98' as age from OverwriteTable_t1 group by id,name,salary")
val exists_t1 = checkSegment("OverwriteTable_t1")
val exists_t2 = checkSegment("OverwriteTable_t2")
assert(exists_t1)
assert(exists_t2)
assert(sql("select * from OverwriteTable_t1").count() == sql("select * from OverwriteTable_t2").count())
checkAnswer(sql("select count(*) from OverwriteTable_t1"), sql("select count(*) from OverwriteTable_t2"))
}

test("insert overwrite in group by scenario t1 some record and t2 no record") {
queryExecution("overwriteTable1_someRecord.csv","overwriteTable2_noRecord.csv")
sql("insert overwrite table OverwriteTable_t2 select id,name,sum(salary) as TotalSalary,'98' as age from OverwriteTable_t1 group by id,name,salary")
val exists_t1 = checkSegment("OverwriteTable_t1")
val exists_t2 = checkSegment("OverwriteTable_t2")
assert(exists_t1)
assert(exists_t2)
assert(sql("select * from OverwriteTable_t1").count() == sql("select * from OverwriteTable_t2").count())
checkAnswer(sql("select count(*) from OverwriteTable_t1"), sql("select count(*) from OverwriteTable_t2"))
}

test("insert overwrite without group by scenario t1 no record and t2 no record") {
queryExecution("overwriteTable1_noRecord.csv","overwriteTable2_noRecord.csv")
sql ("insert overwrite table OverwriteTable_t2 select id,name,salary as TotalSalary,'98' as age from OverwriteTable_t1")
val exists_t1 = checkSegment("OverwriteTable_t1")
val exists_t2 = checkSegment("OverwriteTable_t2")
assert(!exists_t1)
assert(!exists_t2)
assert(sql("select * from OverwriteTable_t1").count() == sql("select * from OverwriteTable_t2").count())
checkAnswer(sql("select * from OverwriteTable_t2"),
Seq())
checkAnswer(sql("select * from OverwriteTable_t1"),
sql("select * from OverwriteTable_t2"))
}


test("insert overwrite without group by scenario with t1 no record and t2 some record") {
queryExecution("overwriteTable1_noRecord.csv","overwriteTable2_someRecord.csv")
sql ("insert overwrite table OverwriteTable_t2 select id,name,salary as TotalSalary,'98' as age from OverwriteTable_t1")
val exists_t1 = checkSegment("OverwriteTable_t1")
val exists_t2 = checkSegment("OverwriteTable_t2")
assert(!exists_t1)
assert(!exists_t2)
assert(sql("select * from OverwriteTable_t1").count() == sql("select * from OverwriteTable_t2").count())
checkAnswer(sql("select * from OverwriteTable_t2"),
Seq())
checkAnswer(sql("select * from OverwriteTable_t1"),
sql("select * from OverwriteTable_t2"))
}

test("insert overwrite without group by scenario having record in both table") {
queryExecution("overwriteTable1_someRecord.csv","overwriteTable2_someRecord.csv")
sql ("insert overwrite table OverwriteTable_t2 select id,name,salary as TotalSalary,'98' as age from OverwriteTable_t1")
val exists_t1 = checkSegment("OverwriteTable_t1")
val exists_t2 = checkSegment("OverwriteTable_t2")
assert(exists_t1)
assert(exists_t2)
assert(sql("select * from OverwriteTable_t1").count() == sql("select * from OverwriteTable_t2").count())
checkAnswer(sql("select count(*) from OverwriteTable_t1"), sql("select count(*) from OverwriteTable_t2"))
}

private def queryExecution(csvFileName1: String , csvFileName2: String) : Unit ={
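// Drop and recreate both overwrite test tables, then load the given CSV fixture into each.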
sql("drop table if exists OverwriteTable_t1")
sql("drop table if exists OverwriteTable_t2")
sql("create table OverwriteTable_t1(id int,name String,salary int) stored by 'carbondata' ")
sql("LOAD DATA INPATH '" + resourcesPath + s"/$csvFileName1' INTO table OverwriteTable_t1")
sql("create table OverwriteTable_t2(id int,name String,salary int,age String) stored by 'carbondata'")
sql("LOAD DATA INPATH '" + resourcesPath + s"/$csvFileName2' INTO table OverwriteTable_t2")
}



private def checkSegment(tableName: String) : Boolean ={
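// A segment is treated as present when the table's Fact/Part0 directory exists and contains at least one file.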
val storePath_t1 = metastoredb + s"/warehouse/${tableName.toLowerCase()}/Fact/Part0"
val carbonFile_t1: CarbonFile = FileFactory
.getCarbonFile(storePath_t1, FileFactory.getFileType(storePath_t1))
var exists: Boolean = carbonFile_t1.exists()
if (exists) {
val listFiles: Array[CarbonFile] = carbonFile_t1.listFiles()
exists = listFiles.size > 0
}
exists
}

test("test show segments after clean files for insert overwrite") {
@@ -309,6 +429,8 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
sql("DROP TABLE IF EXISTS student")
sql("DROP TABLE IF EXISTS uniqdata")
sql("DROP TABLE IF EXISTS show_insert")
sql("drop table if exists OverwriteTable_t1")
sql("drop table if exists OverwriteTable_t2")

if (timeStampPropOrig != null) {
CarbonProperties.getInstance()
@@ -104,6 +104,7 @@ class QueryTest extends PlanTest {
lazy val storeLocation = CarbonProperties.getInstance().
getProperty(CarbonCommonConstants.STORE_LOCATION)
val resourcesPath = TestQueryExecutor.resourcesPath
val metastoredb = TestQueryExecutor.metastoredb
val integrationPath = TestQueryExecutor.integrationPath
val dblocation = TestQueryExecutor.location
}
@@ -375,7 +375,17 @@ object CarbonDataRDDFactory {
}
}
} else {
loadStatus = SegmentStatus.LOAD_FAILURE
// if the data load produced no rows because the input dataframe is empty,
// mark the load status as success so that the data load flow completes
if (dataFrame.isDefined && updateModel.isEmpty) {
val rdd = dataFrame.get.rdd
if (rdd.partitions == null || rdd.partitions.length == 0) {
LOGGER.warn("DataLoading finished. No data was loaded.")
loadStatus = SegmentStatus.SUCCESS
}
} else {
loadStatus = SegmentStatus.LOAD_FAILURE
}
}

if (loadStatus != SegmentStatus.LOAD_FAILURE &&
@@ -483,20 +493,21 @@
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
throw new Exception(status(0)._2._2.errorMsg)
}
// if segment is empty then fail the data load
// as no records were loaded into the new segment, it should be deleted
var newEntryLoadStatus =
if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap &&
!CarbonLoaderUtil.isValidSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)) {
// update the load entry in table status file for changing the status to marked for delete
CommonUtil.updateTableStatusForFailure(carbonLoadModel)
LOGGER.info("********starting clean up**********")
CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
LOGGER.info("********clean up done**********")

LOGGER.audit(s"Data load is failed for " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
" as there is no data to load")
LOGGER.warn("Cannot write load metadata file as data load failed")
throw new Exception("No Data to load")

SegmentStatus.MARKED_FOR_DELETE
} else {
loadStatus
}

writeDictionary(carbonLoadModel, result, writeAll = false)
val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
LoadTablePreStatusUpdateEvent(
@@ -505,7 +516,11 @@
carbonLoadModel)
operationContext.setProperty("isOverwrite", overwriteTable)
OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)
val done = updateTableStatus(status, carbonLoadModel, loadStatus, overwriteTable)
val done = updateTableStatus(status,
carbonLoadModel,
loadStatus,
newEntryLoadStatus,
overwriteTable)
if (!done) {
CommonUtil.updateTableStatusForFailure(carbonLoadModel)
LOGGER.info("********starting clean up**********")
@@ -766,17 +781,18 @@
status: Array[(String, (LoadMetadataDetails, ExecutionErrors))],
carbonLoadModel: CarbonLoadModel,
loadStatus: SegmentStatus,
newEntryLoadStatus: SegmentStatus,
overwriteTable: Boolean
): Boolean = {
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val metadataDetails = if (status != null && status(0) != null) {
val metadataDetails = if (status != null && status.size > 0 && status(0) != null) {
status(0)._2._1
} else {
new LoadMetadataDetails
}
CarbonLoaderUtil.populateNewLoadMetaEntry(
metadataDetails,
loadStatus,
newEntryLoadStatus,
carbonLoadModel.getFactTimeStamp,
true)
CarbonUtil
