Skip to content

Commit

Permalink
[CARBONDATA-1304] Support IUD with single pass
Browse files Browse the repository at this point in the history
  • Loading branch information
mohammadshahidkhan committed Jul 12, 2017
1 parent 5ea7c5b commit 7c0c452
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 44 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.spark.testsuite.iud

import org.apache.spark.sql.Row
import org.apache.spark.sql.common.util.QueryTest
import org.scalatest.BeforeAndAfterAll

import org.apache.carbondata.common.constants.LoggerAction
import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
import org.apache.carbondata.core.util.CarbonProperties

/**
 * Verifies IUD (update) behavior when data loading runs with single_pass=true,
 * under the two bad-records actions FORCE (convert bad values to null) and
 * FAIL (abort the load and leave existing data untouched).
 */
class UpdateCarbonTableTestCaseWithSinglePass extends QueryTest with BeforeAndAfterAll {

  override def beforeAll {
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, LoggerAction.FORCE.name())
    sql(s"""set ${ CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS }=true""").show
  }

  /** Drops, recreates and loads the `badtable` fixture shared by every test. */
  private def createAndLoadBadTable(): Unit = {
    sql("""drop table if exists badtable""").show
    sql("""create table badtable (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/badrecord.csv' INTO table badtable""")
  }

  test("test update operation with Badrecords action as force with single_pass.") {
    // Set FORCE explicitly so this test does not depend on execution order:
    // the FAIL test below mutates the same global property.
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, LoggerAction.FORCE.name())
    createAndLoadBadTable()
    sql("""update badtable d set (d.c2) = (d.c2 / 1)""").show()
    // With FORCE, bad values in c2 are converted to null rather than failing the update.
    checkAnswer(
      sql("""select c1,c2,c3,c5 from badtable"""),
      Seq(Row("ravi",null,"kiran","huawei"),Row("manohar",null,"vanam","huawei"))
    )
    sql("""drop table badtable""").show
  }

  test("test update operation with Badrecords action as FAIL with single_pass.") {
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, LoggerAction.FAIL.name())
    createAndLoadBadTable()
    // The update must fail; the exception itself is not inspected further,
    // so the intercepted value is deliberately not bound.
    intercept[Exception] {
      sql("""update badtable d set (d.c2) = (d.c2 / 1)""").show()
    }
    // Original data must remain untouched after the failed update.
    checkAnswer(
      sql("""select c1,c2,c3,c5 from badtable"""),
      Seq(Row("ravi",2,"kiran","huawei"),Row("manohar",4,"vanam","huawei"))
    )
    sql("""drop table badtable""").show
  }

  override def afterAll {
    // Restore mutated global state so other suites are unaffected.
    sql(s"""set ${ CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS }=false""").show
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -211,4 +211,21 @@ object CarbonScalaUtil {
} cannot be modified. Only Int and Decimal data types are allowed for modification")
}
}

/**
* returns all fields except tupleId field as it is not required in the value
*
* @param fields
* @return
*/
/**
 * Returns all fields as [[Column]]s, excluding the implicit tupleId field,
 * since tupleId is not part of the value portion of an update load.
 *
 * @param fields schema fields of the dataframe being loaded
 * @return one column per field whose name is not the implicit tupleId column
 */
def getAllFieldsWithoutTupleIdField(fields: Array[StructField]): Seq[Column] = {
  fields.toSeq
    .filterNot(_.name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
    .map(field => new Column(field.name))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import scala.language.implicitConversions

import org.apache.commons.lang3.StringUtils
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql._
import org.apache.spark.sql.{Column, _}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
Expand Down Expand Up @@ -56,7 +56,7 @@ import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConst
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.load.ValidateUtil
import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DataManagementFunc, DictionaryLoadModel}
import org.apache.carbondata.spark.util.{CommonUtil, GlobalDictionaryUtil}
import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, GlobalDictionaryUtil}

