[CARBONDATA-1304] Support IUD with single pass
mohammadshahidkhan committed Dec 9, 2017
1 parent 2304303 commit ca3097e
Showing 6 changed files with 170 additions and 23 deletions.
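Background: with single-pass load, CarbonData builds the global dictionary during the data load itself (through a dictionary server) instead of in a separate first scan of the input; this commit wires update (IUD) operations into that path. As a rough sketch of how the mode is enabled: the session property is taken from the new test below, and the per-load OPTIONS form is assumed from the standard LOAD DATA syntax.

  // enable single pass for subsequent loads in this session
  sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS}=true")
  // or per load (assumed standard option syntax)
  sql("LOAD DATA LOCAL INPATH '/path/sample.csv' INTO TABLE carbontable OPTIONS('SINGLE_PASS'='TRUE')")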
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.spark.testsuite.iud

import org.apache.spark.sql.Row
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll

import org.apache.carbondata.common.constants.LoggerAction
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties

class TestUpdateCarbonTableWithPersistFalse extends QueryTest with BeforeAndAfterAll {
  override def beforeAll {
    super.beforeAll
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, LoggerAction.FORCE.name())
      .addProperty(CarbonCommonConstants.isPersistEnabled, "false")
  }

  test("Update operation on carbon table") {
    sql("drop database if exists carbon cascade")
    sql(s"create database carbon location '$dblocation'")
    sql("use carbon")
    sql("""CREATE TABLE carbontable(id int, name string, city string, age int)
           STORED BY 'org.apache.carbondata.format'""")
    val testData = s"$resourcesPath/sample.csv"
    sql(s"LOAD DATA LOCAL INPATH '$testData' into table carbontable")
    // update operation
    sql("""update carbon.carbontable d set (d.id) = (d.id + 1) where d.id > 2""").show()
    checkAnswer(
      sql("select count(*) from carbontable"),
      Seq(Row(6))
    )
    sql("drop table carbontable")
  }

  override def afterAll {
    super.afterAll
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
      .addProperty(CarbonCommonConstants.isPersistEnabled,
        CarbonCommonConstants.defaultValueIsPersistEnabled)
  }
}
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.spark.testsuite.iud

import org.apache.spark.sql.Row
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll

import org.apache.carbondata.common.constants.LoggerAction
import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
import org.apache.carbondata.core.util.CarbonProperties

class TestUpdateCarbonTableWithSinglePass extends QueryTest with BeforeAndAfterAll {
  override def beforeAll {
    super.beforeAll
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, LoggerAction.FORCE.name())
    // .addProperty(CarbonCommonConstants.isPersistEnabled, "false")
    sql(s"""set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS}=true""")
  }

  test("Update operation on carbon table") {
    sql("drop database if exists carbon cascade")
    sql(s"create database carbon location '$dblocation'")
    sql("use carbon")
    sql("""CREATE TABLE carbontable(id int, name string, city string, age int)
           STORED BY 'org.apache.carbondata.format'""")
    val testData = s"$resourcesPath/sample.csv"
    sql(s"LOAD DATA LOCAL INPATH '$testData' into table carbontable")
    // update operation
    sql("""update carbon.carbontable d set (d.id) = (d.id + 1) where d.id > 2""").show()
    checkAnswer(
      sql("select count(*) from carbontable"),
      Seq(Row(6))
    )
    sql("drop table carbontable")
  }

  override def afterAll {
    super.afterAll
    sql(s"""set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS}=false""")
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
  }
}
@@ -222,4 +222,21 @@ object CarbonScalaUtil {
} cannot be modified. Only Int and Decimal data types are allowed for modification")
}
}

  /**
   * Returns all fields except the tupleId field, as it is not required in the value.
   *
   * @param fields the schema fields, possibly including the implicit tupleId column
   * @return the matching columns, with the tupleId column filtered out
   */
  def getAllFieldsWithoutTupleIdField(fields: Array[StructField]): Seq[Column] = {
    fields.toSeq
      .filter(field => !field.name
        .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
      .map(field => new Column(field.name))
  }
}
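For reference, a minimal self-contained sketch of what getAllFieldsWithoutTupleIdField does, using a hypothetical schema and an assumed name for the implicit tupleId column (the real name comes from CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID):

  import org.apache.spark.sql.Column
  import org.apache.spark.sql.types.{IntegerType, StringType, StructField}

  object TupleIdFilterSketch {
    // assumed for illustration; not the real constant
    val tupleIdColumn = "tupleId"

    def withoutTupleId(fields: Array[StructField]): Seq[Column] =
      fields.toSeq
        .filter(field => !field.name.equalsIgnoreCase(tupleIdColumn))
        .map(field => new Column(field.name))

    def main(args: Array[String]): Unit = {
      val fields = Array(
        StructField("id", IntegerType),
        StructField("name", StringType),
        StructField(tupleIdColumn, StringType))
      // prints: id, name  (the implicit tupleId column is dropped)
      println(withoutTupleId(fields).mkString(", "))
    }
  }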
@@ -47,7 +47,7 @@ object TestQueryExecutor {
  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)

  val projectPath = new File(this.getClass.getResource("/").getPath + "../../../..")
-   .getCanonicalPath
+   .getCanonicalPath.replaceAll("\\\\", "/")
  LOGGER.info(s"project path: $projectPath")
  val integrationPath = s"$projectPath/integration"
  val metastoredb = s"$integrationPath/spark-common/target"
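The added replaceAll normalizes Windows path separators, so interpolations such as s"$projectPath/integration" above stay consistent on all platforms. A one-line illustration:

  // "C:\workspace\carbondata" becomes "C:/workspace/carbondata"
  val normalized = "C:\\workspace\\carbondata".replaceAll("\\\\", "/")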
@@ -50,7 +50,7 @@ import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarForma
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.metadata.schema.partition.PartitionType
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
import org.apache.carbondata.core.scan.partition.PartitionUtil
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil}
@@ -417,6 +419,8 @@ object CarbonDataRDDFactory {
      } else {
        // success case: handle updating of the table status file
        // write the dictionary file when single_pass is true
        writeDictionary(carbonLoadModel, result, false)
        val segmentDetails = new util.HashSet[String]()
        var resultSize = 0
        res.foreach { resultOfSeg =>
@@ -549,7 +551,8 @@
      } else {
        // splitting as (key, value) i.e., (segment, updatedRows)
        val keyRDD = updateRdd.map(row =>
-         (row.get(row.size - 1).toString, Row(row.toSeq.slice(0, row.size - 1): _*)))
+         (row.get(row.size - 1).toString,
+           Row(row.toSeq.slice(0, row.size - 1): _*)))

        val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(
          carbonTable.getMetaDataFilepath)
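A small sketch of the (key, value) split above: the updated rows carry the tupleId-derived segment id as their last column, so the key is that last value and the value is the row without it. Runnable with spark-sql on the classpath; the row contents are hypothetical:

  import org.apache.spark.sql.Row

  object SegmentKeySplitSketch {
    def main(args: Array[String]): Unit = {
      // hypothetical updated row; "0" stands for the segment id appended last
      val row = Row(4, "dave", "delhi", 28, "0")
      val keyValue = (row.get(row.size - 1).toString,
        Row(row.toSeq.slice(0, row.size - 1): _*))
      println(keyValue) // ("0",[4,dave,delhi,28])
    }
  }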
@@ -26,6 +26,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute}
import org.apache.spark.sql.execution.command.{DataCommand, DataLoadTableFileMapping, DataProcessOperation, RunnableCommand, UpdateTableModel}
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.types.StructField
import org.apache.spark.util.{CausedBy, FileUtils}

import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
@@ -48,7 +49,7 @@ import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, Car
import org.apache.carbondata.processing.merger.CompactionType
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
-import org.apache.carbondata.spark.util.{CommonUtil, DataLoadingUtil, GlobalDictionaryUtil}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, DataLoadingUtil, GlobalDictionaryUtil}

case class CarbonLoadDataCommand(
databaseNameOp: Option[String],
@@ -312,6 +313,11 @@
    } else {
      None
    }
    var loadDataFrame = dataFrame
    if (updateModel.isDefined) {
      val dataFrameWithTupleId: DataFrame = getDataFrameWithTupleID()
      loadDataFrame = Some(dataFrameWithTupleId)
    }
    CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
      carbonLoadModel,
      carbonLoadModel.getTablePath,
@@ … @@
      server,
      isOverwriteTable,
      hadoopConf,
-     dataFrame,
+     loadDataFrame,
      updateModel)
  }

@@ … @@
      partitionStatus: SegmentStatus,
      hadoopConf: Configuration): Unit = {
    val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
-     val fields = dataFrame.get.schema.fields
-     import org.apache.spark.sql.functions.udf
-     // extracting only segment from tupleId
-     val getSegIdUDF = udf((tupleId: String) =>
-       CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))
+     val dataFrameWithTupleId: DataFrame = getDataFrameWithTupleID()
-     // getting all fields except tupleId field as it is not required in the value
-     var otherFields = fields.toSeq.filter { field =>
-       !field.name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)
-     }.map { field =>
-       new Column(field.name)
-     }
-
-     // extract tupleId field which will be used as a key
-     val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute
-       .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).
-       as(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID)
+     val otherFields = CarbonScalaUtil.getAllFieldsWithoutTupleIdField(dataFrame.get.schema.fields)
      // use dataFrameWithoutTupleId as dictionaryDataFrame
      val dataFrameWithoutTupleId = dataFrame.get.select(otherFields: _*)
-     otherFields = otherFields :+ segIdColumn
-     // use dataFrameWithTupleId as loadDataFrame
-     val dataFrameWithTupleId = dataFrame.get.select(otherFields: _*)
      (Some(dataFrameWithoutTupleId), Some(dataFrameWithTupleId))
    } else {
      (dataFrame, dataFrame)
@@ … @@
      updateModel)
  }

  def getDataFrameWithTupleID(): DataFrame = {
    val fields = dataFrame.get.schema.fields
    import org.apache.spark.sql.functions.udf
    // extracting only segment from tupleId
    val getSegIdUDF = udf((tupleId: String) =>
      CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))
    // getting all fields except tupleId field as it is not required in the value
    val otherFields = CarbonScalaUtil.getAllFieldsWithoutTupleIdField(fields)
    // extract tupleId field which will be used as a key
    val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute
      .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)))
      .as(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID)
    val fieldWithTupleId = otherFields :+ segIdColumn
    // use dataFrameWithTupleId as loadDataFrame
    dataFrame.get.select(fieldWithTupleId: _*)
  }

  private def updateTableMetadata(
      carbonLoadModel: CarbonLoadModel,
      sqlContext: SQLContext,
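To illustrate the key extraction in getDataFrameWithTupleID, a runnable sketch under the assumption that the tupleId is a '/'-delimited string whose first token is the segment id; the real parsing is delegated to CarbonUpdateUtil.getRequiredFieldFromTID, and the layout here is illustrative only:

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.functions.{col, udf}

  object SegIdUdfSketch {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder().master("local[1]").appName("segIdSketch").getOrCreate()
      import spark.implicits._
      // assumed tupleId layout: segmentId/blockId/... (illustrative)
      val getSegId = udf((tupleId: String) => tupleId.split("/")(0))
      val df = Seq((1, "0/block-0/0"), (2, "1/block-3/7")).toDF("id", "tupleId")
      // keep the data columns and append the derived segment id, as the command does
      df.select(col("id"), getSegId(col("tupleId")).as("segId")).show()
      spark.stop()
    }
  }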
