select with group by and insert overwrite to another carbon table
kushalsaha authored and dhatchayani committed Dec 14, 2017
1 parent 4158f3d commit d750892
Showing 9 changed files with 217 additions and 25 deletions.
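In short: this commit makes an insert overwrite whose source select (with or without group by) returns no rows behave sanely. The load now succeeds and the target table's old segments are still overwritten, instead of the statement either failing with "No Data to load" or silently leaving the target's old rows in place. A minimal sketch of the scenario, assuming a local CarbonData build; the session setup, store path, and table names here are illustrative, not part of the commit:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._  // provides getOrCreateCarbonSession

// Illustrative session; the store path is a placeholder.
val spark = SparkSession.builder()
  .master("local")
  .appName("OverwriteFromEmptySource")
  .getOrCreateCarbonSession("/tmp/carbon-store")

spark.sql("drop table if exists src")
spark.sql("drop table if exists dest")
spark.sql("create table src(id int, name string, salary int) stored by 'carbondata'")
spark.sql("create table dest(id int, name string, salary int, age string) stored by 'carbondata'")

// src is empty, so the overwrite writes zero rows; with this commit the
// statement succeeds and dest ends up empty rather than keeping stale rows.
spark.sql(
  "insert overwrite table dest " +
  "select id, name, sum(salary) as TotalSalary, '98' as age " +
  "from src group by id, name, salary")
assert(spark.sql("select * from dest").count() == 0)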
overwriteTable1_noRecord.csv
@@ -0,0 +1 @@
id,name,salary
overwriteTable1_someRecord.csv
@@ -0,0 +1,6 @@
id,name,salary
1,hello,2300
2,hi,2500
3,xyz,4000
4,xyz1,5000
5,xyz2,6000
overwriteTable2_noRecord.csv
@@ -0,0 +1 @@
id,name,salary,age
overwriteTable2_someRecord.csv
@@ -0,0 +1,4 @@
id,name,salary,age
9,abc,48,20
10,abc1,90,21
11,abc2,99,22
InsertIntoCarbonTableTestCase.scala
@@ -18,12 +18,15 @@ package org.apache.carbondata.spark.testsuite.allqueries

import java.io.File

import org.apache.spark.sql.Row
import org.scalatest.BeforeAndAfterAll

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.spark.sql.test.util.QueryTest

import org.apache.carbondata.core.datastore.filesystem.CarbonFile
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.CarbonMetadata
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.path.CarbonStorePath
@@ -38,6 +41,8 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
sql("drop table if exists THive")
sql("create table THive (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY Decimal(30,10), Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber BigInt) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','")
sql(s"LOAD DATA local INPATH '$resourcesPath/100_olap.csv' INTO TABLE THive")
sql("drop table if exists OverwriteTable_t1")
sql("drop table if exists OverwriteTable_t2")
}

test("insert from hive") {
@@ -276,6 +281,159 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
}
sql("LOAD DATA INPATH '" + resourcesPath + "/100_olap.csv' overwrite INTO table TCarbonSourceOverwrite options ('DELIMITER'=',', 'QUOTECHAR'='\', 'FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointDescription,gamePointId,contractNumber')")
assert(rowCount == sql("select imei from TCarbonSourceOverwrite").count())

}

test("insert overwrite in group by scenario with t1 no record and t2 no record") {
sql("drop table if exists OverwriteTable_t1")
sql("drop table if exists OverwriteTable_t2")
sql("create table OverwriteTable_t1(id int,name String,salary int) stored by 'carbondata' ")
sql("LOAD DATA INPATH '" + resourcesPath + "/overwriteTable1_noRecord.csv' INTO table OverwriteTable_t1")
sql("create table OverwriteTable_t2(id int,name String,salary int,age String) stored by 'carbondata'")
sql("LOAD DATA INPATH '" + resourcesPath + "/overwriteTable2_noRecord.csv' INTO table OverwriteTable_t2")
sql ("insert overwrite table OverwriteTable_t2 select id,name,sum(salary) as TotalSalary,'98' as age from OverwriteTable_t1 group by id,name,salary")
val rowCountOverwriteT1 = sql("select * from OverwriteTable_t1").count()
val rowCountOverwriteT2 = sql("select * from OverwriteTable_t2").count()
val exists_t1 = checkSegment("OverwriteTable_t1")
val exists_t2 = checkSegment("OverwriteTable_t2")
assert(!exists_t1)
assert(!exists_t2)
assert(rowCountOverwriteT1 == rowCountOverwriteT2)
checkAnswer(sql("select * from OverwriteTable_t2"),
Seq())
checkAnswer(sql("select * from OverwriteTable_t1"),
sql("select * from OverwriteTable_t2"))
}


test("insert overwrite in group by scenario with t1 no record and t2 some record") {
sql("drop table if exists OverwriteTable_t1")
sql("drop table if exists OverwriteTable_t2")
sql("create table OverwriteTable_t1(id int,name String,salary int) stored by 'carbondata' ")
sql("LOAD DATA INPATH '" + resourcesPath + "/overwriteTable1_noRecord.csv' INTO table OverwriteTable_t1")
sql("create table OverwriteTable_t2(id int,name String,salary int,age String) stored by 'carbondata'")
sql("LOAD DATA INPATH '" + resourcesPath + "/overwriteTable2_someRecord.csv' INTO table OverwriteTable_t2")
sql ("insert overwrite table OverwriteTable_t2 select id,name,sum(salary) as TotalSalary,'98' as age from OverwriteTable_t1 group by id,name,salary")
val rowCountOverwriteT1 = sql("select * from OverwriteTable_t1").count()
val rowCountOverwriteT2 = sql("select * from OverwriteTable_t2").count()
val exists_t1 = checkSegment("OverwriteTable_t1")
val exists_t2 = checkSegment("OverwriteTable_t2")
assert(!exists_t1)
assert(!exists_t2)
assert(rowCountOverwriteT1 == rowCountOverwriteT2)
checkAnswer(sql("select * from OverwriteTable_t2"),
Seq())
checkAnswer(sql("select * from OverwriteTable_t1"),
sql("select * from OverwriteTable_t2"))
}

test("insert overwrite in group by scenario having record in both table") {
sql("drop table if exists OverwriteTable_t1")
sql("drop table if exists OverwriteTable_t2")
sql("create table OverwriteTable_t1(id int,name String,salary int) stored by 'carbondata' ")
sql("LOAD DATA INPATH '" + resourcesPath + "/overwriteTable1_someRecord.csv' INTO table OverwriteTable_t1")
sql("create table OverwriteTable_t2(id int,name String,salary int,age String) stored by 'carbondata'")
sql("LOAD DATA INPATH '" + resourcesPath + "/overwriteTable2_someRecord.csv' INTO table OverwriteTable_t2")
sql ("insert overwrite table OverwriteTable_t2 select id,name,sum(salary) as TotalSalary,'98' as age from OverwriteTable_t1 group by id,name,salary")
val rowCountOverwriteT1 = sql("select * from OverwriteTable_t1").count()
val rowCountOverwriteT2 = sql("select * from OverwriteTable_t2").count()
val exists_t1 = checkSegment("OverwriteTable_t1")
val exists_t2 = checkSegment("OverwriteTable_t2")
assert(exists_t1)
assert(exists_t2)
assert(rowCountOverwriteT1 == rowCountOverwriteT2)
checkAnswer(sql("select count(*) from OverwriteTable_t1"), sql("select count(*) from OverwriteTable_t2"))
}

test("insert overwrite in group by scenario t1 some record and t2 no record") {
sql("drop table if exists OverwriteTable_t1")
sql("drop table if exists OverwriteTable_t2")
sql("create table OverwriteTable_t1(id int,name String,salary int) stored by 'carbondata' ")
sql("LOAD DATA INPATH '" + resourcesPath + "/overwriteTable1_someRecord.csv' INTO table OverwriteTable_t1")
sql("create table OverwriteTable_t2(id int,name String,salary int,age String) stored by 'carbondata'")
sql("LOAD DATA INPATH '" + resourcesPath + "/overwriteTable2_noRecord.csv' INTO table OverwriteTable_t2")
sql("insert overwrite table OverwriteTable_t2 select id,name,sum(salary) as TotalSalary,'98' as age from OverwriteTable_t1 group by id,name,salary")
val rowCountOverwriteT1 = sql("select * from OverwriteTable_t1").count()
val rowCountOverwriteT2 = sql("select * from OverwriteTable_t2").count()
val exists_t1 = checkSegment("OverwriteTable_t1")
val exists_t2 = checkSegment("OverwriteTable_t2")
assert(exists_t1)
assert(exists_t2)
assert(rowCountOverwriteT1 == rowCountOverwriteT2)
checkAnswer(sql("select count(*) from OverwriteTable_t1"), sql("select count(*) from OverwriteTable_t2"))
}

test("insert overwrite without group by scenario t1 no record and t2 no record") {
sql("drop table if exists OverwriteTable_t1")
sql("drop table if exists OverwriteTable_t2")
sql("create table OverwriteTable_t1(id int,name String,salary int) stored by 'carbondata' ")
sql("LOAD DATA INPATH '" + resourcesPath + "/overwriteTable1_noRecord.csv' INTO table OverwriteTable_t1")
sql("create table OverwriteTable_t2(id int,name String,salary int,age String) stored by 'carbondata'")
sql("LOAD DATA INPATH '" + resourcesPath + "/overwriteTable2_noRecord.csv' INTO table OverwriteTable_t2")
sql ("insert overwrite table OverwriteTable_t2 select id,name,salary as TotalSalary,'98' as age from OverwriteTable_t1")
val rowCountOverwriteT1 = sql("select * from OverwriteTable_t1").count()
val rowCountOverwriteT2 = sql("select * from OverwriteTable_t2").count()
val exists_t1 = checkSegment("OverwriteTable_t1")
val exists_t2 = checkSegment("OverwriteTable_t2")
assert(!exists_t1)
assert(!exists_t2)
assert(rowCountOverwriteT1 == rowCountOverwriteT2)
checkAnswer(sql("select * from OverwriteTable_t2"),
Seq())
checkAnswer(sql("select * from OverwriteTable_t1"),
sql("select * from OverwriteTable_t2"))
}


test("insert overwrite without group by scenario with t1 no record and t2 some record") {
sql("drop table if exists OverwriteTable_t1")
sql("drop table if exists OverwriteTable_t2")
sql("create table OverwriteTable_t1(id int,name String,salary int) stored by 'carbondata' ")
sql("LOAD DATA INPATH '" + resourcesPath + "/overwriteTable1_noRecord.csv' INTO table OverwriteTable_t1")
sql("create table OverwriteTable_t2(id int,name String,salary int,age String) stored by 'carbondata'")
sql("LOAD DATA INPATH '" + resourcesPath + "/overwriteTable2_someRecord.csv' INTO table OverwriteTable_t2")
sql ("insert overwrite table OverwriteTable_t2 select id,name,salary as TotalSalary,'98' as age from OverwriteTable_t1")
val rowCountOverwriteT1 = sql("select * from OverwriteTable_t1").count()
val rowCountOverwriteT2 = sql("select * from OverwriteTable_t2").count()
val exists_t1 = checkSegment("OverwriteTable_t1")
val exists_t2 = checkSegment("OverwriteTable_t2")
assert(!exists_t1)
assert(!exists_t2)
assert(rowCountOverwriteT1 == rowCountOverwriteT2)
checkAnswer(sql("select * from OverwriteTable_t2"),
Seq())
checkAnswer(sql("select * from OverwriteTable_t1"),
sql("select * from OverwriteTable_t2"))
}

test("insert overwrite without group by scenario having record in both table") {
sql("drop table if exists OverwriteTable_t1")
sql("drop table if exists OverwriteTable_t2")
sql("create table OverwriteTable_t1(id int,name String,salary int) stored by 'carbondata' ")
sql("LOAD DATA INPATH '" + resourcesPath + "/overwriteTable1_someRecord.csv' INTO table OverwriteTable_t1")
sql("create table OverwriteTable_t2(id int,name String,salary int,age String) stored by 'carbondata'")
sql("LOAD DATA INPATH '" + resourcesPath + "/overwriteTable2_someRecord.csv' INTO table OverwriteTable_t2")
sql ("insert overwrite table OverwriteTable_t2 select id,name,salary as TotalSalary,'98' as age from OverwriteTable_t1")
val rowCountOverwriteT1 = sql("select * from OverwriteTable_t1").count()
val rowCountOverwriteT2 = sql("select * from OverwriteTable_t2").count()
val exists_t1 = checkSegment("OverwriteTable_t1")
val exists_t2 = checkSegment("OverwriteTable_t2")
assert(exists_t1)
assert(exists_t2)
assert(rowCountOverwriteT1 == rowCountOverwriteT2)
checkAnswer(sql("select count(*) from OverwriteTable_t1"), sql("select count(*) from OverwriteTable_t2"))
}

private def checkSegment(tableName: String) : Boolean ={
val storePath_t1 = metastoredb + s"/warehouse/$tableName/Fact/Part0"
val carbonFile_t1: CarbonFile = FileFactory
.getCarbonFile(storePath_t1, FileFactory.getFileType(storePath_t1))
var exists: Boolean = carbonFile_t1.exists()
if (exists) {
val listFiles: Array[CarbonFile] = carbonFile_t1.listFiles()
exists = listFiles.size > 0
}
exists
}
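
The six overwrite tests above share one body. As a condensed sketch, a hypothetical helper (not in the commit; it reuses this class's sql, checkSegment, and checkAnswer) capturing what each scenario asserts:

// Hypothetical consolidation of the repeated test body above: overwrite t2
// from t1, then verify that physical segments exist only when t1 had rows,
// and that both tables agree on row count afterwards.
private def overwriteAndVerify(expectSegments: Boolean): Unit = {
  sql("insert overwrite table OverwriteTable_t2 " +
    "select id,name,sum(salary) as TotalSalary,'98' as age " +
    "from OverwriteTable_t1 group by id,name,salary")
  assert(checkSegment("OverwriteTable_t1") == expectSegments)
  assert(checkSegment("OverwriteTable_t2") == expectSegments)
  checkAnswer(sql("select count(*) from OverwriteTable_t1"),
    sql("select count(*) from OverwriteTable_t2"))
}
// e.g. overwriteAndVerify(expectSegments = false) for the empty-source cases.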

test("test show segments after clean files for insert overwrite") {
@@ -309,6 +467,8 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
sql("DROP TABLE IF EXISTS student")
sql("DROP TABLE IF EXISTS uniqdata")
sql("DROP TABLE IF EXISTS show_insert")
sql("drop table if exists OverwriteTable_t1")
sql("drop table if exists OverwriteTable_t2")

if (timeStampPropOrig != null) {
CarbonProperties.getInstance()
QueryTest.scala
@@ -104,6 +104,7 @@ class QueryTest extends PlanTest {
lazy val storeLocation = CarbonProperties.getInstance().
getProperty(CarbonCommonConstants.STORE_LOCATION)
val resourcesPath = TestQueryExecutor.resourcesPath
val metastoredb = TestQueryExecutor.metastoredb
val integrationPath = TestQueryExecutor.integrationPath
val dblocation = TestQueryExecutor.location
}
CarbonDataRDDFactory.scala
@@ -375,7 +375,17 @@ object CarbonDataRDDFactory {
}
}
} else {
loadStatus = SegmentStatus.LOAD_FAILURE
// if the data load received no input rows, mark the load status as success
// so that the data load flow still executes
if (dataFrame.isDefined && updateModel.isEmpty) {
val rdd = dataFrame.get.rdd
if (rdd.partitions == null || rdd.partitions.length == 0) {
LOGGER.warn("DataLoading finished. No data was loaded.")
loadStatus = SegmentStatus.SUCCESS
}
} else {
loadStatus = SegmentStatus.LOAD_FAILURE
}
}

if (loadStatus != SegmentStatus.LOAD_FAILURE &&
@@ -483,20 +493,21 @@
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
throw new Exception(status(0)._2._2.errorMsg)
}
// if segment is empty then fail the data load
// as no record loaded in new segment, new segment should be deleted
var newEntryLoadStatus =
if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap &&
!CarbonLoaderUtil.isValidSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)) {
// update the load entry in table status file for changing the status to marked for delete
CommonUtil.updateTableStatusForFailure(carbonLoadModel)
LOGGER.info("********starting clean up**********")
CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
LOGGER.info("********clean up done**********")

LOGGER.audit(s"Data load is failed for " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
" as there is no data to load")
LOGGER.warn("Cannot write load metadata file as data load failed")
throw new Exception("No Data to load")

SegmentStatus.MARKED_FOR_DELETE
} else {
loadStatus
}

writeDictionary(carbonLoadModel, result, writeAll = false)
val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
LoadTablePreStatusUpdateEvent(
@@ -505,7 +516,11 @@
carbonLoadModel)
operationContext.setProperty("isOverwrite", overwriteTable)
OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)
val done = updateTableStatus(status, carbonLoadModel, loadStatus, overwriteTable)
val done = updateTableStatus(status,
carbonLoadModel,
loadStatus,
newEntryLoadStatus,
overwriteTable)
if (!done) {
CommonUtil.updateTableStatusForFailure(carbonLoadModel)
LOGGER.info("********starting clean up**********")
@@ -766,17 +781,18 @@
status: Array[(String, (LoadMetadataDetails, ExecutionErrors))],
carbonLoadModel: CarbonLoadModel,
loadStatus: SegmentStatus,
newEntryLoadStatus: SegmentStatus,
overwriteTable: Boolean
): Boolean = {
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val metadataDetails = if (status != null && status(0) != null) {
val metadataDetails = if (status != null && status.size > 0 && status(0) != null) {
status(0)._2._1
} else {
new LoadMetadataDetails
}
CarbonLoaderUtil.populateNewLoadMetaEntry(
metadataDetails,
loadStatus,
newEntryLoadStatus,
carbonLoadModel.getFactTimeStamp,
true)
CarbonUtil
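Taken together, the CarbonDataRDDFactory changes separate "did the load succeed" (loadStatus) from "should the new segment entry be visible" (newEntryLoadStatus). A side-effect-free restatement of that decision, with a hypothetical helper name; the real code inlines this in the load flow:

import org.apache.carbondata.core.statusmanager.SegmentStatus

// An empty but otherwise successful load keeps loadStatus == SUCCESS, so
// overwrite semantics still invalidate the old segments, while its own empty
// segment entry is written as MARKED_FOR_DELETE and never becomes visible.
def decideNewEntryStatus(
    isChildDataMap: Boolean,
    segmentHasData: Boolean,
    loadStatus: SegmentStatus): SegmentStatus = {
  if (!isChildDataMap && !segmentHasData) {
    SegmentStatus.MARKED_FOR_DELETE
  } else {
    loadStatus
  }
}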
CarbonLoadDataCommand.scala
@@ -64,14 +64,6 @@ case class CarbonLoadDataCommand(

override def processData(sparkSession: SparkSession): Seq[Row] = {
val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
if (dataFrame.isDefined && updateModel.isEmpty) {
val rdd = dataFrame.get.rdd
if (rdd.partitions == null || rdd.partitions.length == 0) {
LOGGER.warn("DataLoading finished. No data was loaded.")
return Seq.empty
}
}

val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
carbonProperty.addProperty("zookeeper.enable.lock", "false")

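Removing this early return is the counterpart of the new check in CarbonDataRDDFactory above: an empty source DataFrame must still enter the load flow, otherwise an insert overwrite from it can never invalidate the target's existing segments. A rough, abbreviated sketch of the intended flow after this commit (orientation only, not literal code):

// CarbonLoadDataCommand.processData
//   -> CarbonDataRDDFactory load flow            // now entered even for empty input
//        -> RDD has no partitions?               // the moved guard
//             loadStatus = SUCCESS               // instead of returning early
//        -> new segment has no valid data?
//             newEntryLoadStatus = MARKED_FOR_DELETE
//        -> updateTableStatus(status, model, loadStatus, newEntryLoadStatus, overwrite)
//             // on overwrite, older segments are invalidated in table status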