object Checker {
def validateTableExists(
Expand Down Expand Up @@ -590,38 +590,35 @@ case class LoadTable(
} else {
None
}
val loadDataFrame = if (updateModel.isDefined) {
var otherFields = CarbonScalaUtil
.getAllFieldsWithoutTupleIdField(dataFrame.get.schema.fields)
// appending the tupleId field in the end
otherFields = otherFields :+ new Column(UnresolvedAttribute
.quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)).as("tupleId")
val dataFrameWithTupleId = dataFrame.get.select(otherFields: _*)
Some(dataFrameWithTupleId)
} else {
dataFrame
}
CarbonDataRDDFactory.loadCarbonData(sqlContext,
carbonLoadModel,
relation.tableMeta.storePath,
columnar,
partitionStatus,
server,
dataFrame,
loadDataFrame,
updateModel)
} else {
val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
val fields = dataFrame.get.schema.fields
// getting all fields except tupleId field as it is not required in the value
var otherFields = fields.toSeq
.filter(field => !field.name
.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
.map(field => {
if (field.name.endsWith(CarbonCommonConstants.UPDATED_COL_EXTENSION) && false) {
new Column(field.name
.substring(0,
field.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION)))
} else {

new Column(field.name)
}
})

// extract tupleId field which will be used as a key
val segIdColumn = new Column(UnresolvedAttribute
.quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)).as("tupleId")
var otherFields = CarbonScalaUtil
.getAllFieldsWithoutTupleIdField(dataFrame.get.schema.fields)
// use dataFrameWithoutTupleId as dictionaryDataFrame
val dataFrameWithoutTupleId = dataFrame.get.select(otherFields: _*)
otherFields = otherFields :+ segIdColumn
// append the tupleId in the end which will be used as a key
otherFields = otherFields :+ new Column(UnresolvedAttribute
.quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)).as("tupleId")
// use dataFrameWithTupleId as loadDataFrame
val dataFrameWithTupleId = dataFrame.get.select(otherFields: _*)
(Some(dataFrameWithoutTupleId), Some(dataFrameWithTupleId))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,8 @@ object CarbonDataRDDFactory {
} else {
// in success case handle updation of the table status file.
// success case.
// write the dictionary file in case of single_pass true
writeDictionary(carbonLoadModel, result, false)
val segmentDetails = new util.HashSet[String]()

var resultSize = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import org.apache.carbondata.core.metadata.CarbonTableIdentifier
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.TableInfo
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.processing.constants.TableOptionConstant
Expand All @@ -55,7 +54,7 @@ import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConst
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.load.ValidateUtil
import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
import org.apache.carbondata.spark.util.{CarbonSparkUtil, CommonUtil, GlobalDictionaryUtil}
import org.apache.carbondata.spark.util.{CarbonScalaUtil, CarbonSparkUtil, CommonUtil, GlobalDictionaryUtil}

object Checker {
def validateTableExists(
Expand Down Expand Up @@ -655,39 +654,36 @@ case class LoadTable(
} else {
None
}
val loadDataFrame = if (updateModel.isDefined) {
var otherFields = CarbonScalaUtil
.getAllFieldsWithoutTupleIdField(dataFrame.get.schema.fields)
// appending the tupleId field in the end
otherFields = otherFields :+ new Column(UnresolvedAttribute
.quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)).as("tupleId")
val dataFrameWithTupleId = dataFrame.get.select(otherFields: _*)
Some(dataFrameWithTupleId)
} else {
dataFrame
}
CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
carbonLoadModel,
relation.tableMeta.storePath,
columnar,
partitionStatus,
server,
dataFrame,
loadDataFrame,
updateModel)
}
else {
val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
val fields = dataFrame.get.schema.fields
// getting all fields except tupleId field as it is not required in the value
var otherFields = fields.toSeq
.filter(field => !field.name
.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
.map(field => {
if (field.name.endsWith(CarbonCommonConstants.UPDATED_COL_EXTENSION) && false) {
new Column(field.name
.substring(0,
field.name.lastIndexOf(CarbonCommonConstants.UPDATED_COL_EXTENSION)))
} else {

new Column(field.name)
}
})

// extract tupleId field which will be used as a key
val segIdColumn = new Column(UnresolvedAttribute
.quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)).as("tupleId")
var otherFields = CarbonScalaUtil
.getAllFieldsWithoutTupleIdField(dataFrame.get.schema.fields)
// use dataFrameWithoutTupleId as dictionaryDataFrame
val dataFrameWithoutTupleId = dataFrame.get.select(otherFields: _*)
otherFields = otherFields :+ segIdColumn
// append the tupleId in the end which will be used as a key
otherFields = otherFields :+ new Column(UnresolvedAttribute
.quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)).as("tupleId")
// use dataFrameWithTupleId as loadDataFrame
val dataFrameWithTupleId = dataFrame.get.select(otherFields: _*)
(Some(dataFrameWithoutTupleId), Some(dataFrameWithTupleId))
Expand Down

0 comments on commit 7c0c452

Please sign in to comment.