Navigation Menu

Skip to content

Commit

Permalink
[CARBONDATA-3442] Fix creating mv datamap with column name having length more than 128
Browse files Browse the repository at this point in the history
  • Loading branch information
Indhumathi27 committed Jun 21, 2019
1 parent 51b3702 commit d7808e7
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 11 deletions.
Expand Up @@ -2235,4 +2235,9 @@ private CarbonCommonConstants() {
* index server temp file name
*/
public static final String INDEX_SERVER_TEMP_FOLDER_NAME = "indexservertmp";

/**
 * Maximum allowed length, in characters, of a hive column name (128).
 * Generated MV datamap column names that reach this limit are truncated
 * and suffixed with a counter to keep them unique — see MVHelper.getUpdatedName.
 */
public static final int MAXIMUM_CHAR_LENGTH = 128;
}
Expand Up @@ -61,7 +61,8 @@ object MVHelper {
s"MV datamap does not support streaming"
)
}
MVUtil.validateDMProperty(dmProperties)
val mvUtil = new MVUtil
mvUtil.validateDMProperty(dmProperties)
val updatedQuery = new CarbonSpark2SqlParser().addPreAggFunction(queryString)
val query = sparkSession.sql(updatedQuery)
val logicalPlan = MVHelper.dropDummFuc(query.queryExecution.analyzed)
Expand All @@ -79,6 +80,7 @@ object MVHelper {
}
val updatedQueryWithDb = validateMVQuery(sparkSession, logicalPlan)
val fullRebuild = isFullReload(logicalPlan)
var counter = 0
// the ctas query can have duplicate columns, so we should take distinct and create fields,
// so that it won't fail during create mv table
val fields = logicalPlan.output.map { attr =>
Expand All @@ -87,7 +89,8 @@ object MVHelper {
throw new UnsupportedOperationException(
s"MV datamap is unsupported for ComplexData type column: " + attr.name)
}
val name = updateColumnName(attr)
val name = updateColumnName(attr, counter)
counter += 1
val rawSchema = '`' + name + '`' + ' ' + attr.dataType.typeName
if (attr.dataType.typeName.startsWith("decimal")) {
val (precision, scale) = CommonUtil.getScaleAndPrecision(attr.dataType.catalogString)
Expand Down Expand Up @@ -137,7 +140,7 @@ object MVHelper {
tableProperties.put(CarbonCommonConstants.DATAMAP_NAME, dataMapSchema.getDataMapName)
tableProperties.put(CarbonCommonConstants.PARENT_TABLES, parentTables.asScala.mkString(","))

val fieldRelationMap = MVUtil.getFieldsAndDataMapFieldsFromPlan(
val fieldRelationMap = mvUtil.getFieldsAndDataMapFieldsFromPlan(
logicalPlan, queryString, sparkSession)
// If dataMap is mapped to single main table, then inherit table properties from main table,
// else, will use default table properties. If DMProperties contains table properties, then
Expand Down Expand Up @@ -329,19 +332,22 @@ object MVHelper {
modularPlan.asCompactSQL
}

def getUpdatedName(name: String): String = {
val updatedName = name.replace("(", "_")
/**
 * Sanitizes a (possibly expression-derived) column name so it is valid as a
 * datamap table column: strips/replaces characters such as parentheses,
 * spaces, '=', ',', '.' and backticks. Names whose sanitized form reaches the
 * hive column-name limit are truncated and suffixed with `counter` so that
 * two long names cannot collide after truncation.
 *
 * @param name    raw column/expression name from the logical plan
 * @param counter per-plan running index used to disambiguate truncated names
 * @return sanitized (and possibly truncated) column name
 */
def getUpdatedName(name: String, counter: Int): String = {
  val sanitized = name
    .replace("(", "_")
    .replace(")", "")
    .replace(" ", "_")
    .replace("=", "")
    .replace(",", "")
    .replace(".", "_")
    .replace("`", "")
  if (sanitized.length >= CarbonCommonConstants.MAXIMUM_CHAR_LENGTH) {
    // NOTE(review): 110 is a hard-coded truncation length, presumably chosen to
    // leave headroom for the underscore + counter suffix under the 128 limit —
    // confirm before changing MAXIMUM_CHAR_LENGTH.
    sanitized.substring(0, 110) + CarbonCommonConstants.UNDERSCORE + counter
  } else {
    sanitized
  }
}

def updateColumnName(attr: Attribute): String = {
val name = getUpdatedName(attr.name)
/**
 * Builds the datamap table column name for an output attribute: sanitizes the
 * attribute name via getUpdatedName and, when the attribute carries a
 * qualifier, prefixes it with "<qualifier>_".
 *
 * @param attr    output attribute from the MV query's logical plan
 * @param counter running index forwarded to getUpdatedName for uniqueness
 */
def updateColumnName(attr: Attribute, counter: Int): String = {
  val sanitized = getUpdatedName(attr.name, counter)
  attr.qualifier match {
    case Some(q) => q + "_" + sanitized
    case None => sanitized
  }
}

Expand Down
Expand Up @@ -35,7 +35,9 @@ import org.apache.carbondata.spark.util.CommonUtil
/**
* Utility class for keeping all the utility method for mv datamap
*/
object MVUtil {
class MVUtil {

var counter = 0

/**
* Below method will be used to validate and get the required fields from select plan
Expand Down Expand Up @@ -127,9 +129,9 @@ object MVUtil {
aggExp.map { agg =>
var aggregateType: String = ""
val arrayBuffer: ArrayBuffer[ColumnTableRelation] = new ArrayBuffer[ColumnTableRelation]()
var isLiteralPresent = false
agg.collect {
case Alias(attr: AggregateExpression, name) =>
var isLiteralPresent = false
attr.aggregateFunction.collect {
case l@Literal(_, _) =>
isLiteralPresent = true
Expand Down Expand Up @@ -175,7 +177,7 @@ object MVUtil {
}
}
}
if (!aggregateType.isEmpty && arrayBuffer.nonEmpty) {
if (!aggregateType.isEmpty && arrayBuffer.nonEmpty && !isLiteralPresent) {
fieldToDataMapFieldMap +=
getFieldToDataMapFields(agg.name,
agg.dataType,
Expand Down Expand Up @@ -260,7 +262,8 @@ object MVUtil {
aggregateType: String,
columnTableRelationList: ArrayBuffer[ColumnTableRelation],
parenTableName: String) = {
var actualColumnName = MVHelper.getUpdatedName(name)
var actualColumnName = MVHelper.getUpdatedName(name, counter)
counter += 1
if (qualifier.isDefined) {
actualColumnName = qualifier.map(qualifier => qualifier + "_" + name)
.getOrElse(actualColumnName)
Expand Down
Expand Up @@ -1092,6 +1092,23 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
assert(TestUtil.verifyMVDataMap(analyzed4, "constant_mv"))
}

// Regression test for CARBONDATA-3442: MV datamap creation must not fail when a
// generated column name (from a long CASE/SUM expression) exceeds the 128-char
// hive column-name limit; such names are truncated and suffixed with a counter.
test("test datamap column having more than 128 characters") {
sql("drop table IF EXISTS maintable")
sql("create table maintable (m_month smallint, c_code string, " +
"c_country smallint, d_dollar_value double, q_quantity double, u_unit smallint, b_country smallint, i_id int, y_year smallint) stored by 'carbondata'")
sql("insert into maintable select 10, 'xxx', 123, 456, 45, 5, 23, 1, 2000")
sql("drop datamap if exists da_agg")
// The SUM(CASE WHEN ...) expressions below produce derived column names well
// over 128 characters, exercising the truncation path in getUpdatedName.
sql("create datamap da_agg using 'mv' as select u_unit, y_year, m_month, c_country, b_country, sum(case when i_id=1 and (y_year=2000 and m_month=10)" +
"then d_dollar_value else 0 end), sum(case when i_id=1 and (y_year=2000 and m_month=10) then q_quantity else 0 end) ex, sum(case when i_id=1 and (y_year=2011 and " +
"(m_month>=7 and m_month <=12)) then q_quantity else 0 end) from maintable group by u_unit, y_year, m_month, c_country, b_country")
// Re-run an equivalent query and verify the optimizer rewrites it against the MV.
val df = sql("select u_unit, y_year, m_month, c_country, b_country, sum(case when i_id=1 and (y_year=2000 and m_month=10) then d_dollar_value else 0 end), " +
"sum(case when i_id=1 and (y_year=2000 and m_month=10) then q_quantity else 0 end) ex, sum(case when i_id=1 and (y_year=2011 and (m_month>=7 and m_month " +
"<=12)) then q_quantity else 0 end) from maintable group by u_unit,y_year, m_month, c_country, b_country")
val analyzed = df.queryExecution.analyzed
assert(TestUtil.verifyMVDataMap(analyzed, "da_agg"))
sql("drop table IF EXISTS maintable")
}

def drop(): Unit = {
sql("drop table IF EXISTS fact_table1")
sql("drop table IF EXISTS fact_table2")
Expand Down

0 comments on commit d7808e7

Please sign in to comment.