Skip to content

Commit

Permalink
[CARBONDATA-3257] Fix for NO_SORT load and describe formatted being i…
Browse files Browse the repository at this point in the history
…n NO_SORT flow even with Sort Columns given

Problem: Data Load is in No sort flow when version is upgraded even if sort columns are given. Also describe formatted displays wrong sort scope after refresh.

Solution: Added a condition to check for the presence of Sort Columns.

This closes #3083
  • Loading branch information
manishnalla1994 authored and kumarvishal09 committed Jan 23, 2019
1 parent 134df2b commit 7916aa6
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 28 deletions.
Expand Up @@ -426,6 +426,7 @@ private CarbonCommonConstants() {
*/
public static final String DICTIONARY_PATH = "dictionary_path";
public static final String SORT_COLUMNS = "sort_columns";
public static final String SORT_SCOPE = "sort_scope";
public static final String RANGE_COLUMN = "range_column";
public static final String PARTITION_TYPE = "partition_type";
public static final String NUM_PARTITIONS = "num_partitions";
Expand Down
Expand Up @@ -209,11 +209,11 @@ class SetParameterTestCase extends QueryTest with BeforeAndAfterAll {
sql("SET carbon.options.sort.scope=local_sort")
sql(
"create table carbon_table(empno int, empname String, designation String, doj Timestamp," +
"workgroupcategory int) STORED BY 'org.apache.carbondata.format'")
checkExistence(sql("DESC FORMATTED carbon_table"), true, "LOCAL_SORT")
val sortscope=sql("DESC FORMATTED carbon_table").collect().filter(_.getString(1).trim.equals("LOCAL_SORT"))
"workgroupcategory int) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_COLUMNS'='empno,empname')")
checkExistence(sql("DESC FORMATTED carbon_table"), true, "local_sort")
val sortscope=sql("DESC FORMATTED carbon_table").collect().filter(_.getString(1).trim.equals("local_sort"))
assertResult(1)(sortscope.length)
assertResult("LOCAL_SORT")(sortscope(0).getString(1).trim)
assertResult("local_sort")(sortscope(0).getString(1).trim)
}

test("TC_011-test SET property to Enable Unsafe Sort") {
Expand Down
Expand Up @@ -854,18 +854,6 @@ class TableNewProcessor(cm: TableModel) {
tableSchema.getTableId,
cm.databaseNameOp.getOrElse("default"))
tablePropertiesMap.put("bad_record_path", badRecordsPath)
if (tablePropertiesMap.get("sort_columns") != null) {
val sortCol = tablePropertiesMap.get("sort_columns")
if ((!sortCol.trim.isEmpty) && tablePropertiesMap.get("sort_scope") == null) {
// If sort_scope is not specified, but sort_columns are present, set sort_scope as
// local_sort in carbon_properties (cannot add in table properties as if user sets carbon
// properties it won't be reflected as table properties is given higher priority)
if (CarbonProperties.getInstance().getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE) ==
null) {
tablePropertiesMap.put("sort_scope", "LOCAL_SORT")
}
}
}
tableSchema.setTableProperties(tablePropertiesMap)
if (cm.bucketFields.isDefined) {
val bucketCols = cm.bucketFields.get.bucketColumns.map { b =>
Expand Down
Expand Up @@ -209,15 +209,28 @@ case class CarbonLoadDataCommand(
* 4. Session property CARBON_OPTIONS_SORT_SCOPE
* 5. Default Sort Scope LOAD_SORT_SCOPE
*/
optionsFinal.put("sort_scope",
options.getOrElse("sort_scope",
carbonProperty.getProperty(
CarbonLoadOptionConstants.CARBON_TABLE_LOAD_SORT_SCOPE + table.getDatabaseName + "." +
table.getTableName,
tableProperties.asScala.getOrElse("sort_scope",
carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))))
if (tableProperties.get(CarbonCommonConstants.SORT_COLUMNS) != null &&
tableProperties.get(CarbonCommonConstants.SORT_SCOPE) == null) {
// If there are Sort Columns given for the table and Sort Scope is not specified,
// we will take it as whichever sort scope given or LOCAL_SORT as default
optionsFinal
.put(CarbonCommonConstants.SORT_SCOPE,
carbonProperty
.getProperty(
CarbonLoadOptionConstants.CARBON_TABLE_LOAD_SORT_SCOPE + table.getDatabaseName + "." +
table.getTableName, carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
SortScopeOptions.getSortScope("LOCAL_SORT").toString)))
} else {
optionsFinal.put(CarbonCommonConstants.SORT_SCOPE,
options.getOrElse(CarbonCommonConstants.SORT_SCOPE,
carbonProperty.getProperty(
CarbonLoadOptionConstants.CARBON_TABLE_LOAD_SORT_SCOPE + table.getDatabaseName + "." +
table.getTableName,
tableProperties.asScala.getOrElse(CarbonCommonConstants.SORT_SCOPE,
carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))))
}

optionsFinal
.put("bad_record_path", CarbonBadRecordUtil.getBadRecordsPath(options.asJava, table))
Expand Down
Expand Up @@ -30,11 +30,11 @@ import org.apache.spark.sql.execution.command.MetadataCommand
import org.apache.spark.sql.hive.CarbonRelation

import org.apache.carbondata.common.Strings
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.metadata.schema.partition.PartitionType
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}

private[sql] case class CarbonDescribeFormattedCommand(
child: SparkPlan,
Expand All @@ -54,10 +54,22 @@ private[sql] case class CarbonDescribeFormattedCommand(

val carbonTable = relation.carbonTable
val tblProps = carbonTable.getTableInfo.getFactTable.getTableProperties.asScala
// If Sort Columns are given and Sort Scope is not given in either table properties
// or carbon properties then pass LOCAL_SORT as the sort scope,
// else pass NO_SORT
val sortScope = if (carbonTable.getNumberOfSortColumns == 0) {
"NO_SORT"
} else {
tblProps.getOrElse("sort_scope", CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
if (tblProps.contains(CarbonCommonConstants.SORT_SCOPE)) {
tblProps.get(CarbonCommonConstants.SORT_SCOPE).toString
} else {
tblProps
.getOrElse(CarbonCommonConstants.SORT_SCOPE,
CarbonProperties.getInstance()
.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
"LOCAL_SORT")))
}
}
val streaming: String = if (carbonTable.isStreamingSink) {
"sink"
Expand Down

0 comments on commit 7916aa6

Please sign in to comment.