Skip to content

Commit

Permalink
Merge 9fd4139 into 14f170c
Browse files Browse the repository at this point in the history
  • Loading branch information
Indhumathi27 committed Jun 15, 2019
2 parents 14f170c + 9fd4139 commit 8c0407d
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 7 deletions.
Expand Up @@ -2194,4 +2194,6 @@ private CarbonCommonConstants() {

// Default maximum length (as a string property value) of the index server job name.
public static final String CARBON_INDEX_SERVER_JOBNAME_LENGTH_DEFAULT =
"50";

// Upper bound on the length of generated (sanitized) MV datamap column names;
// names at or beyond this length are truncated and suffixed with a counter.
public static final int MAXIMUM_CHAR_LENGTH = 128;
}
Expand Up @@ -72,13 +72,15 @@ object MVHelper {
}
val updatedQueryWithDb = validateMVQuery(sparkSession, logicalPlan)
val fullRebuild = isFullReload(logicalPlan)
var counter = 0
val fields = logicalPlan.output.map { attr =>
if (attr.dataType.isInstanceOf[ArrayType] || attr.dataType.isInstanceOf[StructType] ||
attr.dataType.isInstanceOf[MapType]) {
throw new UnsupportedOperationException(
s"MV datamap is unsupported for ComplexData type column: " + attr.name)
}
val name = updateColumnName(attr)
val name = updateColumnName(attr, counter)
counter = counter + 1
val rawSchema = '`' + name + '`' + ' ' + attr.dataType.typeName
if (attr.dataType.typeName.startsWith("decimal")) {
val (precision, scale) = CommonUtil.getScaleAndPrecision(attr.dataType.catalogString)
Expand Down Expand Up @@ -319,19 +321,22 @@ object MVHelper {
modularPlan.asCompactSQL
}

/**
 * Sanitizes a generated MV column name by replacing characters that are not
 * legal in a column identifier (parentheses, spaces, '=', ',', '.', backticks).
 *
 * If the sanitized name reaches MAXIMUM_CHAR_LENGTH it is truncated and a
 * counter suffix is appended so that two long names truncated to the same
 * prefix remain distinct within one datamap.
 *
 * @param name           raw expression/column name to sanitize
 * @param counter        per-datamap column counter used as a uniqueness suffix
 * @param truncateLength prefix length kept when truncating an over-long name;
 *                       defaults to 110, leaving room for the underscore and
 *                       counter suffix within MAXIMUM_CHAR_LENGTH
 * @return sanitized (and possibly truncated) column name
 */
def getUpdatedName(name: String, counter: Int, truncateLength: Int = 110): String = {
  val sanitized = name.replace("(", "_")
    .replace(")", "")
    .replace(" ", "_")
    .replace("=", "")
    .replace(",", "")
    .replace(".", "_")
    .replace("`", "")
  if (sanitized.length >= CarbonCommonConstants.MAXIMUM_CHAR_LENGTH) {
    // Keep a fixed-length prefix and disambiguate with the column counter.
    sanitized.substring(0, truncateLength) + CarbonCommonConstants.UNDERSCORE + counter
  } else {
    sanitized
  }
}

/**
 * Builds the final MV column name for an attribute: sanitize the attribute
 * name (truncating over-long names with the counter suffix) and, when the
 * attribute carries a qualifier, prefix it with "<qualifier>_".
 *
 * @param attr    attribute whose name is being rewritten
 * @param counter per-datamap column counter forwarded to getUpdatedName
 * @return qualified, sanitized column name
 */
def updateColumnName(attr: Attribute, counter: Int): String = {
  val sanitized = getUpdatedName(attr.name, counter)
  attr.qualifier match {
    case Some(qualifierName) => qualifierName + "_" + sanitized
    case None => sanitized
  }
}

Expand Down
Expand Up @@ -37,20 +37,24 @@ import org.apache.carbondata.spark.util.CommonUtil
*/
object MVUtil {

// Per-datamap column counter used to disambiguate truncated (over-long) column
// names; reset after each plan is processed.
// NOTE(review): object-level mutable state — not thread-safe if two datamap
// creations run concurrently in the same JVM; confirm single-threaded usage.
var counter = 0

/**
* Below method will be used to validate and get the required fields from select plan
*/
/**
 * Validates the select plan of an MV datamap and extracts the required fields
 * together with their DataMapField metadata.
 *
 * Supported plan shapes are Project (optionally over a Sort) and Aggregate;
 * any other shape throws a MatchError.
 *
 * @param plan         analyzed logical plan of the datamap's select statement
 * @param selectStmt   original select statement text (currently unused here)
 * @param sparkSession active spark session
 * @return ordered map from each Field to its DataMapField
 */
def getFieldsAndDataMapFieldsFromPlan(plan: LogicalPlan,
    selectStmt: String,
    sparkSession: SparkSession): scala.collection.mutable.LinkedHashMap[Field, DataMapField] = {
  try {
    plan match {
      case Project(projectList, child: Sort) =>
        getFieldsFromProject(projectList, plan, child)
      case Project(projectList, _) =>
        getFieldsFromProject(projectList, plan)
      case Aggregate(groupByExp, aggExp, _) =>
        getFieldsFromAggregate(groupByExp, aggExp, plan)
    }
  } finally {
    // Reset even when the match throws (e.g. an unsupported plan shape);
    // otherwise the object-level counter would leak stale state into the
    // next datamap creation.
    counter = 0
  }
}

def getFieldsFromProject(projectList: Seq[NamedExpression],
Expand Down Expand Up @@ -255,7 +259,8 @@ object MVUtil {
aggregateType: String,
columnTableRelationList: ArrayBuffer[ColumnTableRelation],
parenTableName: String) = {
var actualColumnName = MVHelper.getUpdatedName(name)
var actualColumnName = MVHelper.getUpdatedName(name, counter)
counter = counter + 1
if (qualifier.isDefined) {
actualColumnName = qualifier.map(qualifier => qualifier + "_" + name)
.getOrElse(actualColumnName)
Expand Down
Expand Up @@ -1081,6 +1081,23 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
tables.exists(_.identifier.table.equalsIgnoreCase(dataMapName + "_table"))
}

// Verifies that an MV datamap can be created when a generated column name
// (from a long sum(case when ...) expression) exceeds MAXIMUM_CHAR_LENGTH (128):
// the name is truncated with a counter suffix and the datamap still matches queries.
test("test datamap column having more than 128 characters") {
sql("drop table IF EXISTS maintable")
sql("create table maintable (m_month smallint, c_code string, " +
"c_country smallint, d_dollar_value double, q_quantity double, u_unit smallint, b_country smallint, i_id int, y_year smallint) stored by 'carbondata'")
sql("insert into maintable select 10, 'xxx', 123, 456, 45, 5, 23, 1, 2000")
sql("drop datamap if exists da_agg")
// The projected case-when expressions produce raw column names longer than 128 chars.
sql("create datamap da_agg using 'mv' as select u_unit, y_year, m_month, c_country, b_country, sum(case when i_id=1 and (y_year=2000 and m_month=10)" +
"then d_dollar_value else 0 end), sum(case when i_id=1 and (y_year=2000 and m_month=10) then q_quantity else 0 end) ex, sum(case when i_id=1 and (y_year=2011 and " +
"(m_month>=7 and m_month <=12)) then q_quantity else 0 end) from maintable group by u_unit, y_year, m_month, c_country, b_country")
// Re-run an equivalent query and confirm the optimizer rewrites it to use the datamap.
val df = sql("select u_unit, y_year, m_month, c_country, b_country, sum(case when i_id=1 and (y_year=2000 and m_month=10) then d_dollar_value else 0 end), " +
"sum(case when i_id=1 and (y_year=2000 and m_month=10) then q_quantity else 0 end) ex, sum(case when i_id=1 and (y_year=2011 and (m_month>=7 and m_month " +
"<=12)) then q_quantity else 0 end) from maintable group by u_unit,y_year, m_month, c_country, b_country")
val analyzed = df.queryExecution.analyzed
assert(TestUtil.verifyMVDataMap(analyzed, "da_agg"))
// Dropping the main table also removes its dependent datamap table.
sql("drop table IF EXISTS maintable")
}

def drop(): Unit = {
sql("drop table IF EXISTS fact_table1")
sql("drop table IF EXISTS fact_table2")
Expand Down

0 comments on commit 8c0407d

Please sign in to comment.