[CARBONDATA-1304] Support IUD with single pass
[CARBONDATA-1293] Fix update on carbon table failing when carbon.update.persist.enable is false
mohammadshahidkhan committed Nov 21, 2017
1 parent cb6c271 commit e2e7b6e
Showing 5 changed files with 126 additions and 30 deletions.
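The two new test suites exercise the scenarios this commit addresses. A rough sketch of the update flow under test — the table, data, and session are hypothetical, assuming a CarbonData-enabled SparkSession `spark`:

    // Sketch only; `spark` is an assumed CarbonData-enabled SparkSession.
    import spark.sql

    sql("CREATE TABLE t (id INT, name STRING) STORED BY 'carbondata'")
    sql("INSERT INTO t SELECT 1, 'a'")

    // CARBONDATA-1304: route the load triggered by the update through single pass.
    sql("SET carbon.options.single.pass=true")

    // CARBONDATA-1293: with carbon.update.persist.enable=false set in the carbon
    // properties, the same update used to fail as well.
    sql("UPDATE t SET (name) = ('b') WHERE id = 1")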
TestUpdateCarbonTableWithPersistFalse.scala (new file)
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.spark.testsuite.iud

import org.apache.carbondata.common.constants.LoggerAction
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties

class TestUpdateCarbonTableWithPersistFalse extends UpdateCarbonTableTestCase {
  override def beforeAll {
    super.beforeAll
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, LoggerAction.FORCE.name())
      .addProperty(CarbonCommonConstants.isPersistEnabled, "false")
  }

  override def afterAll {
    super.afterAll
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
      .addProperty(CarbonCommonConstants.isPersistEnabled,
        CarbonCommonConstants.defaultValueIsPersistEnabled)
  }
}
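For context, carbon.update.persist.enable controls whether the intermediate dataset produced by an update is persisted before the load step. A minimal sketch of the toggle, using only the constants visible above — the DataFrame and the storage level are assumptions:

    // Sketch; `updatedDataFrame` is hypothetical, MEMORY_AND_DISK is an assumed level.
    import org.apache.spark.storage.StorageLevel

    val persist = CarbonProperties.getInstance()
      .getProperty(CarbonCommonConstants.isPersistEnabled,
        CarbonCommonConstants.defaultValueIsPersistEnabled).toBoolean
    if (persist) {
      updatedDataFrame.persist(StorageLevel.MEMORY_AND_DISK)
    }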
TestUpdateCarbonTableWithSinglePass.scala (new file)
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.spark.testsuite.iud

import org.apache.carbondata.common.constants.LoggerAction
import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
import org.apache.carbondata.core.util.CarbonProperties

class TestUpdateCarbonTableWithSinglePass extends UpdateCarbonTableTestCase {
  override def beforeAll {
    super.beforeAll
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, LoggerAction.FORCE.name())
    sql(s"""set ${ CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS }=true""")
  }

  override def afterAll {
    super.afterAll
    sql(s"""set ${ CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS }=false""")
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
  }
}
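Single pass can also be requested per load rather than per session; a hedged sketch assuming the standard LOAD DATA OPTIONS syntax and a hypothetical table and path:

    // Per-load alternative to the session-level SET used in the suite above.
    sql("LOAD DATA INPATH '/tmp/sample.csv' INTO TABLE t OPTIONS('SINGLE_PASS'='TRUE')")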
CarbonScalaUtil.scala
@@ -216,4 +216,21 @@ object CarbonScalaUtil
         } cannot be modified. Only Int and Decimal data types are allowed for modification")
     }
   }
+
+  /**
+   * Returns all fields except the tupleId field, as it is not required in the value.
+   *
+   * @param fields schema fields, possibly including the implicit tupleId column
+   * @return a column for every field except tupleId
+   */
+  def getAllFieldsWithoutTupleIdField(fields: Array[StructField]): Seq[Column] = {
+    // getting all fields except tupleId field as it is not required in the value
+    val otherFields = fields.toSeq
+      .filter(field => !field.name
+        .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
+      .map(field => {
+        new Column(field.name)
+      })
+    otherFields
+  }
 }
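A usage sketch for the new helper — `df` is a hypothetical update DataFrame whose schema still carries the implicit tupleId column, mirroring how LoadTableCommand uses it below:

    // Drops only the implicit tupleId column, keeping every other field.
    val otherFields: Seq[Column] = CarbonScalaUtil
      .getAllFieldsWithoutTupleIdField(df.schema.fields)
    val withoutTupleId = df.select(otherFields: _*)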
CarbonDataRDDFactory.scala
@@ -48,7 +48,7 @@ import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarFormatVersion}
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
 import org.apache.carbondata.core.scan.partition.PartitionUtil
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil}
@@ -404,6 +404,8 @@ object CarbonDataRDDFactory
       } else {
         // in success case handle updation of the table status file.
         // success case.
+        // write the dictionary file in case single_pass is true
+        writeDictionary(carbonLoadModel, result, false)
         val segmentDetails = new util.HashSet[String]()
         var resultSize = 0
         res.foreach { resultOfSeg =>
@@ -539,7 +541,9 @@
       } else {
         // splitting as (key, value) i.e., (segment, updatedRows)
         val keyRDD = updateRdd.map(row =>
-          (row.get(row.size - 1).toString, Row(row.toSeq.slice(0, row.size - 1): _*)))
+          (CarbonUpdateUtil
+            .getRequiredFieldFromTID(row.get(row.size - 1).toString, TupleIdEnum.SEGMENT_ID),
+            Row(row.toSeq.slice(0, row.size - 1): _*)))
 
         val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(
           carbonTable.getMetaDataFilepath)
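The change above keys the RDD on the segment-id component of the tuple ID rather than the full tuple ID, so updated rows group by segment. A hedged sketch of the extraction — the '/'-separated layout with the segment id leading is an assumption about what TupleIdEnum.SEGMENT_ID selects:

    // Approximation of CarbonUpdateUtil.getRequiredFieldFromTID(tid, TupleIdEnum.SEGMENT_ID).
    def segmentIdOf(tupleId: String): String = tupleId.split("/")(0)

    segmentIdOf("2/0/0/0/0")  // "2" — the sample tuple id is hypothetical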
LoadTableCommand.scala
@@ -45,7 +45,7 @@ import org.apache.carbondata.processing.loading.exception.NoRetryException
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
-import org.apache.carbondata.spark.util.{CommonUtil, DataLoadingUtil, GlobalDictionaryUtil}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, DataLoadingUtil, GlobalDictionaryUtil}
 
 case class LoadTableCommand(
     databaseNameOp: Option[String],
@@ -171,21 +171,35 @@ case class LoadTableCommand(
       }
       val partitionStatus = SegmentStatus.SUCCESS
       val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean
+      var otherFields: Seq[Column] = Seq.empty
+      val loadDataFrame = if (updateModel.isDefined) {
+        otherFields = CarbonScalaUtil
+          .getAllFieldsWithoutTupleIdField(dataFrame.get.schema.fields)
+        // append the tupleId field at the end
+        val otherFieldsWithTupleId = otherFields :+ new Column(UnresolvedAttribute
+          .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)).as("tupleId")
+        val dataFrameWithTupleId = dataFrame.get.select(otherFieldsWithTupleId: _*)
+        Some(dataFrameWithTupleId)
+      } else {
+        dataFrame
+      }
       if (carbonLoadModel.getUseOnePass) {
         loadDataUsingOnePass(
           sparkSession,
           carbonProperty,
           carbonLoadModel,
           columnar,
           partitionStatus,
-          hadoopConf)
+          hadoopConf,
+          loadDataFrame)
       } else {
         loadData(
           sparkSession,
           carbonLoadModel,
           columnar,
           partitionStatus,
-          hadoopConf)
+          hadoopConf,
+          loadDataFrame)
       }
     } catch {
       case CausedBy(ex: NoRetryException) =>
@@ -233,7 +247,8 @@
       carbonLoadModel: CarbonLoadModel,
       columnar: Boolean,
       partitionStatus: SegmentStatus,
-      hadoopConf: Configuration): Unit = {
+      hadoopConf: Configuration,
+      loadDataFrame: Option[DataFrame]): Unit = {
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier
       .getCarbonTableIdentifier
@@ -302,7 +317,7 @@
         server,
         isOverwriteTable,
         hadoopConf,
-        dataFrame,
+        loadDataFrame,
         updateModel)
     }
 
@@ -311,33 +326,17 @@
       carbonLoadModel: CarbonLoadModel,
       columnar: Boolean,
       partitionStatus: SegmentStatus,
-      hadoopConf: Configuration): Unit = {
-    val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
-      val fields = dataFrame.get.schema.fields
-      import org.apache.spark.sql.functions.udf
-      // extracting only segment from tupleId
-      val getSegIdUDF = udf((tupleId: String) =>
-        CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))
-      // getting all fields except tupleId field as it is not required in the value
-      var otherFields = fields.toSeq
-        .filter(field => !field.name
-          .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
-        .map(field => new Column(field.name))
-
-      // extract tupleId field which will be used as a key
-      val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute
-        .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).
-        as(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID)
+      hadoopConf: Configuration,
+      loadDataFrame: Option[DataFrame]): Unit = {
+    val dictionaryDataFrame = if (updateModel.isDefined) {
+      val otherFields: Seq[Column] = CarbonScalaUtil
+        .getAllFieldsWithoutTupleIdField(dataFrame.get.schema.fields)
       // use dataFrameWithoutTupleId as dictionaryDataFrame
       val dataFrameWithoutTupleId = dataFrame.get.select(otherFields: _*)
-      otherFields = otherFields :+ segIdColumn
-      // use dataFrameWithTupleId as loadDataFrame
-      val dataFrameWithTupleId = dataFrame.get.select(otherFields: _*)
-      (Some(dataFrameWithoutTupleId), Some(dataFrameWithTupleId))
+      Some(dataFrameWithoutTupleId)
     } else {
-      (dataFrame, dataFrame)
+      dataFrame
     }
 
     GlobalDictionaryUtil.generateGlobalDictionary(
       sparkSession.sqlContext,
       carbonLoadModel,
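Net effect of the refactor: the caller builds loadDataFrame once (row columns plus tupleId) and passes it down both load paths, while loadData derives dictionaryDataFrame (row columns only) for dictionary generation. A condensed sketch with hypothetical columns c1 and c2:

    import org.apache.spark.sql.functions.col

    // `df` is a hypothetical update DataFrame carrying c1, c2 and the implicit tupleId.
    val dictionaryDataFrame = df.select(col("c1"), col("c2"))           // dictionary generation
    val loadDataFrame = df.select(col("c1"), col("c2"), col("tupleId")) // load path, keyed by segment later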