fix comments
QiangCai committed Apr 23, 2019
1 parent 1b49216 commit 4d93a58
Showing 9 changed files with 125 additions and 84 deletions.
@@ -47,6 +47,7 @@
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
import org.apache.carbondata.core.scan.executor.util.RestructureUtil;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
import org.apache.carbondata.core.util.CarbonProperties;
@@ -143,8 +144,9 @@ public CarbonTable getTable() {
* @param filterExp
* @return
*/
public List<ExtendedBlocklet> prune(List<Segment> segments, final FilterResolverIntf filterExp,
final List<PartitionSpec> partitions) throws IOException {
public List<ExtendedBlocklet> prune(List<Segment> segments, Expression filterExp,
final FilterResolverIntf filterResolver, final List<PartitionSpec> partitions)
throws IOException {
final List<ExtendedBlocklet> blocklets = new ArrayList<>();
final Map<Segment, List<DataMap>> dataMaps = dataMapFactory.getDataMaps(segments);
// for non-filter queries
@@ -164,15 +166,16 @@ public List<ExtendedBlocklet> prune(List<Segment> segments, final FilterResolver
// For around 0.1 million files, block pruning can take only about 1 second.
// Multi-threading for smaller file counts is not recommended, as the driver
// should keep a minimum number of threads open to support multiple concurrent queries.
if (filterExp == null) {
if (filterResolver == null) {
// if filter is not passed, then return all the blocklets.
return pruneWithoutFilter(segments, partitions, blocklets);
}
return pruneWithFilter(segments, filterExp, partitions, blocklets, dataMaps);
return
pruneWithFilter(segments, filterExp, filterResolver, partitions, blocklets, dataMaps);
}
// handle by multi-thread
List<ExtendedBlocklet> extendedBlocklets =
pruneMultiThread(segments, filterExp, partitions, blocklets, dataMaps, totalFiles);
List<ExtendedBlocklet> extendedBlocklets = pruneMultiThread(
segments, filterExp, filterResolver, partitions, blocklets, dataMaps, totalFiles);
return extendedBlocklets;
}

@@ -187,14 +190,21 @@ private List<ExtendedBlocklet> pruneWithoutFilter(List<Segment> segments,
return blocklets;
}

private List<ExtendedBlocklet> pruneWithFilter(List<Segment> segments,
FilterResolverIntf filterExp, List<PartitionSpec> partitions,
private List<ExtendedBlocklet> pruneWithFilter(List<Segment> segments, Expression filterExp,
FilterResolverIntf filterResolver, List<PartitionSpec> partitions,
List<ExtendedBlocklet> blocklets, Map<Segment, List<DataMap>> dataMaps) throws IOException {
for (Segment segment : segments) {
List<Blocklet> pruneBlocklets = new ArrayList<>();
SegmentProperties segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segment);
for (DataMap dataMap : dataMaps.get(segment)) {
pruneBlocklets.addAll(dataMap.prune(filterExp, segmentProperties, partitions));
if (filterExp != null && table.hasColumnDrift() &&
RestructureUtil.hasColumnDriftOnSegment(table, segmentProperties)) {
for (DataMap dataMap : dataMaps.get(segment)) {
pruneBlocklets.addAll(dataMap.prune(filterExp, segmentProperties, partitions, table));
}
} else {
for (DataMap dataMap : dataMaps.get(segment)) {
pruneBlocklets.addAll(dataMap.prune(filterResolver, segmentProperties, partitions));
}
}
blocklets.addAll(
addSegmentId(blockletDetailsFetcher.getExtendedBlocklets(pruneBlocklets, segment),
@@ -204,9 +214,9 @@ private List<ExtendedBlocklet> pruneWithFilter(List<Segment> segments,
}

private List<ExtendedBlocklet> pruneMultiThread(List<Segment> segments,
final FilterResolverIntf filterExp, final List<PartitionSpec> partitions,
List<ExtendedBlocklet> blocklets, final Map<Segment, List<DataMap>> dataMaps,
int totalFiles) {
final Expression filterExp, final FilterResolverIntf filterResolver,
final List<PartitionSpec> partitions, List<ExtendedBlocklet> blocklets,
final Map<Segment, List<DataMap>> dataMaps, int totalFiles) {
/*
*********************************************************************************
* Below is the example of how this part of code works.
@@ -295,14 +305,25 @@ private List<ExtendedBlocklet> pruneMultiThread(List<Segment> segments,
SegmentProperties segmentProperties =
segmentPropertiesFetcher.getSegmentPropertiesFromDataMap(dataMapList.get(0));
Segment segment = segmentDataMapGroup.getSegment();
for (int i = segmentDataMapGroup.getFromIndex();
i <= segmentDataMapGroup.getToIndex(); i++) {
List<Blocklet> dmPruneBlocklets = dataMapList.get(i).prune(filterExp,
segmentProperties,
partitions);
pruneBlocklets.addAll(addSegmentId(blockletDetailsFetcher
.getExtendedBlocklets(dmPruneBlocklets, segment),
segment));
if (filterExp != null && table.hasColumnDrift() &&
RestructureUtil.hasColumnDriftOnSegment(table, segmentProperties)) {
for (int i = segmentDataMapGroup.getFromIndex();
i <= segmentDataMapGroup.getToIndex(); i++) {
List<Blocklet> dmPruneBlocklets =
dataMapList.get(i).prune(filterExp, segmentProperties, partitions, table);
pruneBlocklets.addAll(addSegmentId(
blockletDetailsFetcher.getExtendedBlocklets(dmPruneBlocklets, segment),
segment));
}
} else {
for (int i = segmentDataMapGroup.getFromIndex();
i <= segmentDataMapGroup.getToIndex(); i++) {
List<Blocklet> dmPruneBlocklets =
dataMapList.get(i).prune(filterResolver, segmentProperties, partitions);
pruneBlocklets.addAll(addSegmentId(
blockletDetailsFetcher.getExtendedBlocklets(dmPruneBlocklets, segment),
segment));
}
}
synchronized (prunedBlockletMap) {
List<ExtendedBlocklet> pruneBlockletsExisting =
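For reference, a minimal caller-side sketch of the reworked prune API: the raw Expression is now passed alongside the resolved filter, so segments with column drift can be pruned from the unresolved expression while all other segments keep using the FilterResolverIntf path. This is only an illustration; the method and variable names below are assumptions, not part of this commit.

// Hedged sketch (Scala) of a caller of TableDataMap.prune after this change.
// Only the prune signature comes from the commit; everything else is illustrative.
import java.util.{List => JList}
import org.apache.carbondata.core.datamap.{Segment, TableDataMap}
import org.apache.carbondata.core.indexstore.{ExtendedBlocklet, PartitionSpec}
import org.apache.carbondata.core.scan.expression.Expression
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf

object PruneExample {
  def pruneSegments(
      dataMap: TableDataMap,
      segments: JList[Segment],
      filterExpression: Expression,        // unresolved filter, used on column-drift segments
      filterResolver: FilterResolverIntf,  // resolved filter, used on all other segments
      partitions: JList[PartitionSpec]): JList[ExtendedBlocklet] = {
    // TableDataMap decides per segment which filter form to use, based on
    // CarbonTable.hasColumnDrift() and RestructureUtil.hasColumnDriftOnSegment().
    dataMap.prune(segments, filterExpression, filterResolver, partitions)
  }
}

Callers that never hit the column-drift path (for example the DataMapExprWrapperImpl change below) can simply pass null for the Expression.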
@@ -50,7 +50,7 @@ public DataMapExprWrapperImpl(TableDataMap dataMap, FilterResolverIntf expressio
@Override
public List<ExtendedBlocklet> prune(List<Segment> segments, List<PartitionSpec> partitionsToPrune)
throws IOException {
return dataMap.prune(segments, expression, partitionsToPrune);
return dataMap.prune(segments, null, expression, partitionsToPrune);
}

public List<ExtendedBlocklet> prune(DataMapDistributable distributable,
@@ -120,6 +120,11 @@ public class CarbonTable implements Serializable {
*/
private List<CarbonMeasure> allMeasures;

/**
* list of dimensions that have column drift
*/
private List<CarbonDimension> columnDrift;

/**
* table bucket map.
*/
@@ -189,6 +194,7 @@ private CarbonTable() {
this.tablePartitionMap = new HashMap<>();
this.createOrderColumn = new HashMap<String, List<CarbonColumn>>();
this.tablePrimitiveDimensionsMap = new HashMap<String, List<CarbonDimension>>();
this.columnDrift = new ArrayList<CarbonDimension>();
}

/**
@@ -898,6 +904,12 @@ private void fillVisibleDimensions(String tableName) {
for (CarbonDimension dimension : allDimensions) {
if (!dimension.isInvisible()) {
visibleDimensions.add(dimension);
Map<String, String> columnProperties = dimension.getColumnProperties();
if (columnProperties != null) {
if (columnProperties.get(CarbonCommonConstants.COLUMN_DRIFT) != null) {
columnDrift.add(dimension);
}
}
}
}
tableDimensionsMap.put(tableName, visibleDimensions);
@@ -912,6 +924,10 @@ public List<CarbonMeasure> getAllMeasures() {
return allMeasures;
}

public List<CarbonDimension> getColumnDrift() {
return columnDrift;
}

public boolean hasColumnDrift() {
return tableInfo.hasColumnDrift();
}
@@ -504,4 +504,16 @@ public static void actualProjectionOfSegment(BlockExecutionInfo blockExecutionIn
projectionMeasures.toArray(new ProjectionMeasure[projectionMeasures.size()]));
}
}

public static boolean hasColumnDriftOnSegment(CarbonTable table,
SegmentProperties segmentProperties) {
for (CarbonDimension queryColumn : table.getColumnDrift()) {
for (CarbonMeasure tableColumn : segmentProperties.getMeasures()) {
if (isColumnMatches(table.isTransactionalTable(), queryColumn, tableColumn)) {
return true;
}
}
}
return false;
}
}
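hasColumnDriftOnSegment reports whether any of the table's drift columns is still stored as a measure in the given segment. Column drift arises when SORT_COLUMNS is altered to include a column that used to be a measure: newer segments store it as a sort dimension while older segments keep it as a measure, which is exactly the dimension-versus-measure mismatch this helper checks. A hypothetical sequence in the style of the Spark test suites (it assumes the usual QueryTest sql(...) helper; the table and column names are made up, and the exact DDL syntax may differ by version):

// Hypothetical scenario that produces column drift; illustration only.
sql("CREATE TABLE drift_example (stringField STRING, intField INT) STORED AS carbondata")
sql("INSERT INTO drift_example SELECT 'a', 1")   // segment 0: intField stored as a measure
sql("ALTER TABLE drift_example SET TBLPROPERTIES('sort_columns'='intField')")
sql("INSERT INTO drift_example SELECT 'b', 2")   // segment 1: intField stored as a sort dimension
// Pruning a filter on intField now uses the raw Expression on segment 0
// (hasColumnDriftOnSegment returns true there) and the resolved filter on segment 1.
sql("SELECT * FROM drift_example WHERE intField = 2").show()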
@@ -487,8 +487,8 @@ private List<ExtendedBlocklet> getPrunedBlocklets(JobContext job, CarbonTable ca
List<ExtendedBlocklet> prunedBlocklets = null;
// This is to log the event, so user will know what is happening by seeing logs.
LOG.info("Started block pruning ...");
if (carbonTable.isTransactionalTable() && !carbonTable.hasColumnDrift()) {
prunedBlocklets = defaultDataMap.prune(segmentIds, resolver, partitionsToPrune);
if (carbonTable.isTransactionalTable()) {
prunedBlocklets = defaultDataMap.prune(segmentIds, expression, resolver, partitionsToPrune);
} else {
prunedBlocklets = defaultDataMap.prune(segmentIds, expression, partitionsToPrune);
}
@@ -302,17 +302,17 @@ class TestAlterTableSortColumnsProperty extends QueryTest with BeforeAndAfterAll
ex = intercept[RuntimeException] {
sql("alter table alter_sc_validate set tblproperties('sort_columns'='doubleField')")
}
assert(ex.getMessage.contains("sort_columns is unsupported for DOUBLE data type column: doubleField"))
assert(ex.getMessage.contains("sort_columns is unsupported for DOUBLE datatype column: doubleField"))

ex = intercept[RuntimeException] {
sql("alter table alter_sc_validate set tblproperties('sort_columns'='arrayField')")
}
assert(ex.getMessage.contains("sort_columns is unsupported for ARRAY data type column: arrayField"))
assert(ex.getMessage.contains("sort_columns is unsupported for ARRAY datatype column: arrayField"))

ex = intercept[RuntimeException] {
sql("alter table alter_sc_validate set tblproperties('sort_columns'='structField')")
}
assert(ex.getMessage.contains("sort_columns is unsupported for STRUCT data type column: structField"))
assert(ex.getMessage.contains("sort_columns is unsupported for STRUCT datatype column: structField"))

ex = intercept[RuntimeException] {
sql("alter table alter_sc_validate set tblproperties('sort_columns'='structField.col1')")
@@ -324,7 +324,7 @@
val ex = intercept[RuntimeException] {
sql("alter table alter_sc_long_string set tblproperties('sort_columns'='intField, stringField')")
}
assert(ex.getMessage.contains("sort_columns is unsupported for long string data type column: stringField"))
assert(ex.getMessage.contains("sort_columns is unsupported for long string datatype column: stringField"))
}

test("describe formatted") {
@@ -232,6 +232,10 @@ class CarbonScanRDD[T: ClassTag](
statistic.addStatistics(QueryStatisticsConstants.BLOCK_ALLOCATION, System.currentTimeMillis)
statisticRecorder.recordStatisticsForDriver(statistic, queryId)
statistic = new QueryStatistic()
// When the table has column drift, different blocks may have different schemas,
// and a query cannot scan blocks with different schemas within a single task.
// So if the table has column drift, CARBON_TASK_DISTRIBUTION_MERGE_FILES and
// CARBON_TASK_DISTRIBUTION_CUSTOM cannot be used.
val carbonDistribution = if (directFill && !tableInfo.hasColumnDrift) {
CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES
} else {
@@ -794,9 +794,6 @@ object CommonUtil {
storeLocation
}




/**
* This method will validate the cache level
*
@@ -928,13 +925,45 @@
}
}

def validateSortColumns(
sortKey: Array[String],
fields: Seq[(String, String)],
varcharCols: Seq[String]
): Unit = {
if (sortKey.diff(sortKey.distinct).length > 0 ||
(sortKey.length > 1 && sortKey.contains(""))) {
throw new MalformedCarbonCommandException(
"SORT_COLUMNS Either having duplicate columns : " +
sortKey.diff(sortKey.distinct).mkString(",") + " or it contains illegal argument.")
}

sortKey.foreach { column =>
if (!fields.exists(x => x._1.equalsIgnoreCase(column))) {
val errorMsg = "sort_columns: " + column +
" does not exist in table. Please check the create table statement."
throw new MalformedCarbonCommandException(errorMsg)
} else {
val dataType = fields.find(x =>
x._1.equalsIgnoreCase(column)).get._2
if (isDataTypeSupportedForSortColumn(dataType)) {
val errorMsg = s"sort_columns is unsupported for $dataType datatype column: " + column
throw new MalformedCarbonCommandException(errorMsg)
}
if (varcharCols.exists(x => x.equalsIgnoreCase(column))) {
throw new MalformedCarbonCommandException(
s"sort_columns is unsupported for long string datatype column: $column")
}
}
}
}

def validateSortColumns(carbonTable: CarbonTable, newProperties: Map[String, String]): Unit = {
val fields = carbonTable.getCreateOrderColumn(carbonTable.getTableName).asScala
val tableProperties = carbonTable.getTableInfo.getFactTable.getTableProperties
var sortKeyOption = newProperties.get(CarbonCommonConstants.SORT_COLUMNS)
val varcharColsString = tableProperties.get(CarbonCommonConstants.LONG_STRING_COLUMNS)
val varcharCols = if (varcharColsString == null) {
Array.empty[String]
val varcharCols: Seq[String] = if (varcharColsString == null) {
Seq.empty[String]
} else {
varcharColsString.split(",").map(_.trim)
}
@@ -946,31 +975,11 @@
val sortKeyString = CarbonUtil.unquoteChar(sortKeyOption.get).trim
if (!sortKeyString.isEmpty) {
val sortKey = sortKeyString.split(',').map(_.trim)
if (sortKey.diff(sortKey.distinct).length > 0 ||
(sortKey.length > 1 && sortKey.contains(""))) {
throw new MalformedCarbonCommandException(
"SORT_COLUMNS Either having duplicate columns : " +
sortKey.diff(sortKey.distinct).mkString(",") + " or it contains illegal argumnet.")
}

sortKey.foreach { column =>
if (!fields.exists(x => x.getColName.equalsIgnoreCase(column))) {
val errorMsg = "sort_columns: " + column +
" does not exist in table. Please check the create table statement."
throw new MalformedCarbonCommandException(errorMsg)
} else {
val dataType = fields.find(x =>
x.getColName.equalsIgnoreCase(column)).get.getDataType.getName
if (isDataTypeSupportedForSortColumn(dataType)) {
val errorMsg = s"sort_columns is unsupported for $dataType data type column: " + column
throw new MalformedCarbonCommandException(errorMsg)
}
if (varcharCols.exists(x => x.equalsIgnoreCase(column))) {
throw new MalformedCarbonCommandException(
s"sort_columns is unsupported for long string data type column: $column")
}
}
}
validateSortColumns(
sortKey,
fields.map { field => (field.getColName, field.getDataType.getName) },
varcharCols
)
}
}

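With the validation extracted into validateSortColumns(sortKey, fields, varcharCols), the ALTER TABLE path above and the CREATE TABLE parser below share one implementation. A small usage sketch with made-up column names and types; the import path is assumed from the existing module layout, and the quoted error messages are examples of the form produced by the helper rather than exact output.

// Illustrative use of the shared helper; all values here are made up.
import org.apache.carbondata.spark.util.CommonUtil

val fields = Seq(("stringField", "string"), ("doubleField", "double"), ("longStringField", "string"))
val varcharCols = Seq("longStringField")

// Passes: stringField exists and its datatype is allowed in sort_columns.
CommonUtil.validateSortColumns(Array("stringField"), fields, varcharCols)

// Throws MalformedCarbonCommandException, e.g.
// "sort_columns is unsupported for double datatype column: doubleField"
CommonUtil.validateSortColumns(Array("doubleField"), fields, varcharCols)

// Throws MalformedCarbonCommandException for long string (varchar) columns.
CommonUtil.validateSortColumns(Array("longStringField"), fields, varcharCols)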
@@ -753,32 +753,11 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
var sortKeyDimsTmp: Seq[String] = Seq[String]()
if (!sortKeyString.isEmpty) {
val sortKey = sortKeyString.split(',').map(_.trim)
if (sortKey.diff(sortKey.distinct).length > 0 ||
(sortKey.length > 1 && sortKey.contains(""))) {
throw new MalformedCarbonCommandException(
"SORT_COLUMNS Either having duplicate columns : " +
sortKey.diff(sortKey.distinct).mkString(",") + " or it contains illegal argumnet.")
}

sortKey.foreach { column =>
if (!fields.exists(x => x.column.equalsIgnoreCase(column))) {
val errorMsg = "sort_columns: " + column +
" does not exist in table. Please check the create table statement."
throw new MalformedCarbonCommandException(errorMsg)
} else {
val dataType = fields.find(x =>
x.column.equalsIgnoreCase(column)).get.dataType.get
if (isDataTypeSupportedForSortColumn(dataType)) {
val errorMsg = s"sort_columns is unsupported for $dataType datatype column: " + column
throw new MalformedCarbonCommandException(errorMsg)
}
if (varcharCols.exists(x => x.equalsIgnoreCase(column))) {
throw new MalformedCarbonCommandException(
s"sort_columns is unsupported for long string datatype column: $column")
}
}
}

CommonUtil.validateSortColumns(
sortKey,
fields.map { field => (field.column, field.dataType.get) },
varcharCols
)
sortKey.foreach { dimension =>
if (!sortKeyDimsTmp.exists(dimension.equalsIgnoreCase)) {
fields.foreach { field =>
