[CARBONDATA-1882] select with group by and insertoverwrite to another carbon table #1641

Closed · wants to merge 1 commit
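A sketch of the scenario this PR exercises, reconstructed from the new test cases below (table names and schemas come from the tests; the failure mode is inferred from the removed "No Data to load" path in CarbonDataRDDFactory):

  // a sketch, mirroring the new test cases in InsertIntoCarbonTableTestCase
  sql("create table OverwriteTable_t1(id int,name String,salary int) stored by 'carbondata'")
  sql("create table OverwriteTable_t2(id int,name String,salary int,age String) stored by 'carbondata'")
  // Before this change, an insert overwrite whose select produced no rows
  // (for example from an empty source table, with or without group by) could
  // fail with "No Data to load" instead of leaving the target table empty.
  sql("insert overwrite table OverwriteTable_t2 select id,name,sum(salary) as TotalSalary,'98' as age from OverwriteTable_t1 group by id,name,salary")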
@@ -227,38 +227,42 @@ class DataLoadingTestCase extends QueryTest with BeforeAndAfterAll {

//Data load-->Extra_Column_incsv
test("BadRecord_Dataload_019", Include) {
sql(s"""CREATE TABLE exceed_column_in_Csv (CUST_NAME String,date timestamp) STORED BY 'org.apache.carbondata.format'""").collect
intercept[Exception] {
sql(s"""LOAD DATA INPATH '$resourcesPath/Data/InsertData/extra_column.csv' into table exceed_column_in_Csv OPTIONS('DELIMITER'=',' , 'QUOTECHAR'='"','BAD_RECORDS_LOGGER_ENABLE'='TRUE', 'BAD_RECORDS_ACTION'='REDIRECT','FILEHEADER'='CUST_NAME,date')""").collect
checkAnswer(
s"""select count(*) from exceed_column_in_Csv """,
Seq(Row(0)), "DataLoadingTestCase-BadRecord_Dataload_019")
}
sql(s"""drop table exceed_column_in_Csv """).collect
sql(
s"""CREATE TABLE exceed_column_in_Csv (CUST_NAME String,date timestamp) STORED BY 'org.apache.carbondata.format'""".stripMargin).collect
sql(s"""LOAD DATA INPATH '$resourcesPath/Data/InsertData/extra_column.csv' into table exceed_column_in_Csv OPTIONS('DELIMITER'=',' , 'QUOTECHAR'='"','BAD_RECORDS_LOGGER_ENABLE'='TRUE', 'BAD_RECORDS_ACTION'='REDIRECT','FILEHEADER'='CUST_NAME,date')""".stripMargin).collect
checkAnswer(s"""select count(*) from exceed_column_in_Csv """,Seq(Row(0)), "DataLoadingTestCase-BadRecord_Dataload_019")
sql(s"""drop table exceed_column_in_Csv """).collect
}


//Data load-->Timestamp Exceed Range
test("BadRecord_Dataload_020", Include) {
sql(s"""CREATE TABLE timestamp_range (date timestamp) STORED BY 'org.apache.carbondata.format'""").collect
intercept[Exception] {
sql(s"""LOAD DATA INPATH '$resourcesPath/Data/InsertData/timetsmap.csv' into table timestamp_range OPTIONS('DELIMITER'=',' , 'QUOTECHAR'='"','BAD_RECORDS_LOGGER_ENABLE'='TRUE', 'BAD_RECORDS_ACTION'='REDIRECT','FILEHEADER'='date')""").collect
}
checkAnswer(s"""select count(*) from timestamp_range""",
Seq(Row(0)), "DataLoadingTestCase-BadRecord_Dataload_020")
sql(s"""drop table timestamp_range""").collect
sql(
s"""CREATE TABLE timestamp_range (date timestamp) STORED BY 'org.apache.carbondata.format'""".stripMargin).collect
sql(
s"""LOAD DATA INPATH '$resourcesPath/Data/InsertData/timetsmap.csv' into table timestamp_range OPTIONS('DELIMITER'=',' , 'QUOTECHAR'='"','BAD_RECORDS_LOGGER_ENABLE'='TRUE', 'BAD_RECORDS_ACTION'='REDIRECT','FILEHEADER'='date')""".stripMargin).collect
checkAnswer(s"""select count(*) from timestamp_range""",Seq(Row(0)), "DataLoadingTestCase-BadRecord_Dataload_020")
sql(s"""drop table timestamp_range""").collect
}


//Show loads-->Delimeter_check
test("BadRecord_Dataload_021", Include) {
sql(s"""CREATE TABLE bad_records_test5 (String_col string,integer_col int,decimal_column decimal,date timestamp,double_col double) STORED BY 'org.apache.carbondata.format'""").collect
intercept[Exception] {
sql(s"""LOAD DATA INPATH '$resourcesPath/Data/InsertData/badrecords_test5.csv' into table bad_records_test5 OPTIONS('DELIMITER'='*' , 'QUOTECHAR'='"','BAD_RECORDS_LOGGER_ENABLE'='FALSE', 'BAD_RECORDS_ACTION'='IGNORE','FILEHEADER'='String_col,integer_col,decimal_column,date,double_col') """).collect
}
checkAnswer(s"""select count(*) from bad_records_test5""",
sql(
s"""CREATE TABLE bad_records_test5 (String_col string,integer_col int,decimal_column
|decimal,date timestamp,double_col double) STORED BY 'org.apache.carbondata.format'"""
.stripMargin)
.collect
sql(
s"""LOAD DATA INPATH '$resourcesPath/Data/InsertData/badrecords_test5.csv' into table
|bad_records_test5 OPTIONS('DELIMITER'='*' , 'QUOTECHAR'='"',
|'BAD_RECORDS_LOGGER_ENABLE'='FALSE', 'BAD_RECORDS_ACTION'='IGNORE',
|'FILEHEADER'='String_col,integer_col,decimal_column,date,double_col') """.stripMargin)
.collect
checkAnswer(
s"""select count(*) from bad_records_test5""",
Seq(Row(0)), "DataLoadingTestCase-BadRecord_Dataload_021")
sql(s"""drop table bad_records_test5 """).collect
sql(s"""drop table bad_records_test5 """).collect
}


@@ -0,0 +1 @@ (new file: overwriteTable1_noRecord.csv)
id,name,salary
@@ -0,0 +1,6 @@ (new file: overwriteTable1_someRecord.csv)
id,name,salary
1,hello,2300
2,hi,2500
3,xyz,4000
4,xyz1,5000
5,xyz2,6000
@@ -0,0 +1 @@ (new file: overwriteTable2_noRecord.csv)
id,name,salary,age
@@ -0,0 +1,4 @@ (new file: overwriteTable2_someRecord.csv)
id,name,salary,age
9,abc,48,20
10,abc1,90,21
11,abc2,99,22
@@ -18,12 +18,15 @@ package org.apache.carbondata.spark.testsuite.allqueries

import java.io.File

import org.apache.spark.sql.Row
import org.scalatest.BeforeAndAfterAll

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.spark.sql.test.util.QueryTest

import org.apache.carbondata.core.datastore.filesystem.CarbonFile
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.CarbonMetadata
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.path.CarbonStorePath
@@ -38,6 +41,8 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
sql("drop table if exists THive")
sql("create table THive (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber BigInt) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','")
sql(s"LOAD DATA local INPATH '$resourcesPath/100_olap.csv' INTO TABLE THive")
sql("drop table if exists OverwriteTable_t1")
sql("drop table if exists OverwriteTable_t2")
}

test("insert from hive") {
@@ -276,6 +281,121 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
}
sql("LOAD DATA INPATH '" + resourcesPath + "/100_olap.csv' overwrite INTO table TCarbonSourceOverwrite options ('DELIMITER'=',', 'QUOTECHAR'='\', 'FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointDescription,gamePointId,contractNumber')")
assert(rowCount == sql("select imei from TCarbonSourceOverwrite").count())

}

test("insert overwrite in group by scenario with t1 no record and t2 no record") {
queryExecution("overwriteTable1_noRecord.csv","overwriteTable2_noRecord.csv")
sql ("insert overwrite table OverwriteTable_t2 select id,name,sum(salary) as TotalSalary,'98' as age from OverwriteTable_t1 group by id,name,salary")
val exists_t1 = checkSegment("OverwriteTable_t1")
val exists_t2 = checkSegment("OverwriteTable_t2")
assert(!exists_t1)
assert(!exists_t2)
assert(sql("select * from OverwriteTable_t1").count() == sql("select * from OverwriteTable_t2").count())
checkAnswer(sql("select * from OverwriteTable_t2"),
Seq())
checkAnswer(sql("select * from OverwriteTable_t1"),
sql("select * from OverwriteTable_t2"))
}


test("insert overwrite in group by scenario with t1 no record and t2 some record") {
queryExecution("overwriteTable1_noRecord.csv","overwriteTable2_someRecord.csv")
sql ("insert overwrite table OverwriteTable_t2 select id,name,sum(salary) as TotalSalary,'98' as age from OverwriteTable_t1 group by id,name,salary")
[Review comment — Contributor (Author): Only the insert overwrite query is kept in all the test cases, because two different scenarios are handled: 1) the GROUP BY case and 2) the case without GROUP BY. Refactoring would require writing one extra method in which the same code would exist again. A sketch of such a helper appears after these test cases.]

val exists_t1 = checkSegment("OverwriteTable_t1")
val exists_t2 = checkSegment("OverwriteTable_t2")
assert(!exists_t1)
assert(!exists_t2)
assert(sql("select * from OverwriteTable_t1").count() == sql("select * from OverwriteTable_t2").count())
checkAnswer(sql("select * from OverwriteTable_t2"),
Seq())
checkAnswer(sql("select * from OverwriteTable_t1"),
sql("select * from OverwriteTable_t2"))
}

test("insert overwrite in group by scenario having record in both table") {
queryExecution("overwriteTable1_someRecord.csv","overwriteTable2_someRecord.csv")
sql ("insert overwrite table OverwriteTable_t2 select id,name,sum(salary) as TotalSalary,'98' as age from OverwriteTable_t1 group by id,name,salary")
val exists_t1 = checkSegment("OverwriteTable_t1")
val exists_t2 = checkSegment("OverwriteTable_t2")
assert(exists_t1)
assert(exists_t2)
assert(sql("select * from OverwriteTable_t1").count() == sql("select * from OverwriteTable_t2").count())
checkAnswer(sql("select count(*) from OverwriteTable_t1"), sql("select count(*) from OverwriteTable_t2"))
}

test("insert overwrite in group by scenario t1 some record and t2 no record") {
queryExecution("overwriteTable1_someRecord.csv","overwriteTable2_noRecord.csv")
sql("insert overwrite table OverwriteTable_t2 select id,name,sum(salary) as TotalSalary,'98' as age from OverwriteTable_t1 group by id,name,salary")
val exists_t1 = checkSegment("OverwriteTable_t1")
val exists_t2 = checkSegment("OverwriteTable_t2")
assert(exists_t1)
assert(exists_t2)
assert(sql("select * from OverwriteTable_t1").count() == sql("select * from OverwriteTable_t2").count())
checkAnswer(sql("select count(*) from OverwriteTable_t1"), sql("select count(*) from OverwriteTable_t2"))
}

test("insert overwrite without group by scenario t1 no record and t2 no record") {
queryExecution("overwriteTable1_noRecord.csv","overwriteTable2_noRecord.csv")
sql ("insert overwrite table OverwriteTable_t2 select id,name,salary as TotalSalary,'98' as age from OverwriteTable_t1")
val exists_t1 = checkSegment("OverwriteTable_t1")
val exists_t2 = checkSegment("OverwriteTable_t2")
assert(!exists_t1)
assert(!exists_t2)
assert(sql("select * from OverwriteTable_t1").count() == sql("select * from OverwriteTable_t2").count())
checkAnswer(sql("select * from OverwriteTable_t2"),
Seq())
checkAnswer(sql("select * from OverwriteTable_t1"),
sql("select * from OverwriteTable_t2"))
}


test("insert overwrite without group by scenario with t1 no record and t2 some record") {
queryExecution("overwriteTable1_noRecord.csv","overwriteTable2_someRecord.csv")
sql ("insert overwrite table OverwriteTable_t2 select id,name,salary as TotalSalary,'98' as age from OverwriteTable_t1")
val exists_t1 = checkSegment("OverwriteTable_t1")
val exists_t2 = checkSegment("OverwriteTable_t2")
assert(!exists_t1)
assert(!exists_t2)
assert(sql("select * from OverwriteTable_t1").count() == sql("select * from OverwriteTable_t2").count())
checkAnswer(sql("select * from OverwriteTable_t2"),
Seq())
checkAnswer(sql("select * from OverwriteTable_t1"),
sql("select * from OverwriteTable_t2"))
}

test("insert overwrite without group by scenario having record in both table") {
queryExecution("overwriteTable1_someRecord.csv","overwriteTable2_someRecord.csv")
sql ("insert overwrite table OverwriteTable_t2 select id,name,salary as TotalSalary,'98' as age from OverwriteTable_t1")
val exists_t1 = checkSegment("OverwriteTable_t1")
val exists_t2 = checkSegment("OverwriteTable_t2")
assert(exists_t1)
assert(exists_t2)
assert(sql("select * from OverwriteTable_t1").count() == sql("select * from OverwriteTable_t2").count())
checkAnswer(sql("select count(*) from OverwriteTable_t1"), sql("select count(*) from OverwriteTable_t2"))
}
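For reference, a hypothetical shared helper of the kind the review discussion above argues against (a sketch only; verifyOverwrite is not part of this PR, and its assertions mirror the eight test cases above):

  // Hypothetical refactoring discussed in the review comment above; not part of this PR.
  private def verifyOverwrite(csv1: String, csv2: String, insertSql: String,
      sourceHasRecords: Boolean): Unit = {
    queryExecution(csv1, csv2)
    sql(insertSql)
    // a segment should exist in both tables exactly when the source had records
    assert(checkSegment("OverwriteTable_t1") == sourceHasRecords)
    assert(checkSegment("OverwriteTable_t2") == sourceHasRecords)
    assert(sql("select * from OverwriteTable_t1").count() ==
           sql("select * from OverwriteTable_t2").count())
    if (sourceHasRecords) {
      checkAnswer(sql("select count(*) from OverwriteTable_t1"),
        sql("select count(*) from OverwriteTable_t2"))
    } else {
      checkAnswer(sql("select * from OverwriteTable_t2"), Seq())
    }
  }

Each test would then reduce to a single call, at the cost of hiding which query variant (group by or plain select) failed.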

private def queryExecution(csvFileName1: String , csvFileName2: String) : Unit ={
sql("drop table if exists OverwriteTable_t1")
sql("drop table if exists OverwriteTable_t2")
sql("create table OverwriteTable_t1(id int,name String,salary int) stored by 'carbondata' ")
sql("LOAD DATA INPATH '" + resourcesPath + s"/$csvFileName1' INTO table OverwriteTable_t1")
sql("create table OverwriteTable_t2(id int,name String,salary int,age String) stored by 'carbondata'")
sql("LOAD DATA INPATH '" + resourcesPath + s"/$csvFileName2' INTO table OverwriteTable_t2")
}



private def checkSegment(tableName: String) : Boolean ={
val storePath_t1 = metastoredb + s"/warehouse/${tableName.toLowerCase()}/Fact/Part0"
val carbonFile_t1: CarbonFile = FileFactory
.getCarbonFile(storePath_t1, FileFactory.getFileType(storePath_t1))
var exists: Boolean = carbonFile_t1.exists()
if (exists) {
val listFiles: Array[CarbonFile] = carbonFile_t1.listFiles()
exists = listFiles.size > 0
}
exists
}

test("test show segments after clean files for insert overwrite") {
@@ -309,6 +429,8 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
sql("DROP TABLE IF EXISTS student")
sql("DROP TABLE IF EXISTS uniqdata")
sql("DROP TABLE IF EXISTS show_insert")
sql("drop table if exists OverwriteTable_t1")
sql("drop table if exists OverwriteTable_t2")

if (timeStampPropOrig != null) {
CarbonProperties.getInstance()
@@ -104,6 +104,7 @@ class QueryTest extends PlanTest {
lazy val storeLocation = CarbonProperties.getInstance().
getProperty(CarbonCommonConstants.STORE_LOCATION)
val resourcesPath = TestQueryExecutor.resourcesPath
val metastoredb = TestQueryExecutor.metastoredb
val integrationPath = TestQueryExecutor.integrationPath
val dblocation = TestQueryExecutor.location
}
@@ -375,7 +375,17 @@ object CarbonDataRDDFactory {
}
}
} else {
loadStatus = SegmentStatus.LOAD_FAILURE
// if the data load produced no data, mark the load status as success
// and let the data load flow proceed
if (dataFrame.isDefined && updateModel.isEmpty) {
val rdd = dataFrame.get.rdd
if (rdd.partitions == null || rdd.partitions.length == 0) {
LOGGER.warn("DataLoading finished. No data was loaded.")
loadStatus = SegmentStatus.SUCCESS
}
} else {
loadStatus = SegmentStatus.LOAD_FAILURE
}
}

if (loadStatus != SegmentStatus.LOAD_FAILURE &&
@@ -483,20 +493,21 @@
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
throw new Exception(status(0)._2._2.errorMsg)
}
// if segment is empty then fail the data load
// as no record loaded in new segment, new segment should be deleted
var newEntryLoadStatus =
if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap &&
!CarbonLoaderUtil.isValidSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)) {
// update the load entry in table status file for changing the status to marked for delete
CommonUtil.updateTableStatusForFailure(carbonLoadModel)
LOGGER.info("********starting clean up**********")
CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
LOGGER.info("********clean up done**********")

LOGGER.audit(s"Data load is failed for " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
" as there is no data to load")
LOGGER.warn("Cannot write load metadata file as data load failed")
throw new Exception("No Data to load")

[Review comment — Contributor: write the comment as 'as no records loaded in new segment, new segment should be deleted'.]

SegmentStatus.MARKED_FOR_DELETE
} else {
loadStatus
}

writeDictionary(carbonLoadModel, result, writeAll = false)
val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
LoadTablePreStatusUpdateEvent(
Expand All @@ -505,7 +516,13 @@ object CarbonDataRDDFactory {
carbonLoadModel)
operationContext.setProperty("isOverwrite", overwriteTable)
OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)
val done = updateTableStatus(status, carbonLoadModel, loadStatus, overwriteTable)
val done =
updateTableStatus(
status,
carbonLoadModel,
loadStatus,
newEntryLoadStatus,
overwriteTable)
if (!done) {
CommonUtil.updateTableStatusForFailure(carbonLoadModel)
LOGGER.info("********starting clean up**********")
@@ -766,17 +783,18 @@
status: Array[(String, (LoadMetadataDetails, ExecutionErrors))],
carbonLoadModel: CarbonLoadModel,
loadStatus: SegmentStatus,
newEntryLoadStatus: SegmentStatus,
overwriteTable: Boolean
): Boolean = {
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val metadataDetails = if (status != null && status(0) != null) {
val metadataDetails = if (status != null && status.size > 0 && status(0) != null) {
status(0)._2._1
} else {
new LoadMetadataDetails
}
CarbonLoaderUtil.populateNewLoadMetaEntry(
metadataDetails,
loadStatus,
newEntryLoadStatus,
carbonLoadModel.getFactTimeStamp,
true)
CarbonUtil
@@ -64,14 +64,6 @@ case class CarbonLoadDataCommand(

override def processData(sparkSession: SparkSession): Seq[Row] = {
val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
if (dataFrame.isDefined && updateModel.isEmpty) {
val rdd = dataFrame.get.rdd
if (rdd.partitions == null || rdd.partitions.length == 0) {
LOGGER.warn("DataLoading finished. No data was loaded.")
return Seq.empty
}
}

val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
carbonProperty.addProperty("zookeeper.enable.lock", "false")
