Added SORT_SCOPE in Load Options & in SET Command
1. Added load-level SORT_SCOPE
2. Added SORT_SCOPE for pre-aggregate table loads
3. Added a sort_scope log message in LoadDataCommand
4. Added property CARBON.TABLE.LOAD.SORT.SCOPE.<database>.<table> to set the table-level sort_scope
5. Removed the test case verifying that SORT_SCOPE is rejected as a load option
NamanRastogi committed Jan 8, 2019
1 parent 96b2ea3 commit 2e994ed
Showing 14 changed files with 112 additions and 43 deletions.
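For context, a hypothetical session showing the two user-facing pieces this commit adds. Database db and table tbl mirror the names used in the new SetCommandTestCase below; the path is illustrative only:

// Table-level session property: applies to subsequent loads of db.tbl.
sql("SET carbon.table.load.sort.scope.db.tbl=no_sort")
sql("LOAD DATA LOCAL INPATH '/path/to/data' INTO TABLE db.tbl")
// A per-load OPTIONS entry overrides the session property (see the
// priority comment added in CarbonLoadDataCommand below).
sql("LOAD DATA LOCAL INPATH '/path/to/data' INTO TABLE db.tbl " +
  "OPTIONS('SORT_SCOPE'='LOCAL_SORT')")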
@@ -80,6 +80,12 @@ public final class CarbonLoadOptionConstants {
public static final String CARBON_OPTIONS_SORT_SCOPE =
"carbon.options.sort.scope";

/**
* option to specify table level sort_scope
*/
@CarbonProperty(dynamicConfigurable = true)
public static final String CARBON_TABLE_LOAD_SORT_SCOPE = "carbon.table.load.sort.scope.";

/**
* option to specify the batch sort size inmb
*/
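Note the trailing dot: the new constant is a key prefix, completed with "<database>.<table>" at lookup time (see the CarbonLoadDataCommand change below). A minimal sketch, with illustrative names:

// Illustrative only: the full dynamic key is the prefix plus "<database>.<table>".
val key = CarbonLoadOptionConstants.CARBON_TABLE_LOAD_SORT_SCOPE + "db" + "." + "tbl"
// key == "carbon.table.load.sort.scope.db.tbl"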
@@ -161,7 +161,7 @@ private boolean validateKeyValue(String key, String value) throws InvalidConfigu
isValid = CarbonUtil.isValidSortOption(value);
if (!isValid) {
throw new InvalidConfigurationException("The sort scope " + key
- + " can have only either BATCH_SORT or LOCAL_SORT or NO_SORT.");
+ + " can have only either NO_SORT, BATCH_SORT, LOCAL_SORT or GLOBAL_SORT.");
}
break;
case CARBON_OPTIONS_BATCH_SORT_SIZE_INMB:
@@ -229,6 +229,12 @@ private boolean validateKeyValue(String key, String value) throws InvalidConfigu
if (!isValid) {
throw new InvalidConfigurationException("Invalid value " + value + " for key " + key);
}
} else if (key.startsWith(CarbonLoadOptionConstants.CARBON_TABLE_LOAD_SORT_SCOPE)) {
isValid = CarbonUtil.isValidSortOption(value);
if (!isValid) {
throw new InvalidConfigurationException("The sort scope " + key
+ " can have only either NO_SORT, BATCH_SORT, LOCAL_SORT or GLOBAL_SORT.");
}
} else {
throw new InvalidConfigurationException(
"The key " + key + " not supported for dynamic configuration.");
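CarbonUtil.isValidSortOption is referenced here but not part of this diff; a minimal sketch of the behavior the new branch assumes (lowercase values must pass, since the new tests below SET values such as no_sort):

// Assumed sketch; the real CarbonUtil.isValidSortOption may differ in detail.
def isValidSortOption(value: String): Boolean =
  value != null &&
    Set("NO_SORT", "BATCH_SORT", "LOCAL_SORT", "GLOBAL_SORT").contains(value.toUpperCase)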
@@ -32,25 +32,6 @@ class TestCreateTableWithSortScope extends QueryTest with BeforeAndAfterAll {
sql("DROP TABLE IF EXISTS tableWithBatchSort")
sql("DROP TABLE IF EXISTS tableWithNoSort")
sql("DROP TABLE IF EXISTS tableWithUnsupportSortScope")
- sql("DROP TABLE IF EXISTS tableLoadWithSortScope")
}

- test("Do not support load data with specify sort scope") {
- sql(
- s"""
- | CREATE TABLE tableLoadWithSortScope(
- | intField INT,
- | stringField STRING
- | )
- | STORED BY 'carbondata'
- | TBLPROPERTIES('SORT_COLUMNS'='stringField')
- """.stripMargin)
-
- val exception_loaddata_sortscope: Exception = intercept[Exception] {
- sql("LOAD DATA LOCAL INPATH '/path/to/data' INTO TABLE tableLoadWithSortScope " +
- "OPTIONS('SORT_SCOPE'='GLOBAL_SORT')")
- }
- assert(exception_loaddata_sortscope.getMessage.contains("Error: Invalid option(s): sort_scope"))
- }

test("test create table with sort scope in normal cases") {
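The deleted test asserted that SORT_SCOPE was rejected as a load option. With SORT_SCOPE added to the load-option whitelist in CarbonDDLSqlParser below, the same statement is now expected to succeed (path and table name are illustrative):

// Previously failed with "Error: Invalid option(s): sort_scope"; now a valid load.
sql("LOAD DATA LOCAL INPATH '/path/to/data' INTO TABLE tableLoadWithSortScope " +
  "OPTIONS('SORT_SCOPE'='GLOBAL_SORT')")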
@@ -109,7 +109,7 @@ object StreamSinkFactory {
carbonLoadModel.setSegmentId(segmentId)

// Used to generate load commands for child tables in case auto-handoff is fired.
- val loadMetaEvent = new LoadMetadataEvent(carbonTable, false)
+ val loadMetaEvent = new LoadMetadataEvent(carbonTable, false, parameters.asJava)
OperationListenerBus.getInstance().fireEvent(loadMetaEvent, operationContext)
// start server if necessary
val server = startDictionaryServer(
@@ -1096,7 +1096,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
"TIMESTAMPFORMAT",
"SKIP_EMPTY_LINE",
"SORT_COLUMN_BOUNDS",
- "LOAD_MIN_SIZE_INMB"
+ "LOAD_MIN_SIZE_INMB",
+ "SORT_SCOPE"
)
var isSupported = true
val invalidOptions = StringBuilder.newBuilder
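Only the SORT_SCOPE entry is new in this whitelist; options outside it still fail validation. A hypothetical rejected option for contrast (FAKE_OPTION is invented for illustration; intercept comes from ScalaTest, as in the surrounding tests):

// Hypothetical: an option missing from the whitelist above still fails
// with the same "Invalid option(s)" error the removed test expected.
intercept[Exception] {
  sql("LOAD DATA LOCAL INPATH '/path/to/data' INTO TABLE t OPTIONS('FAKE_OPTION'='x')")
}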
@@ -95,9 +95,9 @@ case class CarbonAlterTableCompactionCommand(
// If set to true then only loadCommands for compaction will be created.
val loadMetadataEvent =
if (alterTableModel.compactionType.equalsIgnoreCase(CompactionType.STREAMING.name())) {
- new LoadMetadataEvent(table, false)
+ new LoadMetadataEvent(table, false, Map.empty[String, String].asJava)
} else {
- new LoadMetadataEvent(table, true)
+ new LoadMetadataEvent(table, true, Map.empty[String, String].asJava)
}
OperationListenerBus.getInstance().fireEvent(loadMetadataEvent, operationContext)
}
@@ -72,8 +72,7 @@ import org.apache.carbondata.events.exception.PreEventException
import org.apache.carbondata.processing.loading.{ComplexDelimitersEnum, TableProcessingOperations}
import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
import org.apache.carbondata.processing.loading.exception.NoRetryException
- import org.apache.carbondata.processing.loading.model.{CarbonLoadModelBuilder, LoadOption}
- import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+ import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
import org.apache.carbondata.processing.util.{CarbonBadRecordUtil, CarbonDataProcessorUtil, CarbonLoaderUtil}
import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
@@ -142,7 +141,7 @@ case class CarbonLoadDataCommand(
}
operationContext.setProperty("isOverwrite", isOverwriteTable)
if(CarbonUtil.hasAggregationDataMap(table)) {
- val loadMetadataEvent = new LoadMetadataEvent(table, false)
+ val loadMetadataEvent = new LoadMetadataEvent(table, false, options.asJava)
OperationListenerBus.getInstance().fireEvent(loadMetadataEvent, operationContext)
}
Seq.empty
@@ -191,10 +190,34 @@
optionsFinal
.put("complex_delimiter_level_4",
ComplexDelimitersEnum.COMPLEX_DELIMITERS_LEVEL_4.value())
- optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope",
-   carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
-     carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
-       CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))

+ /**
+  * Priority of sort_scope assignment :
+  * -----------------------------------
+  *
+  * 1. Load Options ->
+  *    LOAD DATA INPATH 'data.csv' INTO TABLE tableName OPTIONS('sort_scope'='no_sort')
+  *
+  * 2. Session property CARBON_TABLE_LOAD_SORT_SCOPE ->
+  *    SET CARBON.TABLE.LOAD.SORT.SCOPE.database.table=no_sort
+  *    SET CARBON.TABLE.LOAD.SORT.SCOPE.database.table=batch_sort
+  *    SET CARBON.TABLE.LOAD.SORT.SCOPE.database.table=local_sort
+  *    SET CARBON.TABLE.LOAD.SORT.SCOPE.database.table=global_sort
+  *
+  * 3. Sort Scope provided in TBLPROPERTIES
+  * 4. Session property CARBON_OPTIONS_SORT_SCOPE
+  * 5. Default Sort Scope LOAD_SORT_SCOPE
+  */
+ optionsFinal.put("sort_scope",
+   options.getOrElse("sort_scope",
+     carbonProperty.getProperty(
+       CarbonLoadOptionConstants.CARBON_TABLE_LOAD_SORT_SCOPE + table.getDatabaseName + "." +
+         table.getTableName,
+       tableProperties.asScala.getOrElse("sort_scope",
+         carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+           carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+             CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))))

optionsFinal
.put("bad_record_path", CarbonBadRecordUtil.getBadRecordsPath(options.asJava, table))
val factPath = if (dataFrame.isDefined) {
@@ -304,6 +327,7 @@ case class CarbonLoadDataCommand(
}
val partitionStatus = SegmentStatus.SUCCESS
val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean
LOGGER.info("Sort Scope : " + carbonLoadModel.getSortScope)
if (carbonLoadModel.getUseOnePass) {
loadDataUsingOnePass(
sparkSession,
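The five-level priority documented in the new comment above collapses into nested lookups. A standalone sketch of the resolution order, with plain maps standing in for CarbonProperties and session state; the LOCAL_SORT fallback is an assumption about LOAD_SORT_SCOPE_DEFAULT:

// Sketch only; the real code reads CarbonProperties and table metadata.
def resolveSortScope(
    loadOptions: Map[String, String],   // 1. LOAD ... OPTIONS('sort_scope'=...)
    sessionProps: Map[String, String],  // 2 and 4. SET properties
    tblProps: Map[String, String],      // 3. TBLPROPERTIES
    db: String,
    tbl: String): String =
  loadOptions.getOrElse("sort_scope",
    sessionProps.getOrElse(s"carbon.table.load.sort.scope.$db.$tbl",
      tblProps.getOrElse("sort_scope",
        sessionProps.getOrElse("carbon.options.sort.scope",
          "LOCAL_SORT"))))  // assumed default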
@@ -349,7 +349,8 @@ object CompactionProcessMetaListener extends OperationEventListener {
TableIdentifier(childTableName, Some(childDatabaseName)),
childDataFrame,
false,
- sparkSession)
+ sparkSession,
+ mutable.Map.empty[String, String])
val uuid = Option(operationContext.getProperty("uuid")).
getOrElse(UUID.randomUUID()).toString
operationContext.setProperty("uuid", uuid)
@@ -377,7 +378,8 @@ object CompactionProcessMetaListener extends OperationEventListener {
TableIdentifier(childTableName, Some(childDatabaseName)),
childDataFrame,
false,
- sparkSession)
+ sparkSession,
+ mutable.Map.empty[String, String])
val uuid = Option(operationContext.getProperty("uuid")).getOrElse(UUID.randomUUID()).toString
loadCommand.processMetadata(sparkSession)
operationContext.setProperty(table.getTableName + "_Compaction", loadCommand)
@@ -453,6 +455,7 @@ object LoadProcessMetaListener extends OperationEventListener {
childDataFrame,
isOverwrite,
sparkSession,
tableEvent.getOptions.asScala,
timeseriesParentTableName = childSelectQuery._2)
operationContext.setProperty("uuid", uuid)
loadCommand.operationContext.setProperty("uuid", uuid)
@@ -253,7 +253,8 @@ case class PreAggregateTableHelper(
tableIdentifier,
dataFrame,
isOverwrite = false,
- sparkSession = sparkSession)
+ sparkSession = sparkSession,
+ mutable.Map.empty[String, String])
loadCommand.processMetadata(sparkSession)
Seq.empty
}
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.command.preaaggregate

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}

import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, CarbonSession, SparkSession, _}
@@ -893,6 +894,7 @@ object PreAggregateUtil {
dataFrame: DataFrame,
isOverwrite: Boolean,
sparkSession: SparkSession,
options: mutable.Map[String, String],
timeseriesParentTableName: String = ""): CarbonLoadDataCommand = {
val headers = columns.asScala.filter { column =>
!column.getColumnName.equalsIgnoreCase(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)
@@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand

import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
- import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonCommonConstantsInternal}
+ import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonCommonConstantsInternal, CarbonLoadOptionConstants}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, SessionParams}

case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
@@ -84,10 +84,7 @@ case class CarbonSetCommand(command: SetCommand)
object CarbonSetCommand {
def validateAndSetValue(sessionParams: SessionParams, key: String, value: String): Unit = {
val isCarbonProperty: Boolean = CarbonProperties.getInstance().isCarbonProperty(key)
- if (isCarbonProperty) {
- sessionParams.addProperty(key, value)
- }
- else if (key.startsWith(CarbonCommonConstants.CARBON_INPUT_SEGMENTS)) {
+ if (key.startsWith(CarbonCommonConstants.CARBON_INPUT_SEGMENTS)) {
if (key.split("\\.").length == 5) {
sessionParams.addProperty(key.toLowerCase(), value)
}
@@ -96,8 +93,6 @@ object CarbonSetCommand {
"property should be in \" carbon.input.segments.<database_name>" +
".<table_name>=<seg_id list> \" format.")
}
- } else if (key.startsWith(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS)) {
- sessionParams.addProperty(key.toLowerCase(), value)
} else if (key.startsWith(CarbonCommonConstantsInternal.QUERY_ON_PRE_AGG_STREAMING)) {
sessionParams.addProperty(key.toLowerCase(), value)
} else if (key.startsWith(CarbonCommonConstants.CARBON_DATAMAP_VISIBLE)) {
@@ -117,6 +112,17 @@
"property should be in \" carbon.load.datamaps.parallel.<database_name>" +
".<table_name>=<true/false> \" format.")
}
} else if (key.startsWith(CarbonLoadOptionConstants.CARBON_TABLE_LOAD_SORT_SCOPE)) {
if (key.split("\\.").length == 7) {
sessionParams.addProperty(key.toLowerCase(), value)
}
else {
throw new MalformedCarbonCommandException(
"property should be in \" carbon.table.load.sort.scope.<database_name>" +
".<table_name>=<sort_scope> \" format.")
}
} else if (isCarbonProperty) {
sessionParams.addProperty(key, value)
}
}

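The length-7 check above mirrors the key shape: carbon.table.load.sort.scope is itself five dot-separated segments, plus one each for database and table. A quick check:

val parts = "carbon.table.load.sort.scope.db.tbl".split("\\.")
assert(parts.length == 7)  // 5 prefix segments + database + table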
@@ -128,11 +128,39 @@ class SetCommandTestCase extends Spark2QueryTest with BeforeAndAfterAll{
sql(s"set ${CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS}"))
}
}

test(s"test set carbon.table.load.sort.scope for valid options") {
checkAnswer(
sql(s"set carbon.table.load.sort.scope.db.tbl=no_sort"),
sql(s"set carbon.table.load.sort.scope.db.tbl"))

checkAnswer(
sql(s"set carbon.table.load.sort.scope.db.tbl=batch_sort"),
sql(s"set carbon.table.load.sort.scope.db.tbl"))

checkAnswer(
sql(s"set carbon.table.load.sort.scope.db.tbl=local_sort"),
sql(s"set carbon.table.load.sort.scope.db.tbl"))

checkAnswer(
sql(s"set carbon.table.load.sort.scope.db.tbl=global_sort"),
sql(s"set carbon.table.load.sort.scope.db.tbl"))
}

test(s"test set carbon.table.load.sort.scope for invalid options") {
intercept[InvalidConfigurationException] {
checkAnswer(
sql(s"set carbon.table.load.sort.scope.db.tbl=fake_sort"),
sql(s"set carbon.table.load.sort.scope.db.tbl"))
}
}

override def afterAll {
sqlContext.sparkSession.catalog.clearCache()
sql("reset")
sql("set carbon=true")
checkAnswer(sql("set carbon"),
sql("set"))
sql("drop database if exists db cascade")
}
}
@@ -66,7 +66,7 @@ public AbstractDataLoadProcessorStep build(CarbonLoadModel loadModel, String[] s
return buildInternalWithNoConverter(inputIterators, configuration, sortScope);
} else if (loadModel.isJsonFileLoad()) {
return buildInternalWithJsonInputProcessor(inputIterators, configuration, sortScope);
- } else if (!configuration.isSortTable() || sortScope.equals(
+ } else if (!configuration.isSortTable() && sortScope.equals(
SortScopeOptions.SortScope.NO_SORT)) {
return buildInternalForNoSort(inputIterators, configuration);
} else if (configuration.getBucketingInfo() != null) {
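The change from || to && narrows the plain no-sort path: it is now taken only when the table has no sort columns and the effective scope is NO_SORT. A self-contained illustration of the predicate difference:

// Illustrative only: old vs. new routing predicate.
def noSortPathOld(isSortTable: Boolean, scopeIsNoSort: Boolean) =
  !isSortTable || scopeIsNoSort
def noSortPathNew(isSortTable: Boolean, scopeIsNoSort: Boolean) =
  !isSortTable && scopeIsNoSort
// A sort table loaded with SORT_SCOPE=NO_SORT used to take the plain
// no-sort path; it now falls through to the sort-aware branches.
assert(noSortPathOld(true, true) && !noSortPathNew(true, true))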
@@ -17,6 +17,8 @@

package org.apache.carbondata.processing.loading.events;

import java.util.Map;

import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.events.Event;
@@ -101,16 +103,25 @@ public CarbonTableIdentifier getCarbonTableIdentifier() {
public static class LoadMetadataEvent extends Event {
private CarbonTable carbonTable;
private boolean isCompaction;
- public LoadMetadataEvent(CarbonTable carbonTable, boolean isCompaction) {
+ private Map<String, String> options;
+
+ public LoadMetadataEvent(CarbonTable carbonTable, boolean isCompaction,
+     Map<String, String> options) {
this.carbonTable = carbonTable;
this.isCompaction = isCompaction;
this.options = options;
}
public boolean isCompaction() {
return isCompaction;
}
public CarbonTable getCarbonTable() {
return carbonTable;
}


public Map<String, String> getOptions() {
return options;
}
}

public static class LoadTablePostStatusUpdateEvent extends Event {
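The new constructor argument is what carries the load options (including SORT_SCOPE) from a parent load to the pre-aggregate listeners. A hypothetical firing site mirroring the CarbonLoadDataCommand change above (asJava requires scala.collection.JavaConverters._):

// Firing side attaches the user's load options ...
val event = new LoadMetadataEvent(carbonTable, false, options.asJava)
OperationListenerBus.getInstance().fireEvent(event, operationContext)
// ... and a listener reads them back when building child load commands:
val sortScope = event.getOptions.get("sort_scope")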

