[CARBONDATA-1304] [IUD Bug] IUD with single pass
The update operation on a carbon table fails when single pass is enabled, because in the single-pass case the tupleId column is not arranged at the end.

In the single-pass case the tupleId should be retrieved using the getSegIdUDF function and arranged at the end.
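For illustration, a minimal standalone sketch of this rearrangement (hypothetical names: extractSegmentId stands in for the getSegIdUDF built on CarbonUpdateUtil.getRequiredFieldFromTID, and "segId" for the implicit segment-id column name):

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.udf

// Hypothetical stand-in for CarbonUpdateUtil.getRequiredFieldFromTID(tupleId,
// TupleIdEnum.SEGMENT_ID); the token position is an assumption of this sketch.
val extractSegmentId = udf((tupleId: String) => tupleId.split("/")(1))

// Select every column except the implicit tupleId, then append the segment id
// derived from the tupleId as the LAST column, so the key sits at the end.
def withSegIdAtEnd(df: DataFrame, tupleIdCol: String): DataFrame = {
  val otherCols: Seq[Column] = df.schema.fieldNames.toSeq
    .filterNot(_.equalsIgnoreCase(tupleIdCol))
    .map(name => new Column(name))
  df.select((otherCols :+ extractSegmentId(new Column(tupleIdCol)).as("segId")): _*)
}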

This closes #1167
mohammadshahidkhan authored and manishgupta88 committed Dec 16, 2017
1 parent eb7cf54 commit 8bf72a6
Showing 5 changed files with 90 additions and 22 deletions.
UpdateCarbonTableTestCase.scala
@@ -18,7 +18,8 @@ package org.apache.carbondata.spark.testsuite.iud

import org.apache.spark.sql.{Row, SaveMode}
import org.scalatest.BeforeAndAfterAll
-import org.apache.carbondata.core.constants.CarbonCommonConstants
+
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.spark.sql.test.util.QueryTest

@@ -528,6 +529,46 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
sql("drop table if exists preaggMain_preagg1")
}

test("Update operation on carbon table with singlepass") {
sql(s"""set ${ CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS }=true""")
sql("drop database if exists carbon cascade")
sql(s"create database carbon location '$dblocation'")
sql("use carbon")
sql("""CREATE TABLE carbontable(id int, name string, city string, age int)
STORED BY 'org.apache.carbondata.format'""")
val testData = s"$resourcesPath/sample.csv"
sql(s"LOAD DATA LOCAL INPATH '$testData' into table carbontable")
// update operation
sql("""update carbon.carbontable d set (d.id) = (d.id + 1) where d.id > 2""").show()
checkAnswer(
sql("select count(*) from carbontable"),
Seq(Row(6))
)
sql(s"""set ${ CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS }=false""")
sql("drop table carbontable")
}
test("Update operation on carbon table with persist false") {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.isPersistEnabled, "false")
sql("drop database if exists carbon cascade")
sql(s"create database carbon location '$dblocation'")
sql("use carbon")
sql("""CREATE TABLE carbontable(id int, name string, city string, age int)
STORED BY 'org.apache.carbondata.format'""")
val testData = s"$resourcesPath/sample.csv"
sql(s"LOAD DATA LOCAL INPATH '$testData' into table carbontable")
// update operation
sql("""update carbon.carbontable d set (d.id) = (d.id + 1) where d.id > 2""").show()
checkAnswer(
sql("select count(*) from carbontable"),
Seq(Row(6))
)
sql("drop table carbontable")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.isPersistEnabled,
CarbonCommonConstants.defaultValueIsPersistEnabled)
}

override def afterAll {
sql("use default")
sql("drop database if exists iud cascade")
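Usage note: besides the session property exercised by the new test, single pass can also be requested for an individual load. A hypothetical example using CarbonData's SINGLE_PASS load option (option name assumed, not part of this diff):

// hypothetical: enable single-pass dictionary generation for one load only
sql(s"LOAD DATA LOCAL INPATH '$testData' INTO TABLE carbontable " +
  "OPTIONS('SINGLE_PASS'='TRUE')")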
CarbonScalaUtil.scala
@@ -222,4 +222,21 @@ object CarbonScalaUtil {
} cannot be modified. Only Int and Decimal data types are allowed for modification")
}
}

+  /**
+   * Returns all fields except the tupleId field, as it is not required in the value.
+   *
+   * @param fields schema fields of the dataframe
+   * @return columns for all fields except the implicit tupleId field
+   */
+  def getAllFieldsWithoutTupleIdField(fields: Array[StructField]): Seq[Column] = {
+    // keep every field except tupleId, as it is not required in the value
+    fields.toSeq
+      .filter(field => !field.name
+        .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
+      .map(field => new Column(field.name))
+  }
}
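A quick usage sketch for the new helper (hypothetical schema; "tupleId" here stands in for the value of CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID):

import org.apache.spark.sql.Column
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical schema: two user columns plus the implicit tupleId column.
val schema = StructType(Seq(
  StructField("id", StringType),
  StructField("name", StringType),
  StructField("tupleId", StringType)))

// Filters out "tupleId" (case-insensitive) and keeps the rest, in order.
val cols: Seq[Column] = CarbonScalaUtil.getAllFieldsWithoutTupleIdField(schema.fields)
// cols now holds Column("id") and Column("name")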
TestQueryExecutor.scala
@@ -47,7 +47,7 @@ object TestQueryExecutor {
private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

val projectPath = new File(this.getClass.getResource("/").getPath + "../../../..")
-    .getCanonicalPath
+    .getCanonicalPath.replaceAll("\\\\", "/")
LOGGER.info(s"project path: $projectPath")
val integrationPath = s"$projectPath/integration"
val metastoredb = s"$integrationPath/spark-common/target"
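For clarity, the added replaceAll normalizes Windows path separators; a minimal illustration with a hypothetical path:

// In Java/Scala regex syntax the source literal "\\\\" is the regex \\,
// which matches a single literal backslash.
val p = "C:\\work\\carbondata".replaceAll("\\\\", "/")
// p == "C:/work/carbondata"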
CarbonDataRDDFactory.scala
@@ -421,6 +421,8 @@ object CarbonDataRDDFactory {
} else {
// in success case handle updation of the table status file.
// success case.
+        // write the dictionary file in case of single_pass true
+        writeDictionary(carbonLoadModel, result, false)
val segmentDetails = new util.HashSet[String]()
var resultSize = 0
res.foreach { resultOfSeg =>
CarbonLoadDataCommand.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute}
import org.apache.spark.sql.execution.command.{DataCommand, DataLoadTableFileMapping, UpdateTableModel}
import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.types.StructField
import org.apache.spark.util.{CausedBy, FileUtils}

import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
@@ -47,7 +48,7 @@ import org.apache.carbondata.processing.loading.exception.NoRetryException
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
-import org.apache.carbondata.spark.util.{CommonUtil, DataLoadingUtil, GlobalDictionaryUtil}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, DataLoadingUtil, GlobalDictionaryUtil}

case class CarbonLoadDataCommand(
databaseNameOp: Option[String],
@@ -310,14 +311,19 @@
} else {
None
}
+      val loadDataFrame = if (updateModel.isDefined) {
+        Some(getDataFrameWithTupleID())
+      } else {
+        dataFrame
+      }
CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
carbonLoadModel,
columnar,
partitionStatus,
server,
isOverwriteTable,
hadoopConf,
-        dataFrame,
+        loadDataFrame,
updateModel,
operationContext)
}
@@ -330,27 +336,11 @@
hadoopConf: Configuration,
operationContext: OperationContext): Unit = {
    val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
-      val fields = dataFrame.get.schema.fields
-      import org.apache.spark.sql.functions.udf
-      // extracting only segment from tupleId
-      val getSegIdUDF = udf((tupleId: String) =>
-        CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))
+      val dataFrameWithTupleId: DataFrame = getDataFrameWithTupleID()
-      // getting all fields except tupleId field as it is not required in the value
-      var otherFields = fields.toSeq.filter { field =>
-        !field.name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)
-      }.map { field =>
-        new Column(field.name)
-      }
-
-      // extract tupleId field which will be used as a key
-      val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute
-        .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).
-        as(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID)
+      val otherFields = CarbonScalaUtil.getAllFieldsWithoutTupleIdField(dataFrame.get.schema.fields)
      // use dataFrameWithoutTupleId as dictionaryDataFrame
      val dataFrameWithoutTupleId = dataFrame.get.select(otherFields: _*)
-      otherFields = otherFields :+ segIdColumn
      // use dataFrameWithTupleId as loadDataFrame
-      val dataFrameWithTupleId = dataFrame.get.select(otherFields: _*)
      (Some(dataFrameWithoutTupleId), Some(dataFrameWithTupleId))
    } else {
      (dataFrame, dataFrame)
@@ -374,6 +364,24 @@
operationContext)
}

+  def getDataFrameWithTupleID(): DataFrame = {
+    val fields = dataFrame.get.schema.fields
+    import org.apache.spark.sql.functions.udf
+    // extracting only the segment id from tupleId
+    val getSegIdUDF = udf((tupleId: String) =>
+      CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))
+    // getting all fields except tupleId field as it is not required in the value
+    val otherFields = CarbonScalaUtil.getAllFieldsWithoutTupleIdField(fields)
+    // extract tupleId field which will be used as a key
+    val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute
+      .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)))
+      .as(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID)
+    val fieldWithTupleId = otherFields :+ segIdColumn
+    // use the dataframe with the segment id appended at the end as loadDataFrame
+    dataFrame.get.select(fieldWithTupleId: _*)
+  }

private def updateTableMetadata(
carbonLoadModel: CarbonLoadModel,
sqlContext: SQLContext,
