[CARBONDATA-1304] Support IUD with single pass
mohammadshahidkhan committed Dec 15, 2017
1 parent e2a79ee commit d8d408f
Showing 5 changed files with 90 additions and 22 deletions.
UpdateCarbonTableTestCase.scala
@@ -18,7 +18,8 @@ package org.apache.carbondata.spark.testsuite.iud

import org.apache.spark.sql.{Row, SaveMode}
import org.scalatest.BeforeAndAfterAll
-import org.apache.carbondata.core.constants.CarbonCommonConstants
+
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.spark.sql.test.util.QueryTest

@@ -528,6 +529,46 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
sql("drop table if exists preaggMain_preagg1")
}

test("Update operation on carbon table with singlepass") {
sql(s"""set ${ CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS }=true""")
sql("drop database if exists carbon cascade")
sql(s"create database carbon location '$dblocation'")
sql("use carbon")
sql("""CREATE TABLE carbontable(id int, name string, city string, age int)
STORED BY 'org.apache.carbondata.format'""")
val testData = s"$resourcesPath/sample.csv"
sql(s"LOAD DATA LOCAL INPATH '$testData' into table carbontable")
// update operation
sql("""update carbon.carbontable d set (d.id) = (d.id + 1) where d.id > 2""").show()
checkAnswer(
sql("select count(*) from carbontable"),
Seq(Row(6))
)
sql(s"""set ${ CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS }=false""")
sql("drop table carbontable")
}

test("Update operation on carbon table with persist false") {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.isPersistEnabled, "false")
sql("drop database if exists carbon cascade")
sql(s"create database carbon location '$dblocation'")
sql("use carbon")
sql("""CREATE TABLE carbontable(id int, name string, city string, age int)
STORED BY 'org.apache.carbondata.format'""")
val testData = s"$resourcesPath/sample.csv"
sql(s"LOAD DATA LOCAL INPATH '$testData' into table carbontable")
// update operation
sql("""update carbon.carbontable d set (d.id) = (d.id + 1) where d.id > 2""").show()
checkAnswer(
sql("select count(*) from carbontable"),
Seq(Row(6))
)
sql("drop table carbontable")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.isPersistEnabled,
CarbonCommonConstants.defaultValueIsPersistEnabled)
}

override def afterAll {
sql("use default")
sql("drop database if exists iud cascade")
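Both new tests drive an update through the data-load path: the first with the single-pass session property enabled, the second with update persistence disabled. For reference, a minimal sketch of the per-load alternative to the session property, assuming CarbonData's standard LOAD DATA OPTIONS syntax (table and file names are illustrative):

// request single pass for one load only, instead of setting the
// session-level carbon.options.single.pass property as the test does
sql("""CREATE TABLE IF NOT EXISTS carbontable(id int, name string, city string, age int)
  STORED BY 'org.apache.carbondata.format'""")
sql(s"LOAD DATA LOCAL INPATH '$testData' INTO TABLE carbontable " +
  "OPTIONS('SINGLE_PASS'='TRUE')")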
CarbonScalaUtil.scala
@@ -222,4 +222,21 @@ object CarbonScalaUtil {
} cannot be modified. Only Int and Decimal data types are allowed for modification")
}
}

/**
 * Returns all fields except the tupleId field, which is not required in the value.
 *
 * @param fields the schema fields of the dataframe being loaded
 * @return the corresponding columns, minus the implicit tupleId column
 */
def getAllFieldsWithoutTupleIdField(fields: Array[StructField]): Seq[Column] = {
  fields.toSeq
    .filter(field => !field.name
      .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
    .map(field => new Column(field.name))
}
}
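The new helper centralizes column filtering that was previously inlined in CarbonLoadDataCommand. A hedged usage sketch, where df stands for a hypothetical update dataframe that still carries the implicit tupleId column:

import org.apache.carbondata.spark.util.CarbonScalaUtil
// keep only the value columns, dropping the implicit tupleId,
// e.g. to build the dataframe used for dictionary generation
val valueColumns = CarbonScalaUtil.getAllFieldsWithoutTupleIdField(df.schema.fields)
val dictionaryDataFrame = df.select(valueColumns: _*)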
TestQueryExecutor.scala
@@ -47,7 +47,7 @@ object TestQueryExecutor {
private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

val projectPath = new File(this.getClass.getResource("/").getPath + "../../../..")
-.getCanonicalPath
+.getCanonicalPath.replaceAll("\\\\", "/")
LOGGER.info(s"project path: $projectPath")
val integrationPath = s"$projectPath/integration"
val metastoredb = s"$integrationPath/spark-common/target"
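The added replaceAll guards against Windows, where getCanonicalPath returns backslash-separated paths that would break the forward-slash interpolations below it (integrationPath, metastoredb). A minimal illustration of the transformation:

// in Scala source, "\\\\" is the two-character string \\, which as a
// regex matches a single backslash, so every \ becomes /
val windowsPath = "C:\\workspace\\carbondata"
val normalized = windowsPath.replaceAll("\\\\", "/") // "C:/workspace/carbondata"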
CarbonDataRDDFactory.scala
@@ -421,6 +421,8 @@ object CarbonDataRDDFactory {
} else {
// in the success case, handle the update of the table status file
// write the dictionary file when single_pass is true
writeDictionary(carbonLoadModel, result, false)
val segmentDetails = new util.HashSet[String]()
var resultSize = 0
res.foreach { resultOfSeg =>
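This is the core of the fix: with single pass enabled, new dictionary values are generated on the fly by the dictionary server, and the update flow's success branch previously returned without flushing them. A hedged sketch of the scenario the new test exercises (property name from CarbonLoadOptionConstants; the exact failure mode without the fix is not shown in this diff):

sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS}=true")
sql("update carbon.carbontable d set (d.id) = (d.id + 1) where d.id > 2").show()
// without writeDictionary in the success branch above, dictionary values
// generated during the single-pass update load would presumably never be persisted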
CarbonLoadDataCommand.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute}
import org.apache.spark.sql.execution.command.{DataCommand, DataLoadTableFileMapping, DataProcessOperation, RunnableCommand, UpdateTableModel}
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.types.StructField
import org.apache.spark.util.{CausedBy, FileUtils}

import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
@@ -47,7 +48,7 @@ import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, Car
import org.apache.carbondata.processing.merger.CompactionType
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
-import org.apache.carbondata.spark.util.{CommonUtil, DataLoadingUtil, GlobalDictionaryUtil}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, DataLoadingUtil, GlobalDictionaryUtil}

case class CarbonLoadDataCommand(
databaseNameOp: Option[String],
@@ -314,14 +315,19 @@
} else {
None
}
val loadDataFrame = if (updateModel.isDefined) {
Some(getDataFrameWithTupleID())
} else {
dataFrame
}
CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
carbonLoadModel,
columnar,
partitionStatus,
server,
isOverwriteTable,
hadoopConf,
-dataFrame,
+loadDataFrame,
updateModel,
operationContext)
}
@@ -334,27 +340,11 @@
hadoopConf: Configuration,
operationContext: OperationContext): Unit = {
val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
-val fields = dataFrame.get.schema.fields
-import org.apache.spark.sql.functions.udf
-// extracting only segment from tupleId
-val getSegIdUDF = udf((tupleId: String) =>
-CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))
+val dataFrameWithTupleId: DataFrame = getDataFrameWithTupleID()
-// getting all fields except tupleId field as it is not required in the value
-var otherFields = fields.toSeq.filter { field =>
-!field.name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)
-}.map { field =>
-new Column(field.name)
-}
-
-// extract tupleId field which will be used as a key
-val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute
-.quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).
-as(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID)
+val otherFields = CarbonScalaUtil.getAllFieldsWithoutTupleIdField(dataFrame.get.schema.fields)
+// use dataFrameWithoutTupleId as dictionaryDataFrame
val dataFrameWithoutTupleId = dataFrame.get.select(otherFields: _*)
-otherFields = otherFields :+ segIdColumn
-// use dataFrameWithTupleId as loadDataFrame
-val dataFrameWithTupleId = dataFrame.get.select(otherFields: _*)
(Some(dataFrameWithoutTupleId), Some(dataFrameWithTupleId))
} else {
(dataFrame, dataFrame)
@@ -378,6 +368,24 @@
operationContext)
}

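/**
 * Builds the dataframe used as loadDataFrame in the update flow: all columns
 * except the implicit tupleId, plus a segment-id column derived from the
 * tupleId, which is used as the key.
 */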
def getDataFrameWithTupleID(): DataFrame = {
val fields = dataFrame.get.schema.fields
import org.apache.spark.sql.functions.udf
// extract only the segment id portion of the tupleId
val getSegIdUDF = udf((tupleId: String) =>
CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))
// get all fields except the tupleId field, as it is not required in the value
val otherFields = CarbonScalaUtil.getAllFieldsWithoutTupleIdField(fields)
// extract the tupleId field, which will be used as a key
val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute
.quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).
as(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID)
val fieldWithTupleId = otherFields :+ segIdColumn
// use dataFrameWithTupleId as loadDataFrame
val dataFrameWithTupleId = dataFrame.get.select(fieldWithTupleId: _*)
dataFrameWithTupleId
}

private def updateTableMetadata(
carbonLoadModel: CarbonLoadModel,
sqlContext: SQLContext,
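For context, the key extraction that getDataFrameWithTupleID performs can be sketched standalone, reusing the same UDF and constants; df is a hypothetical dataframe carrying the implicit tupleId column:

import org.apache.spark.sql.functions.udf
val getSegIdUDF = udf((tupleId: String) =>
  CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))
val withSegId = df.withColumn(
  CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID,
  getSegIdUDF(new Column(UnresolvedAttribute.quotedString(
    CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))))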
