[CARBONDATA-1304] Support IUD with single pass
[CARBONDATA-1293] Update on carbon table failed with carbon.update.persist.enable=false
mohammadshahidkhan committed Jul 13, 2017
1 parent cbe1419 commit c128a75
Showing 7 changed files with 138 additions and 71 deletions.
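For context, a minimal sketch of the scenario the new test suites below exercise. This is illustrative only, not code from the commit: the session, table name, and schema are hypothetical, while the property and option constants are exactly the ones the tests set.

import org.apache.spark.sql.SparkSession
import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
import org.apache.carbondata.core.util.CarbonProperties

// Hypothetical reproduction; assumes an existing Carbon-enabled SparkSession
// `spark` and a carbon table `dest(c1 string, c2 int)` (both made up here).
def reproduce(spark: SparkSession): Unit = {
  // carbon.update.persist.enable=false (the CARBONDATA-1293 setting)
  CarbonProperties.getInstance()
    .addProperty(CarbonCommonConstants.isPersistEnabled, "false")
  // single_pass=true for the load triggered by the update (CARBONDATA-1304)
  spark.sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS}=true")
  // The update internally runs a data load that must carry the implicit
  // tupleId column through the single-pass flow.
  spark.sql("UPDATE dest d SET (d.c2) = (d.c2 + 1) WHERE d.c1 = 'a'")
}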
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.spark.testsuite.iud

import org.apache.carbondata.common.constants.LoggerAction
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties

class UpdateCarbonTableTestCaseWithPersistFalse extends UpdateCarbonTableTestCase {
override def beforeAll {
super.beforeAll
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, LoggerAction.FORCE.name())
.addProperty(CarbonCommonConstants.isPersistEnabled, "false")
}

override def afterAll {
super.afterAll
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
.addProperty(CarbonCommonConstants.isPersistEnabled,
CarbonCommonConstants.defaultValueIsPersistEnabled)
}
}
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.spark.testsuite.iud

import org.apache.carbondata.common.constants.LoggerAction
import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
import org.apache.carbondata.core.util.CarbonProperties

class UpdateCarbonTableTestCaseWithSinglePass extends UpdateCarbonTableTestCase {
override def beforeAll {
super.beforeAll
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, LoggerAction.FORCE.name())
sql(s"""set ${ CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS }=true""")
}
override def afterAll {
super.afterAll
sql(s"""set ${ CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS }=false""")
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION ,
CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
}
}
@@ -211,4 +211,21 @@ object CarbonScalaUtil
} cannot be modified. Only Int and Decimal data types are allowed for modification")
}
}

/**
 * Returns all fields except the implicit tupleId field, which is not required in the value.
 *
 * @param fields the fields of the DataFrame schema
 * @return a Column for every field except the implicit tupleId column
 */
def getAllFieldsWithoutTupleIdField(fields: Array[StructField]): Seq[Column] = {
  fields.toSeq
    .filter(field => !field.name
      .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
    .map(field => new Column(field.name))
}
}
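A rough usage sketch of the new helper. The schema here is hypothetical; only CarbonScalaUtil.getAllFieldsWithoutTupleIdField and the constant come from the change above.

import org.apache.spark.sql.Column
import org.apache.spark.sql.types.{StringType, StructField}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.spark.util.CarbonScalaUtil

// Hypothetical schema: two value columns plus the implicit tuple-id column.
val fields = Array(
  StructField("c1", StringType),
  StructField("c2", StringType),
  StructField(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID, StringType))

// Expected to yield columns for c1 and c2 only.
val valueColumns: Seq[Column] = CarbonScalaUtil.getAllFieldsWithoutTupleIdField(fields)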
@@ -49,7 +49,7 @@ import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarForma
import org.apache.carbondata.core.metadata.datatype.DataType
import org.apache.carbondata.core.metadata.schema.partition.PartitionType
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
import org.apache.carbondata.core.scan.partition.PartitionUtil
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties}
@@ -722,8 +722,8 @@ object CarbonDataRDDFactory

val keyRDD = updateRdd.map(row =>
// splitting as (key, value) i.e., (segment, updatedRows)
(row.get(row.size - 1).toString, Row(row.toSeq.slice(0, row.size - 1): _*))
)
(CarbonUpdateUtil.getRequiredFieldFromTID(row.get(row.size - 1).toString,
TupleIdEnum.SEGMENT_ID), Row(row.toSeq.slice(0, row.size - 1): _*)))
val groupBySegmentRdd = keyRDD.groupByKey()

val nodeNumOfData = groupBySegmentRdd.partitions.flatMap[String, Array[String]] { p =>
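The keying change in this hunk groups updated rows by segment id instead of by the full tuple id. A standalone sketch of that extraction follows, under the assumption (not confirmed by the diff) that a tuple id is a '/'-separated path with the segment id in the second position; the committed code delegates to CarbonUpdateUtil.getRequiredFieldFromTID with TupleIdEnum.SEGMENT_ID.

// Sketch only: assumes a tuple id shaped like
// "<partId>/<segmentId>/<blockId>/<blockletId>/<pageId>/<rowId>".
def segmentIdFromTupleId(tupleId: String): String = tupleId.split("/")(1)

// e.g. segmentIdFromTupleId("0/2/part-0-0/0/0/5") returns "2"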
@@ -24,7 +24,7 @@ import scala.language.implicitConversions

import org.apache.commons.lang3.StringUtils
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql._
import org.apache.spark.sql.{Column, _}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
@@ -44,9 +44,8 @@ import org.apache.carbondata.core.dictionary.server.DictionaryServer
import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
import org.apache.carbondata.core.metadata.schema.table.TableInfo
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.processing.constants.TableOptionConstant
@@ -56,7 +55,7 @@ import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConst
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.load.ValidateUtil
import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DataManagementFunc, DictionaryLoadModel}
import org.apache.carbondata.spark.util.{CommonUtil, GlobalDictionaryUtil}
import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, GlobalDictionaryUtil}

object Checker {
def validateTableExists(
@@ -543,6 +542,18 @@ case class LoadTable(
LOGGER.audit(s"Cannot use single_pass=true for $dbName.$tableName during the first load")
carbonLoadModel.setUseOnePass(false)
}
var otherFields: Seq[Column] = Seq.empty
val loadDataFrame = if (updateModel.isDefined) {
otherFields = CarbonScalaUtil
.getAllFieldsWithoutTupleIdField(dataFrame.get.schema.fields)
// appending the tupleId field in the end
val otherFieldsWithTupleId = otherFields :+ new Column(UnresolvedAttribute
.quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)).as("tupleId")
val dataFrameWithTupleId = dataFrame.get.select(otherFieldsWithTupleId: _*)
Some(dataFrameWithTupleId)
} else {
dataFrame
}
if (carbonLoadModel.getUseOnePass) {
val colDictFilePath = carbonLoadModel.getColDictFilePath
if (!StringUtils.isEmpty(colDictFilePath)) {
@@ -596,41 +607,15 @@
columnar,
partitionStatus,
server,
dataFrame,
loadDataFrame,
updateModel)
} else {
val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
val fields = dataFrame.get.schema.fields
import org.apache.spark.sql.functions.udf
// extracting only segment from tupleId
val getSegIdUDF = udf((tupleId: String) =>
CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))
// getting all fields except tupleId field as it is not required in the value
var otherFields = fields.toSeq
.filter(field => !field.name
.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
.map(field => {
if (field.name.endsWith(CarbonCommonConstants.UPDATED_COL_EXTENSION) && false) {
new Column(field.name
.substring(0,
field.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION)))
} else {

new Column(field.name)
}
})

// extract tupleId field which will be used as a key
val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute
.quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).as("segId")
val dictionaryDataFrame = if (updateModel.isDefined) {
// use dataFrameWithoutTupleId as dictionaryDataFrame
val dataFrameWithoutTupleId = dataFrame.get.select(otherFields: _*)
otherFields = otherFields :+ segIdColumn
// use dataFrameWithTupleId as loadDataFrame
val dataFrameWithTupleId = dataFrame.get.select(otherFields: _*)
(Some(dataFrameWithoutTupleId), Some(dataFrameWithTupleId))
Some(dataFrameWithoutTupleId)
} else {
(dataFrame, dataFrame)
dataFrame
}
GlobalDictionaryUtil
.generateGlobalDictionary(sqlContext, carbonLoadModel, relation.tableMeta.storePath,
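This LoadTable change builds loadDataFrame by selecting the value columns first and appending the implicit tupleId column last, so the load path can peel the key off the end of each row; the same pattern appears in the second LoadTable diff below. A minimal standalone sketch of that reordering, with a hypothetical DataFrame and a plain column name standing in for the implicit attribute:

import org.apache.spark.sql.{DataFrame, functions => F}

// Sketch: keep every value column, then append the tuple-id column last so
// downstream code can read it with row.get(row.size - 1).
def withTupleIdLast(df: DataFrame, tupleIdCol: String = "tupleId"): DataFrame = {
  val valueCols = df.columns.filterNot(_.equalsIgnoreCase(tupleIdCol)).map(F.col)
  df.select(valueCols :+ F.col(tupleIdCol).as("tupleId"): _*)
}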
@@ -49,7 +49,7 @@ import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarForma
import org.apache.carbondata.core.metadata.datatype.DataType
import org.apache.carbondata.core.metadata.schema.partition.PartitionType
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
import org.apache.carbondata.core.scan.partition.PartitionUtil
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties}
@@ -734,7 +734,9 @@ object CarbonDataRDDFactory

val keyRDD = updateRdd.map(row =>
// splitting as (key, value) i.e., (segment, updatedRows)
(row.get(row.size - 1).toString, Row(row.toSeq.slice(0, row.size - 1): _*))
(CarbonUpdateUtil
.getRequiredFieldFromTID(row.get(row.size - 1).toString, TupleIdEnum.SEGMENT_ID), Row(
row.toSeq.slice(0, row.size - 1): _*))
)
val groupBySegmentRdd = keyRDD.groupByKey()

@@ -893,6 +895,8 @@ object CarbonDataRDDFactory
} else {
// in the success case, handle the update of the table status file.
// write the dictionary file when single_pass is true
writeDictionary(carbonLoadModel, result, false)
val segmentDetails = new util.HashSet[String]()

var resultSize = 0
@@ -45,7 +45,6 @@ import org.apache.carbondata.core.metadata.CarbonTableIdentifier
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.TableInfo
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.processing.constants.TableOptionConstant
@@ -55,7 +54,7 @@ import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConst
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.load.ValidateUtil
import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
import org.apache.carbondata.spark.util.{CarbonSparkUtil, CommonUtil, GlobalDictionaryUtil}
import org.apache.carbondata.spark.util.{CarbonScalaUtil, CarbonSparkUtil, CommonUtil, GlobalDictionaryUtil}

object Checker {
def validateTableExists(
@@ -600,6 +599,18 @@ case class LoadTable(
LOGGER.audit(s"Cannot use single_pass=true for $dbName.$tableName during the first load")
carbonLoadModel.setUseOnePass(false)
}
var otherFields: Seq[Column] = Seq.empty
val loadDataFrame = if (updateModel.isDefined) {
otherFields = CarbonScalaUtil
.getAllFieldsWithoutTupleIdField(dataFrame.get.schema.fields)
// appending the tupleId field in the end
val otherFieldsWithTupleId = otherFields :+ new Column(UnresolvedAttribute
.quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)).as("tupleId")
val dataFrameWithTupleId = dataFrame.get.select(otherFieldsWithTupleId: _*)
Some(dataFrameWithTupleId)
} else {
dataFrame
}
if (carbonLoadModel.getUseOnePass) {
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier
@@ -661,42 +672,16 @@
columnar,
partitionStatus,
server,
dataFrame,
loadDataFrame,
updateModel)
}
else {
val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
val fields = dataFrame.get.schema.fields
import org.apache.spark.sql.functions.udf
// extracting only segment from tupleId
val getSegIdUDF = udf((tupleId: String) =>
CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))
// getting all fields except tupleId field as it is not required in the value
var otherFields = fields.toSeq
.filter(field => !field.name
.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
.map(field => {
if (field.name.endsWith(CarbonCommonConstants.UPDATED_COL_EXTENSION) && false) {
new Column(field.name
.substring(0,
field.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION)))
} else {

new Column(field.name)
}
})

// extract tupleId field which will be used as a key
val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute
.quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).as("segId")
val dictionaryDataFrame = if (updateModel.isDefined) {
// use dataFrameWithoutTupleId as dictionaryDataFrame
val dataFrameWithoutTupleId = dataFrame.get.select(otherFields: _*)
otherFields = otherFields :+ segIdColumn
// use dataFrameWithTupleId as loadDataFrame
val dataFrameWithTupleId = dataFrame.get.select(otherFields: _*)
(Some(dataFrameWithoutTupleId), Some(dataFrameWithTupleId))
Some(dataFrameWithoutTupleId)
} else {
(dataFrame, dataFrame)
dataFrame
}
GlobalDictionaryUtil
.generateGlobalDictionary(
