diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index c9efc340d24..608b5fbc0c5 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -478,6 +478,11 @@ private CarbonCommonConstants() {
   */
  public static final String CACHE_LEVEL_DEFAULT_VALUE = "BLOCK";
 
+  /**
+   * column level property indicating that the measure was converted to a dimension (column drift)
+   */
+  public static final String COLUMN_DRIFT = "column_drift";
+
 //////////////////////////////////////////////////////////////////////////////////////////
 // Data loading parameter start here
 //////////////////////////////////////////////////////////////////////////////////////////
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java
new file mode 100644
index 00000000000..c20d0d5d039
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datamap;
+
+import java.io.Serializable;
+
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.executor.util.RestructureUtil;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
+/**
+ * the filter (expression and resolved filter) used for DataMap pruning
+ */
+public class DataMapFilter implements Serializable {
+
+  private CarbonTable table;
+
+  private Expression expression;
+
+  private FilterResolverIntf resolver;
+
+  public DataMapFilter(CarbonTable table, Expression expression) {
+    this.table = table;
+    this.expression = expression;
+    resolve();
+  }
+
+  public DataMapFilter(FilterResolverIntf resolver) {
+    this.resolver = resolver;
+  }
+
+  private void resolve() {
+    if (expression != null) {
+      table.processFilterExpression(expression, null, null);
+      resolver = CarbonTable.resolveFilter(expression, table.getAbsoluteTableIdentifier());
+    }
+  }
+
+  public Expression getExpression() {
+    return expression;
+  }
+
+  public void setExpression(Expression expression) {
+    this.expression = expression;
+  }
+
+  public FilterResolverIntf getResolver() {
+    return resolver;
+  }
+
+  public void setResolver(FilterResolverIntf resolver) {
+    this.resolver = resolver;
+  }
+
+  public boolean isEmpty() {
+    return resolver == null;
+  }
+
+  public boolean isResolvedOnSegment(SegmentProperties segmentProperties) {
+    if (expression == null || table == null) {
+      return true;
+    }
+    if (!table.isTransactionalTable()) {
+      return false;
+    }
+    if (table.hasColumnDrift() && RestructureUtil
+        .hasColumnDriftOnSegment(table, segmentProperties)) {
+      return false;
+    }
+    return true;
+  }
+}
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index f9020bd2e1f..4375abbca1f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -47,7 +47,6 @@
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
-import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.events.Event;
@@ -100,38 +99,6 @@ public BlockletDetailsFetcher getBlockletDetailsFetcher() {
     return blockletDetailsFetcher;
   }
-
-  /**
-   * Pass the valid segments and prune the datamap using filter expression
-   *
-   * @param segments
-   * @param filterExp
-   * @return
-   */
-  public List prune(List segments, Expression filterExp,
-      List partitions) throws IOException {
-    List blocklets = new ArrayList<>();
-    SegmentProperties segmentProperties;
-    Map> dataMaps = dataMapFactory.getDataMaps(segments);
-    for (Segment segment : segments) {
-      List pruneBlocklets = new ArrayList<>();
-      // if filter is not passed then return all the blocklets
-      if (filterExp == null) {
-        pruneBlocklets = blockletDetailsFetcher.getAllBlocklets(segment, partitions);
-      } else {
-        segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segment);
-        for (DataMap dataMap : dataMaps.get(segment)) {
-          pruneBlocklets.addAll(dataMap
-              .prune(filterExp,
segmentProperties, partitions, table)); - } - } - blocklets.addAll(addSegmentId( - blockletDetailsFetcher.getExtendedBlocklets(pruneBlocklets, segment), - segment)); - } - return blocklets; - } - public CarbonTable getTable() { return table; } @@ -140,10 +107,10 @@ public CarbonTable getTable() { * Pass the valid segments and prune the datamap using filter expression * * @param segments - * @param filterExp + * @param filter * @return */ - public List prune(List segments, final FilterResolverIntf filterExp, + public List prune(List segments, final DataMapFilter filter, final List partitions) throws IOException { final List blocklets = new ArrayList<>(); final Map> dataMaps = dataMapFactory.getDataMaps(segments); @@ -164,15 +131,15 @@ public List prune(List segments, final FilterResolver // As 0.1 million files block pruning can take only 1 second. // Doing multi-thread for smaller values is not recommended as // driver should have minimum threads opened to support multiple concurrent queries. - if (filterExp == null) { + if (filter.isEmpty()) { // if filter is not passed, then return all the blocklets. return pruneWithoutFilter(segments, partitions, blocklets); } - return pruneWithFilter(segments, filterExp, partitions, blocklets, dataMaps); + return pruneWithFilter(segments, filter, partitions, blocklets, dataMaps); } // handle by multi-thread - List extendedBlocklets = - pruneMultiThread(segments, filterExp, partitions, blocklets, dataMaps, totalFiles); + List extendedBlocklets = pruneMultiThread( + segments, filter, partitions, blocklets, dataMaps, totalFiles); return extendedBlocklets; } @@ -187,14 +154,22 @@ private List pruneWithoutFilter(List segments, return blocklets; } - private List pruneWithFilter(List segments, - FilterResolverIntf filterExp, List partitions, - List blocklets, Map> dataMaps) throws IOException { + private List pruneWithFilter(List segments, DataMapFilter filter, + List partitions, List blocklets, + Map> dataMaps) throws IOException { for (Segment segment : segments) { List pruneBlocklets = new ArrayList<>(); SegmentProperties segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segment); - for (DataMap dataMap : dataMaps.get(segment)) { - pruneBlocklets.addAll(dataMap.prune(filterExp, segmentProperties, partitions)); + if (filter.isResolvedOnSegment(segmentProperties)) { + for (DataMap dataMap : dataMaps.get(segment)) { + pruneBlocklets.addAll( + dataMap.prune(filter.getResolver(), segmentProperties, partitions)); + } + } else { + for (DataMap dataMap : dataMaps.get(segment)) { + pruneBlocklets.addAll( + dataMap.prune(filter.getExpression(), segmentProperties, partitions, table)); + } } blocklets.addAll( addSegmentId(blockletDetailsFetcher.getExtendedBlocklets(pruneBlocklets, segment), @@ -204,7 +179,7 @@ private List pruneWithFilter(List segments, } private List pruneMultiThread(List segments, - final FilterResolverIntf filterExp, final List partitions, + final DataMapFilter filter, final List partitions, List blocklets, final Map> dataMaps, int totalFiles) { /* @@ -295,14 +270,24 @@ private List pruneMultiThread(List segments, SegmentProperties segmentProperties = segmentPropertiesFetcher.getSegmentPropertiesFromDataMap(dataMapList.get(0)); Segment segment = segmentDataMapGroup.getSegment(); - for (int i = segmentDataMapGroup.getFromIndex(); - i <= segmentDataMapGroup.getToIndex(); i++) { - List dmPruneBlocklets = dataMapList.get(i).prune(filterExp, - segmentProperties, - partitions); - pruneBlocklets.addAll(addSegmentId(blockletDetailsFetcher - 
.getExtendedBlocklets(dmPruneBlocklets, segment), - segment)); + if (filter.isResolvedOnSegment(segmentProperties)) { + for (int i = segmentDataMapGroup.getFromIndex(); + i <= segmentDataMapGroup.getToIndex(); i++) { + List dmPruneBlocklets = dataMapList.get(i).prune( + filter.getResolver(), segmentProperties, partitions); + pruneBlocklets.addAll(addSegmentId( + blockletDetailsFetcher.getExtendedBlocklets(dmPruneBlocklets, segment), + segment)); + } + } else { + for (int i = segmentDataMapGroup.getFromIndex(); + i <= segmentDataMapGroup.getToIndex(); i++) { + List dmPruneBlocklets = dataMapList.get(i).prune( + filter.getExpression(), segmentProperties, partitions, table); + pruneBlocklets.addAll(addSegmentId( + blockletDetailsFetcher.getExtendedBlocklets(dmPruneBlocklets, segment), + segment)); + } } synchronized (prunedBlockletMap) { List pruneBlockletsExisting = diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java index 4643b473957..bb2662b0a4c 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/expr/DataMapExprWrapperImpl.java @@ -22,6 +22,7 @@ import java.util.UUID; import org.apache.carbondata.core.datamap.DataMapDistributable; +import org.apache.carbondata.core.datamap.DataMapFilter; import org.apache.carbondata.core.datamap.DataMapLevel; import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datamap.TableDataMap; @@ -50,7 +51,7 @@ public DataMapExprWrapperImpl(TableDataMap dataMap, FilterResolverIntf expressio @Override public List prune(List segments, List partitionsToPrune) throws IOException { - return dataMap.prune(segments, expression, partitionsToPrune); + return dataMap.prune(segments, new DataMapFilter(expression), partitionsToPrune); } public List prune(DataMapDistributable distributable, diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java index 36231475baa..54ea7729937 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java @@ -120,6 +120,11 @@ public class CarbonTable implements Serializable { */ private List allMeasures; + /** + * list of column drift + */ + private List columnDrift; + /** * table bucket map. 
*/ @@ -189,6 +194,7 @@ private CarbonTable() { this.tablePartitionMap = new HashMap<>(); this.createOrderColumn = new HashMap>(); this.tablePrimitiveDimensionsMap = new HashMap>(); + this.columnDrift = new ArrayList(); } /** @@ -898,6 +904,12 @@ private void fillVisibleDimensions(String tableName) { for (CarbonDimension dimension : allDimensions) { if (!dimension.isInvisible()) { visibleDimensions.add(dimension); + Map columnProperties = dimension.getColumnProperties(); + if (columnProperties != null) { + if (columnProperties.get(CarbonCommonConstants.COLUMN_DRIFT) != null) { + columnDrift.add(dimension); + } + } } } tableDimensionsMap.put(tableName, visibleDimensions); @@ -912,6 +924,14 @@ public List getAllMeasures() { return allMeasures; } + public List getColumnDrift() { + return columnDrift; + } + + public boolean hasColumnDrift() { + return tableInfo.hasColumnDrift(); + } + /** * This method will all the visible allMeasures * diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java index daba29bd204..ec9d3113283 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java @@ -91,6 +91,8 @@ public class TableInfo implements Serializable, Writable { */ private boolean isTransactionalTable = true; + private boolean hasColumnDrift = false; + // this identifier is a lazy field which will be created when it is used first time private AbsoluteTableIdentifier identifier; @@ -122,6 +124,7 @@ public void setFactTable(TableSchema factTable) { this.factTable = factTable; updateParentRelationIdentifier(); updateIsSchemaModified(); + updateHasColumnDrift(); } private void updateIsSchemaModified() { @@ -276,6 +279,7 @@ public void write(DataOutput out) throws IOException { out.writeLong(lastUpdatedTime); out.writeUTF(getOrCreateAbsoluteTableIdentifier().getTablePath()); out.writeBoolean(isTransactionalTable); + out.writeBoolean(hasColumnDrift); boolean isChildSchemaExists = null != dataMapSchemaList && dataMapSchemaList.size() > 0; out.writeBoolean(isChildSchemaExists); @@ -305,6 +309,7 @@ public void write(DataOutput out) throws IOException { this.lastUpdatedTime = in.readLong(); this.tablePath = in.readUTF(); this.isTransactionalTable = in.readBoolean(); + this.hasColumnDrift = in.readBoolean(); boolean isChildSchemaExists = in.readBoolean(); this.dataMapSchemaList = new ArrayList<>(); if (isChildSchemaExists) { @@ -371,4 +376,22 @@ public boolean isSchemaModified() { return isSchemaModified; } + private void updateHasColumnDrift() { + this.hasColumnDrift = false; + for (ColumnSchema columnSchema : factTable.getListOfColumns()) { + if (columnSchema.isDimensionColumn() && !columnSchema.isInvisible()) { + Map columnProperties = columnSchema.getColumnProperties(); + if (columnProperties != null) { + if (columnProperties.get(CarbonCommonConstants.COLUMN_DRIFT) != null) { + this.hasColumnDrift = true; + break; + } + } + } + } + } + + public boolean hasColumnDrift() { + return hasColumnDrift; + } } diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java index b15bdb59d17..f06f5c34f9f 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java +++ 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java @@ -46,7 +46,6 @@ import org.apache.carbondata.core.memory.UnsafeMemoryManager; import org.apache.carbondata.core.metadata.blocklet.BlockletInfo; import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; -import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.encoder.Encoding; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; @@ -139,20 +138,7 @@ protected void initQuery(QueryModel queryModel) throws IOException { queryStatistic .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR, System.currentTimeMillis()); queryProperties.queryStatisticsRecorder.recordStatistics(queryStatistic); - // calculating the total number of aggregated columns - int measureCount = queryModel.getProjectionMeasures().size(); - - int currentIndex = 0; - DataType[] dataTypes = new DataType[measureCount]; - - for (ProjectionMeasure carbonMeasure : queryModel.getProjectionMeasures()) { - // adding the data type and aggregation type of all the measure this - // can be used - // to select the aggregator - dataTypes[currentIndex] = carbonMeasure.getMeasure().getDataType(); - currentIndex++; - } - queryProperties.measureDataTypes = dataTypes; + // as aggregation will be executed in following order // 1.aggregate dimension expression // 2. expression @@ -461,14 +447,15 @@ private BlockExecutionInfo getBlockExecutionInfoForBlock(QueryModel queryModel, throws QueryExecutionException { BlockExecutionInfo blockExecutionInfo = new BlockExecutionInfo(); SegmentProperties segmentProperties = blockIndex.getSegmentProperties(); - List tableBlockDimensions = segmentProperties.getDimensions(); - + // set actual query dimensions and measures. 
It may differ in case of restructure scenarios + RestructureUtil.actualProjectionOfSegment(blockExecutionInfo, queryModel, segmentProperties); // below is to get only those dimension in query which is present in the // table block List projectDimensions = RestructureUtil .createDimensionInfoAndGetCurrentBlockQueryDimension(blockExecutionInfo, - queryModel.getProjectionDimensions(), tableBlockDimensions, - segmentProperties.getComplexDimensions(), queryModel.getProjectionMeasures().size(), + blockExecutionInfo.getActualQueryDimensions(), segmentProperties.getDimensions(), + segmentProperties.getComplexDimensions(), + blockExecutionInfo.getActualQueryMeasures().length, queryModel.getTable().getTableInfo().isTransactionalTable()); boolean isStandardTable = CarbonUtil.isStandardCarbonTable(queryModel.getTable()); String blockId = CarbonUtil @@ -486,10 +473,12 @@ private BlockExecutionInfo getBlockExecutionInfoForBlock(QueryModel queryModel, blockExecutionInfo.setProjectionDimensions(projectDimensions .toArray(new ProjectionDimension[projectDimensions.size()])); // get measures present in the current block - List currentBlockQueryMeasures = - getCurrentBlockQueryMeasures(blockExecutionInfo, queryModel, blockIndex); + List projectionMeasures = RestructureUtil + .createMeasureInfoAndGetCurrentBlockQueryMeasures(blockExecutionInfo, + blockExecutionInfo.getActualQueryMeasures(), segmentProperties.getMeasures(), + queryModel.getTable().getTableInfo().isTransactionalTable()); blockExecutionInfo.setProjectionMeasures( - currentBlockQueryMeasures.toArray(new ProjectionMeasure[currentBlockQueryMeasures.size()])); + projectionMeasures.toArray(new ProjectionMeasure[projectionMeasures.size()])); blockExecutionInfo.setDataBlock(blockIndex); // setting whether raw record query or not blockExecutionInfo.setRawRecordDetailQuery(queryModel.isForcedDetailRawQuery()); @@ -581,7 +570,7 @@ private BlockExecutionInfo getBlockExecutionInfoForBlock(QueryModel queryModel, // list of measures to be projected List allProjectionListMeasureIndexes = new ArrayList<>(); int[] measureChunkIndexes = QueryUtil.getMeasureChunkIndexes( - currentBlockQueryMeasures, expressionMeasures, + projectionMeasures, expressionMeasures, segmentProperties.getMeasuresOrdinalToChunkMapping(), filterMeasures, allProjectionListMeasureIndexes); reusableBufferSize = Math.max(segmentProperties.getMeasuresOrdinalToChunkMapping().size(), @@ -637,11 +626,6 @@ private BlockExecutionInfo getBlockExecutionInfoForBlock(QueryModel queryModel, blockExecutionInfo.setComplexColumnParentBlockIndexes( getComplexDimensionParentBlockIndexes(projectDimensions)); blockExecutionInfo.setVectorBatchCollector(queryModel.isVectorReader()); - // set actual query dimensions and measures. 
It may differ in case of restructure scenarios - blockExecutionInfo.setActualQueryDimensions(queryModel.getProjectionDimensions() - .toArray(new ProjectionDimension[queryModel.getProjectionDimensions().size()])); - blockExecutionInfo.setActualQueryMeasures(queryModel.getProjectionMeasures() - .toArray(new ProjectionMeasure[queryModel.getProjectionMeasures().size()])); DataTypeUtil.setDataTypeConverter(queryModel.getConverter()); blockExecutionInfo.setRequiredRowId(queryModel.isRequiredRowId()); return blockExecutionInfo; @@ -691,28 +675,6 @@ private int getKeySize(List queryDimension, return 0; } - /** - * Below method will be used to get the measures present in the current block - * - * @param executionInfo - * @param queryModel query model - * @param tableBlock table block - * @return - */ - private List getCurrentBlockQueryMeasures(BlockExecutionInfo executionInfo, - QueryModel queryModel, AbstractIndex tableBlock) throws QueryExecutionException { - // getting the measure info which will be used while filling up measure data - List updatedQueryMeasures = RestructureUtil - .createMeasureInfoAndGetCurrentBlockQueryMeasures(executionInfo, - queryModel.getProjectionMeasures(), - tableBlock.getSegmentProperties().getMeasures(), - queryModel.getTable().getTableInfo().isTransactionalTable()); - // setting the measure aggregator for all aggregation function selected - // in query - executionInfo.getMeasureInfo().setMeasureDataTypes(queryProperties.measureDataTypes); - return updatedQueryMeasures; - } - private int[] getComplexDimensionParentBlockIndexes(List queryDimensions) { List parentBlockIndexList = new ArrayList(); for (ProjectionDimension queryDimension : queryDimensions) { diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/QueryExecutorProperties.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/QueryExecutorProperties.java index 4b59aa7aaa9..22939e1e9d4 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/QueryExecutorProperties.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/QueryExecutorProperties.java @@ -23,7 +23,6 @@ import org.apache.carbondata.core.cache.dictionary.Dictionary; import org.apache.carbondata.core.datastore.block.AbstractIndex; -import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.stats.QueryStatisticsRecorder; @@ -39,10 +38,6 @@ public class QueryExecutorProperties { */ public Map columnToDictionaryMapping; - /** - * Measure datatypes - */ - public DataType[] measureDataTypes; /** * all the complex dimension which is on filter */ diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java index e823eb2299c..11b7372505c 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator; import 
org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory; import org.apache.carbondata.core.metadata.datatype.DataType; @@ -38,6 +39,7 @@ import org.apache.carbondata.core.scan.executor.infos.MeasureInfo; import org.apache.carbondata.core.scan.model.ProjectionDimension; import org.apache.carbondata.core.scan.model.ProjectionMeasure; +import org.apache.carbondata.core.scan.model.QueryModel; import org.apache.carbondata.core.util.ByteUtil; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.DataTypeUtil; @@ -63,16 +65,16 @@ public class RestructureUtil { * @return list of query dimension which is present in the table block */ public static List createDimensionInfoAndGetCurrentBlockQueryDimension( - BlockExecutionInfo blockExecutionInfo, List queryDimensions, + BlockExecutionInfo blockExecutionInfo, ProjectionDimension[] queryDimensions, List tableBlockDimensions, List tableComplexDimension, int measureCount, boolean isTransactionalTable) { List presentDimension = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - boolean[] isDimensionExists = new boolean[queryDimensions.size()]; - Object[] defaultValues = new Object[queryDimensions.size()]; + boolean[] isDimensionExists = new boolean[queryDimensions.length]; + Object[] defaultValues = new Object[queryDimensions.length]; // create dimension information instance DimensionInfo dimensionInfo = new DimensionInfo(isDimensionExists, defaultValues); - dimensionInfo.dataType = new DataType[queryDimensions.size() + measureCount]; + dimensionInfo.dataType = new DataType[queryDimensions.length + measureCount]; int newDictionaryColumnCount = 0; int newNoDictionaryColumnCount = 0; // selecting only those dimension which is present in the query @@ -412,14 +414,15 @@ public static Object getMeasureDefaultValueByType(ColumnSchema columnSchema, * @return measures present in the block */ public static List createMeasureInfoAndGetCurrentBlockQueryMeasures( - BlockExecutionInfo blockExecutionInfo, List queryMeasures, + BlockExecutionInfo blockExecutionInfo, ProjectionMeasure[] queryMeasures, List currentBlockMeasures, boolean isTransactionalTable) { MeasureInfo measureInfo = new MeasureInfo(); - List presentMeasure = new ArrayList<>(queryMeasures.size()); - int numberOfMeasureInQuery = queryMeasures.size(); + List presentMeasure = new ArrayList<>(queryMeasures.length); + int numberOfMeasureInQuery = queryMeasures.length; List measureOrdinalList = new ArrayList<>(numberOfMeasureInQuery); Object[] defaultValues = new Object[numberOfMeasureInQuery]; boolean[] measureExistsInCurrentBlock = new boolean[numberOfMeasureInQuery]; + DataType[] measureDataTypes = new DataType[numberOfMeasureInQuery]; int index = 0; for (ProjectionMeasure queryMeasure : queryMeasures) { // if query measure exists in current dimension measures @@ -437,12 +440,14 @@ public static List createMeasureInfoAndGetCurrentBlockQueryMe presentMeasure.add(currentBlockMeasure); measureOrdinalList.add(carbonMeasure.getOrdinal()); measureExistsInCurrentBlock[index] = true; + measureDataTypes[index] = carbonMeasure.getDataType(); break; } } if (!measureExistsInCurrentBlock[index]) { defaultValues[index] = getMeasureDefaultValue(queryMeasure.getMeasure().getColumnSchema(), queryMeasure.getMeasure().getDefaultValue()); + measureDataTypes[index] = queryMeasure.getMeasure().getDataType(); blockExecutionInfo.setRestructuredBlock(true); } index++; @@ -452,7 +457,63 @@ public static List 
createMeasureInfoAndGetCurrentBlockQueryMe measureInfo.setDefaultValues(defaultValues); measureInfo.setMeasureOrdinals(measureOrdinals); measureInfo.setMeasureExists(measureExistsInCurrentBlock); + measureInfo.setMeasureDataTypes(measureDataTypes); blockExecutionInfo.setMeasureInfo(measureInfo); return presentMeasure; } + + /** + * set actual projection of blockExecutionInfo + */ + public static void actualProjectionOfSegment(BlockExecutionInfo blockExecutionInfo, + QueryModel queryModel, SegmentProperties segmentProperties) { + List projectionDimensions = queryModel.getProjectionDimensions(); + List projectionMeasures = queryModel.getProjectionMeasures(); + if (queryModel.getTable().hasColumnDrift()) { + List tableBlockMeasures = segmentProperties.getMeasures(); + List updatedProjectionMeasures = + new ArrayList<>(projectionMeasures.size() + tableBlockMeasures.size()); + updatedProjectionMeasures.addAll(projectionMeasures); + List updatedProjectionDimensions = + new ArrayList<>(projectionDimensions.size()); + for (ProjectionDimension projectionDimension : projectionDimensions) { + CarbonMeasure carbonMeasure = null; + for (CarbonMeasure tableBlockMeasure : tableBlockMeasures) { + if (isColumnMatches(queryModel.getTable().isTransactionalTable(), + projectionDimension.getDimension(), tableBlockMeasure)) { + carbonMeasure = tableBlockMeasure; + break; + } + } + if (carbonMeasure != null) { + ProjectionMeasure projectionMeasure = new ProjectionMeasure(carbonMeasure); + projectionMeasure.setOrdinal(projectionDimension.getOrdinal()); + updatedProjectionMeasures.add(projectionMeasure); + } else { + updatedProjectionDimensions.add(projectionDimension); + } + } + blockExecutionInfo.setActualQueryDimensions(updatedProjectionDimensions + .toArray(new ProjectionDimension[updatedProjectionDimensions.size()])); + blockExecutionInfo.setActualQueryMeasures(updatedProjectionMeasures + .toArray(new ProjectionMeasure[updatedProjectionMeasures.size()])); + } else { + blockExecutionInfo.setActualQueryDimensions( + projectionDimensions.toArray(new ProjectionDimension[projectionDimensions.size()])); + blockExecutionInfo.setActualQueryMeasures( + projectionMeasures.toArray(new ProjectionMeasure[projectionMeasures.size()])); + } + } + + public static boolean hasColumnDriftOnSegment(CarbonTable table, + SegmentProperties segmentProperties) { + for (CarbonDimension queryColumn : table.getColumnDrift()) { + for (CarbonMeasure tableColumn : segmentProperties.getMeasures()) { + if (isColumnMatches(table.isTransactionalTable(), queryColumn, tableColumn)) { + return true; + } + } + } + return false; + } } diff --git a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java index 4f934ce51b6..d736805099a 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/model/QueryModelBuilder.java @@ -312,7 +312,7 @@ public QueryModel build() { queryModel.setReadPageByPage(readPageByPage); queryModel.setProjection(projection); - if (table.isTransactionalTable()) { + if (table.isTransactionalTable() && !table.hasColumnDrift()) { // set the filter to the query model in order to filter blocklet before scan boolean[] isFilterDimensions = new boolean[table.getDimensionOrdinalMax()]; boolean[] isFilterMeasures = new boolean[table.getAllMeasures().size()]; diff --git 
a/core/src/test/java/org/apache/carbondata/core/scan/executor/util/RestructureUtilTest.java b/core/src/test/java/org/apache/carbondata/core/scan/executor/util/RestructureUtilTest.java index 7332614b9d0..80ec6470696 100644 --- a/core/src/test/java/org/apache/carbondata/core/scan/executor/util/RestructureUtilTest.java +++ b/core/src/test/java/org/apache/carbondata/core/scan/executor/util/RestructureUtilTest.java @@ -86,8 +86,8 @@ public class RestructureUtilTest { ProjectionMeasure queryMeasure2 = new ProjectionMeasure(new CarbonMeasure(columnSchema4, 4)); List queryMeasures = Arrays.asList(queryMeasure1, queryMeasure2); - List queryDimensions = - Arrays.asList(queryDimension1, queryDimension2, queryDimension3); + ProjectionDimension[] queryDimensions = + new ProjectionDimension[] { queryDimension1, queryDimension2, queryDimension3 }; List result = null; result = RestructureUtil @@ -124,10 +124,11 @@ public class RestructureUtilTest { ProjectionMeasure queryMeasure1 = new ProjectionMeasure(carbonMeasure1); ProjectionMeasure queryMeasure2 = new ProjectionMeasure(carbonMeasure2); ProjectionMeasure queryMeasure3 = new ProjectionMeasure(carbonMeasure3); - List queryMeasures = Arrays.asList(queryMeasure1, queryMeasure2, queryMeasure3); + ProjectionMeasure[] queryMeasures = + new ProjectionMeasure[] { queryMeasure1, queryMeasure2, queryMeasure3 }; BlockExecutionInfo blockExecutionInfo = new BlockExecutionInfo(); - RestructureUtil.createMeasureInfoAndGetCurrentBlockQueryMeasures(blockExecutionInfo, queryMeasures, - currentBlockMeasures, true); + RestructureUtil.createMeasureInfoAndGetCurrentBlockQueryMeasures(blockExecutionInfo, + queryMeasures, currentBlockMeasures, true); MeasureInfo measureInfo = blockExecutionInfo.getMeasureInfo(); boolean[] measuresExist = { true, true, false }; assertThat(measureInfo.getMeasureExists(), is(equalTo(measuresExist))); diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java index aba0ab74dba..90532fbb665 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java @@ -34,6 +34,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.constants.CarbonCommonConstantsInternal; import org.apache.carbondata.core.datamap.DataMapChooser; +import org.apache.carbondata.core.datamap.DataMapFilter; import org.apache.carbondata.core.datamap.DataMapJob; import org.apache.carbondata.core.datamap.DataMapStoreManager; import org.apache.carbondata.core.datamap.DataMapUtil; @@ -54,7 +55,6 @@ import org.apache.carbondata.core.readcommitter.ReadCommittedScope; import org.apache.carbondata.core.scan.expression.Expression; import org.apache.carbondata.core.scan.filter.FilterUtil; -import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; import org.apache.carbondata.core.scan.model.QueryModel; import org.apache.carbondata.core.scan.model.QueryModelBuilder; import org.apache.carbondata.core.stats.QueryStatistic; @@ -468,15 +468,8 @@ private int getBlockCount(List blocklets) { private List getPrunedBlocklets(JobContext job, CarbonTable carbonTable, Expression expression, List segmentIds) throws IOException { ExplainCollector.addPruningInfo(carbonTable.getTableName()); - FilterResolverIntf resolver = null; - if (expression != null) { - carbonTable.processFilterExpression(expression, null, null); - 
resolver = CarbonTable.resolveFilter(expression, carbonTable.getAbsoluteTableIdentifier()); - ExplainCollector.setFilterStatement(expression.getStatement()); - } else { - ExplainCollector.setFilterStatement("none"); - } - + final DataMapFilter filter = new DataMapFilter(carbonTable, expression); + ExplainCollector.setFilterStatement(expression == null ? "none" : expression.getStatement()); boolean distributedCG = Boolean.parseBoolean(CarbonProperties.getInstance() .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP, CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT)); @@ -487,11 +480,7 @@ private List getPrunedBlocklets(JobContext job, CarbonTable ca List prunedBlocklets = null; // This is to log the event, so user will know what is happening by seeing logs. LOG.info("Started block pruning ..."); - if (carbonTable.isTransactionalTable()) { - prunedBlocklets = defaultDataMap.prune(segmentIds, resolver, partitionsToPrune); - } else { - prunedBlocklets = defaultDataMap.prune(segmentIds, expression, partitionsToPrune); - } + prunedBlocklets = defaultDataMap.prune(segmentIds, filter, partitionsToPrune); if (ExplainCollector.enabled()) { ExplainCollector.setDefaultDataMapPruningBlockHit(getBlockCount(prunedBlocklets)); @@ -504,15 +493,15 @@ private List getPrunedBlocklets(JobContext job, CarbonTable ca DataMapChooser chooser = new DataMapChooser(getOrCreateCarbonTable(job.getConfiguration())); // Get the available CG datamaps and prune further. - DataMapExprWrapper cgDataMapExprWrapper = chooser.chooseCGDataMap(resolver); + DataMapExprWrapper cgDataMapExprWrapper = chooser.chooseCGDataMap(filter.getResolver()); if (cgDataMapExprWrapper != null) { // Prune segments from already pruned blocklets pruneSegments(segmentIds, prunedBlocklets); List cgPrunedBlocklets; // Again prune with CG datamap. if (distributedCG && dataMapJob != null) { - cgPrunedBlocklets = DataMapUtil.executeDataMapJob(carbonTable, - resolver, segmentIds, cgDataMapExprWrapper, dataMapJob, partitionsToPrune); + cgPrunedBlocklets = DataMapUtil.executeDataMapJob(carbonTable, filter.getResolver(), + segmentIds, cgDataMapExprWrapper, dataMapJob, partitionsToPrune); } else { cgPrunedBlocklets = cgDataMapExprWrapper.prune(segmentIds, partitionsToPrune); } @@ -529,12 +518,12 @@ private List getPrunedBlocklets(JobContext job, CarbonTable ca } // Now try to prune with FG DataMap. 
if (isFgDataMapPruningEnable(job.getConfiguration()) && dataMapJob != null) { - DataMapExprWrapper fgDataMapExprWrapper = chooser.chooseFGDataMap(resolver); + DataMapExprWrapper fgDataMapExprWrapper = chooser.chooseFGDataMap(filter.getResolver()); if (fgDataMapExprWrapper != null) { // Prune segments from already pruned blocklets pruneSegments(segmentIds, prunedBlocklets); List fgPrunedBlocklets = DataMapUtil.executeDataMapJob(carbonTable, - resolver, segmentIds, fgDataMapExprWrapper, dataMapJob, partitionsToPrune); + filter.getResolver(), segmentIds, fgDataMapExprWrapper, dataMapJob, partitionsToPrune); // note that the 'fgPrunedBlocklets' has extra datamap related info compared with // 'prunedBlocklets', so the intersection should keep the elements in 'fgPrunedBlocklets' prunedBlocklets = intersectFilteredBlocklets(carbonTable, prunedBlocklets, diff --git a/integration/spark-common-test/src/test/resources/sort_columns/alldatatype1.csv b/integration/spark-common-test/src/test/resources/sort_columns/alldatatype1.csv new file mode 100644 index 00000000000..1176363ac6a --- /dev/null +++ b/integration/spark-common-test/src/test/resources/sort_columns/alldatatype1.csv @@ -0,0 +1,13 @@ +smallIntField,intField,bigIntField,floatField,doubleField,decimalField,timestampField,dateField,stringField,varcharField,charField,arrayField,structField +1,1,2,1.1,12.12,1.123,2017-01-11 00:00:01,2017-01-11,abc1,abcd2,abcde1,a$b$c$1,a$b$1 +1,1,2,2.1,11.12,2.123,2017-01-11 00:00:01,2017-01-11,abc1,abcd2,abcde1,a$b$c$1,a$b$1 +2,1,2,3.1,10.12,3.123,2017-01-11 00:00:01,2017-01-11,abc1,abcd2,abcde2,a$b$c$2,a$b$2 +2,1,2,4.1,9.12,4.123,2017-01-11 00:00:01,2017-01-11,abc2,abcd2,abcde2,a$b$c$2,a$b$2 +2,2,3,5.2,8.12,5.123,2017-02-12 00:00:02,2017-02-12,abc2,abcd1,abcde2,a$b$c$2,a$b$3 +2,2,3,6.2,7.12,6.123,2017-02-12 00:00:02,2017-02-12,abc2,abcd1,abcde2,a$b$c$2,a$b$3 +4,2,3,7.2,6.12,7.123,2017-02-12 00:00:02,2017-02-12,abc2,abcd1,abcde1,a$b$c$4,a$b$1 +4,2,3,8.1,5.12,8.123,2017-02-12 00:00:02,2017-02-12,abc4,abcd1,abcde1,a$b$c$4,a$b$1 +4,4,1,9.1,4.12,9.123,2017-03-13 00:00:04,2017-03-14,abc4,abcd4,abcde4,a$b$c$4,a$b$2 +4,4,1,10.1,3.12,10.123,2017-03-13 00:00:04,2017-03-14,abc4,abcd4,abcde4,a$b$c$4,a$b$2 +1,4,1,11.1,2.12,11.123,2017-03-13 00:00:04,2017-03-14,abc4,abcd4,abcde4,a$b$c$1,a$b$3 +1,4,1,12.1,1.12,12.123,2017-03-13 00:00:04,2017-03-14,abc1,abcd4,abcde4,a$b$c$1,a$b$3 \ No newline at end of file diff --git a/integration/spark-common-test/src/test/resources/sort_columns/alldatatype2.csv b/integration/spark-common-test/src/test/resources/sort_columns/alldatatype2.csv new file mode 100644 index 00000000000..649bbdce51c --- /dev/null +++ b/integration/spark-common-test/src/test/resources/sort_columns/alldatatype2.csv @@ -0,0 +1,13 @@ +smallIntField,intField,bigIntField,floatField,doubleField,decimalField,timestampField,dateField,stringField,varcharField,charField,arrayField,structField +1,1,1,13.2,6.12,7.123,2017-02-12 00:00:02,2017-02-12,abc2,abcd1,abcde1,a$b$c$1,a$b$1 +1,1,1,14.1,5.12,8.123,2017-02-12 00:00:02,2017-02-12,abc2,abcd1,abcde1,a$b$c$1,a$b$1 +1,1,2,15.1,4.12,9.123,2017-03-11 00:00:03,2017-03-11,abc2,abcd1,abcde1,a$b$c$2,a$b$2 +1,2,2,16.1,3.12,10.123,2017-03-11 00:00:03,2017-03-11,abc1,abcd1,abcde2,a$b$c$2,a$b$2 +1,2,2,17.1,2.12,11.123,2017-03-12 00:00:03,2017-03-12,abc1,abcd2,abcde2,a$b$c$1,a$b$1 +1,2,1,18.1,1.12,12.123,2017-03-12 00:00:03,2017-03-12,abc1,abcd2,abcde1,a$b$c$1,a$b$1 +2,2,1,19.1,12.12,1.123,2017-01-11 00:00:01,2017-01-11,abc1,abcd2,abcde1,a$b$c$1,a$b$1 +2,2,1,20.1,11.12,2.123,2017-01-11 
00:00:01,2017-01-11,abc1,abcd2,abcde1,a$b$c$1,a$b$1 +2,2,2,21.1,10.12,3.123,2017-01-11 00:00:01,2017-01-11,abc1,abcd2,abcde2,a$b$c$2,a$b$2 +2,1,2,22.1,9.12,4.123,2017-01-11 00:00:01,2017-01-11,abc2,abcd2,abcde2,a$b$c$2,a$b$2 +2,1,2,23.2,8.12,5.123,2017-02-12 00:00:02,2017-02-12,abc2,abcd1,abcde2,a$b$c$2,a$b$2 +2,1,1,24.2,7.12,6.123,2017-02-12 00:00:02,2017-02-12,abc2,abcd1,abcde2,a$b$c$2,a$b$2 \ No newline at end of file diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableSortColumnsProperty.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableSortColumnsProperty.scala new file mode 100644 index 00000000000..bf4bae6cda0 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableSortColumnsProperty.scala @@ -0,0 +1,541 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.alterTable + +import org.apache.spark.sql.Row +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties + +class TestAlterTableSortColumnsProperty extends QueryTest with BeforeAndAfterAll { + + override def beforeAll(): Unit = { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, + "yyyy-MM-dd") + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, + "yyyy-MM-dd HH:mm:ss") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true") + dropTable() + prepareTable() + } + + override def afterAll(): Unit = { + dropTable() + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, + CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT) + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, + CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, + CarbonCommonConstants.ENABLE_QUERY_STATISTICS_DEFAULT) + } + + private def prepareTable(): Unit = { + createTable( + "alter_sc_base", + Map("sort_scope"->"local_sort", "sort_columns"->"stringField") + ) + createTable( + "alter_sc_base_complex", + Map("sort_scope"->"local_sort", "sort_columns"->"stringField"), + true + ) + createTable( + "alter_sc_validate", + Map("dictionary_include"->"charField"), + true + ) + createTable( + "alter_sc_iud", + Map("dictionary_include"->"charField") + ) + createTable( + "alter_sc_iud_complex", + Map("dictionary_include"->"charField"), + true 
+ ) + createTable( + "alter_sc_long_string", + Map("LONG_STRING_COLUMNS"->"stringField"), + true + ) + createTable( + "alter_sc_insert", + Map("sort_scope"->"local_sort", "sort_columns"->"stringField") + ) + loadData("alter_sc_insert") + createTable( + "alter_sc_insert_complex", + Map("sort_scope"->"local_sort", "sort_columns"->"stringField"), + true + ) + loadData("alter_sc_insert_complex") + createTable( + "alter_sc_range_column", + Map("sort_scope"->"local_sort", "sort_columns"->"stringField", "range_column"->"smallIntField") + ) + createTable( + "alter_sc_range_column_base", + Map("sort_scope"->"local_sort", "sort_columns"->"stringField") + ) + + Array("alter_sc_add_column", "alter_sc_add_column_base").foreach { tableName => + sql( + s"""create table $tableName( + | smallIntField smallInt, + | intField int, + | bigIntField bigint, + | floatField float, + | doubleField double, + | timestampField timestamp, + | dateField date, + | stringField string + | ) + | stored as carbondata + """.stripMargin) + } + // decimalField decimal(25, 4), + + createTable( + "alter_sc_bloom", + Map("sort_scope"->"local_sort", "sort_columns"->"stringField") + ) + createBloomDataMap("alter_sc_bloom", "alter_sc_bloom_dm1") + createTable( + "alter_sc_bloom_base", + Map("sort_scope"->"local_sort", "sort_columns"->"stringField") + ) + createBloomDataMap("alter_sc_bloom_base", "alter_sc_bloom_base_dm1") + createTable( + "alter_sc_agg", + Map("sort_scope"->"local_sort", "sort_columns"->"intField") + ) + createAggDataMap("alter_sc_agg", "alter_sc_agg_dm1") + createTable( + "alter_sc_agg_base", + Map("sort_scope"->"local_sort", "sort_columns"->"intField") + ) + createAggDataMap("alter_sc_agg_base", "alter_sc_agg_base_dm1") + } + + private def dropTable(): Unit = { + sql(s"drop table if exists alter_sc_base") + sql(s"drop table if exists alter_sc_base_complex") + sql(s"drop table if exists alter_sc_validate") + sql(s"drop table if exists alter_sc_iud") + sql(s"drop table if exists alter_sc_iud_complex") + sql(s"drop table if exists alter_sc_long_string") + sql(s"drop table if exists alter_sc_insert") + sql(s"drop table if exists alter_sc_insert_complex") + sql(s"drop table if exists alter_sc_range_column") + sql(s"drop table if exists alter_sc_range_column_base") + sql(s"drop table if exists alter_sc_add_column") + sql(s"drop table if exists alter_sc_add_column_base") + sql(s"drop table if exists alter_sc_bloom") + sql(s"drop table if exists alter_sc_bloom_base") + sql(s"drop table if exists alter_sc_agg") + sql(s"drop table if exists alter_sc_agg_base") + } + + private def createTable( + tableName: String, + tblProperties: Map[String, String] = Map.empty, + withComplex: Boolean = false + ): Unit = { + val complexSql = + if (withComplex) { + ", arrayField array, structField struct" + } else { + "" + } + val tblPropertiesSql = + if (tblProperties.isEmpty) { + "" + } else { + val propertiesString = + tblProperties + .map { entry => + s"'${ entry._1 }'='${ entry._2 }'" + } + .mkString(",") + s"tblproperties($propertiesString)" + } + + sql( + s"""create table $tableName( + | smallIntField smallInt, + | intField int, + | bigIntField bigint, + | floatField float, + | doubleField double, + | timestampField timestamp, + | dateField date, + | stringField string, + | varcharField varchar(10), + | charField char(10) + | $complexSql + | ) + | stored as carbondata + | $tblPropertiesSql + """.stripMargin) + // decimalField decimal(25, 4), + } + + private def createBloomDataMap(tableName: String, dataMapName: String): Unit = { + sql( 
+ s""" + | CREATE DATAMAP $dataMapName ON TABLE $tableName + | USING 'bloomfilter' + | DMPROPERTIES( + | 'INDEX_COLUMNS'='smallIntField,floatField,timestampField,dateField,stringField', + | 'BLOOM_SIZE'='6400', + | 'BLOOM_FPP'='0.001', + | 'BLOOM_COMPRESS'='TRUE') + """.stripMargin) + } + + private def createAggDataMap(tableName: String, dataMapName: String): Unit = { + sql(s"create datamap PreAggSum$dataMapName on table $tableName using 'preaggregate' as " + + s"select stringField,sum(intField) as sum from $tableName group by stringField") + sql(s"create datamap PreAggAvg$dataMapName on table $tableName using 'preaggregate' as " + + s"select stringField,avg(intField) as avg from $tableName group by stringField") + sql(s"create datamap PreAggCount$dataMapName on table $tableName using 'preaggregate' as " + + s"select stringField,count(intField) as count from $tableName group by stringField") + sql(s"create datamap PreAggMin$dataMapName on table $tableName using 'preaggregate' as " + + s"select stringField,min(intField) as min from $tableName group by stringField") + sql(s"create datamap PreAggMax$dataMapName on table $tableName using 'preaggregate' as " + + s"select stringField,max(intField) as max from $tableName group by stringField") + } + + private def loadData(tableNames: String*): Unit = { + tableNames.foreach { tableName => + sql( + s"""load data local inpath '$resourcesPath/sort_columns' + | into table $tableName + | options ('global_sort_partitions'='2', 'COMPLEX_DELIMITER_LEVEL_1'='$$', 'COMPLEX_DELIMITER_LEVEL_2'=':') + """.stripMargin) + } + } + + private def insertData(insertTable: String, tableNames: String*): Unit = { + tableNames.foreach { tableName => + sql( + s"""insert into table $tableName select * from $insertTable + """.stripMargin) + } + } + + test("validate sort_scope and sort_columns") { + // invalid combination + var ex = intercept[RuntimeException] { + sql("alter table alter_sc_validate set tblproperties('sort_scope'='local_sort')") + } + assert(ex.getMessage.contains("Cannot set SORT_SCOPE as local_sort when table has no SORT_COLUMNS")) + + ex = intercept[RuntimeException] { + sql("alter table alter_sc_validate set tblproperties('sort_scope'='global_sort')") + } + assert(ex.getMessage.contains("Cannot set SORT_SCOPE as global_sort when table has no SORT_COLUMNS")) + + ex = intercept[RuntimeException] { + sql("alter table alter_sc_validate set tblproperties('sort_scope'='local_sort', 'sort_columns'='')") + } + assert(ex.getMessage.contains("Cannot set SORT_COLUMNS as empty when setting SORT_SCOPE as local_sort")) + + ex = intercept[RuntimeException] { + sql("alter table alter_sc_validate set tblproperties('sort_scope'='global_sort', 'sort_columns'=' ')") + } + assert(ex.getMessage.contains("Cannot set SORT_COLUMNS as empty when setting SORT_SCOPE as global_sort")) + + sql("alter table alter_sc_validate set tblproperties('sort_columns'='stringField', 'sort_scope'='local_sort')") + ex = intercept[RuntimeException] { + sql("alter table alter_sc_validate set tblproperties('sort_columns'=' ')") + } + assert(ex.getMessage.contains("Cannot set SORT_COLUMNS as empty when SORT_SCOPE is LOCAL_SORT")) + + sql("alter table alter_sc_validate set tblproperties('sort_scope'='global_sort')") + ex = intercept[RuntimeException] { + sql("alter table alter_sc_validate set tblproperties('sort_columns'='')") + } + assert(ex.getMessage.contains("Cannot set SORT_COLUMNS as empty when SORT_SCOPE is GLOBAL_SORT")) + + // wrong/duplicate sort_columns + ex = intercept[RuntimeException] { + 
sql("alter table alter_sc_validate set tblproperties('sort_columns'=' stringField1 , intField')") + } + assert(ex.getMessage.contains("stringField1 does not exist in table")) + + ex = intercept[RuntimeException] { + sql("alter table alter_sc_validate set tblproperties('sort_columns'=' stringField1 , intField, stringField1')") + } + assert(ex.getMessage.contains("SORT_COLUMNS Either having duplicate columns : stringField1 or it contains illegal argumnet")) + + ex = intercept[RuntimeException] { + sql("alter table alter_sc_validate set tblproperties('sort_columns'=' stringField , intField, stringField')") + } + assert(ex.getMessage.contains("SORT_COLUMNS Either having duplicate columns : stringField or it contains illegal argumnet")) + + // not supported data type +// ex = intercept[RuntimeException] { +// sql("alter table alter_sc_validate set tblproperties('sort_columns'='decimalField')") +// } +// assert(ex.getMessage.contains("sort_columns is unsupported for DECIMAL data type column: decimalField")) + + ex = intercept[RuntimeException] { + sql("alter table alter_sc_validate set tblproperties('sort_columns'='doubleField')") + } + assert(ex.getMessage.contains("sort_columns is unsupported for DOUBLE datatype column: doubleField")) + + ex = intercept[RuntimeException] { + sql("alter table alter_sc_validate set tblproperties('sort_columns'='arrayField')") + } + assert(ex.getMessage.contains("sort_columns is unsupported for ARRAY datatype column: arrayField")) + + ex = intercept[RuntimeException] { + sql("alter table alter_sc_validate set tblproperties('sort_columns'='structField')") + } + assert(ex.getMessage.contains("sort_columns is unsupported for STRUCT datatype column: structField")) + + ex = intercept[RuntimeException] { + sql("alter table alter_sc_validate set tblproperties('sort_columns'='structField.col1')") + } + assert(ex.getMessage.contains("sort_columns: structField.col1 does not exist in table")) + } + + test("long string column") { + val ex = intercept[RuntimeException] { + sql("alter table alter_sc_long_string set tblproperties('sort_columns'='intField, stringField')") + } + assert(ex.getMessage.contains("sort_columns is unsupported for long string datatype column: stringField")) + } + + test("describe formatted") { + // valid combination + sql("alter table alter_sc_validate set tblproperties('sort_scope'='no_sort', 'sort_columns'='')") + checkExistence(sql("describe formatted alter_sc_validate"), true, "NO_SORT") + + sql("alter table alter_sc_validate set tblproperties('sort_scope'='no_sort', 'sort_columns'='bigIntField,stringField')") + checkExistence(sql("describe formatted alter_sc_validate"), true, "no_sort", "bigIntField, stringField".toLowerCase()) + + sql("alter table alter_sc_validate set tblproperties('sort_scope'='local_sort', 'sort_columns'='stringField,bigIntField')") + checkExistence(sql("describe formatted alter_sc_validate"), true, "local_sort", "stringField, bigIntField".toLowerCase()) + + // global dictionary or direct dictionary + sql("alter table alter_sc_validate set tblproperties('sort_scope'='global_sort', 'sort_columns'=' charField , bigIntField , timestampField ')") + checkExistence(sql("describe formatted alter_sc_validate"), true, "global_sort", "charField, bigIntField, timestampField".toLowerCase()) + + // supported data type + sql("alter table alter_sc_validate set tblproperties('sort_scope'='local_sort', 'sort_columns'='smallIntField, intField, bigIntField, timestampField, dateField, stringField, varcharField, charField')") + 
checkExistence(sql("describe formatted alter_sc_validate"), true, "local_sort", "smallIntField, intField, bigIntField, timestampField, dateField, stringField, varcharField, charField".toLowerCase()) + } + + test("IUD and Query") { + testIUDAndQuery("alter_sc_iud", "alter_sc_base", "alter_sc_insert") + } + + test("IUD and Query with complex data type") { + testIUDAndQuery("alter_sc_iud_complex", "alter_sc_base_complex", "alter_sc_insert_complex") + } + + private def testIUDAndQuery(tableName: String, baseTableName: String, insertTableName: String): Unit = { + loadData(tableName, baseTableName) + checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField")) + + // alter table to local_sort with new SORT_COLUMNS + sql(s"alter table $tableName set tblproperties('sort_scope'='local_sort', 'sort_columns'='timestampField, intField, stringField')") + loadData(tableName, baseTableName) + checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField")) + + // alter table to revert SORT_COLUMNS + sql(s"alter table $tableName set tblproperties('sort_columns'='stringField, intField, timestampField')") + loadData(tableName, baseTableName) + checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField")) + + // alter table to change SORT_COLUMNS + sql(s"alter table $tableName set tblproperties('sort_columns'='smallIntField, stringField, intField')") + loadData(tableName, baseTableName) + checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField")) + + // alter table to change SORT_SCOPE and SORT_COLUMNS + sql(s"alter table $tableName set tblproperties('sort_scope'='global_sort', 'sort_columns'='charField, bigIntField, smallIntField')") + loadData(tableName, baseTableName) + checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField")) + + // alter table to change SORT_SCOPE + sql(s"alter table $tableName set tblproperties('sort_scope'='local_sort', 'sort_columns'='charField, bigIntField, smallIntField')") + loadData(tableName, baseTableName) + checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField")) + + // query + checkAnswer(sql(s"select count(*) from $tableName"), sql(s"select count(*) from $baseTableName")) + checkAnswer(sql(s"select * from $tableName where smallIntField = 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 order by floatField")) + checkAnswer(sql(s"select * from $tableName where intField >= 2 order by floatField"), sql(s"select * from $baseTableName where intField >= 2 order by floatField")) + checkAnswer(sql(s"select * from $tableName where smallIntField = 2 or intField >= 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 or intField >= 2 order by floatField")) + + // set input segments + (0 to 5).foreach { segment => + sql(s"set carbon.input.segments.default.$tableName=$segment").show(100, false) + sql(s"set carbon.input.segments.default.$baseTableName=$segment").show(100, false) + checkAnswer(sql(s"select * from $tableName where smallIntField = 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 order by floatField")) + } + sql(s"set carbon.input.segments.default.$tableName=*").show(100, false) + sql(s"set 
carbon.input.segments.default.$baseTableName=*").show(100, false) + + // delete + sql(s"delete from $tableName where smallIntField = 2") + sql(s"delete from $baseTableName where smallIntField = 2") + checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField")) + + sql(s"delete from $tableName") + checkAnswer(sql(s"select count(*) from $tableName"), Seq(Row(0))) + sql(s"delete from $baseTableName") + checkAnswer(sql(s"select count(*) from $baseTableName"), Seq(Row(0))) + + // insert & load data + sql(s"alter table $tableName set tblproperties('sort_scope'='global_sort', 'sort_columns'='timestampField')") + insertData(insertTableName, tableName, baseTableName) + loadData(tableName, baseTableName) + checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField")) + + sql(s"alter table $tableName set tblproperties('sort_scope'='no_sort', 'sort_columns'='')") + insertData(insertTableName, tableName, baseTableName) + loadData(tableName, baseTableName) + checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField")) + + sql(s"alter table $tableName set tblproperties('sort_scope'='local_sort', 'sort_columns'='charField, bigIntField, smallIntField')") + insertData(insertTableName, tableName, baseTableName) + loadData(tableName, baseTableName) + checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField")) + + // update + sql(s"update $tableName set (smallIntField, intField, bigIntField, floatField, doubleField) = (smallIntField + 3, intField + 3, bigIntField + 3, floatField + 3, doubleField + 3) where smallIntField = 2").show() + sql(s"update $baseTableName set (smallIntField, intField, bigIntField, floatField, doubleField) = (smallIntField + 3, intField + 3, bigIntField + 3, floatField + 3, doubleField + 3) where smallIntField = 2").show() + checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField")) + + // query + checkAnswer(sql(s"select count(*) from $tableName"), sql(s"select count(*) from $baseTableName")) + checkAnswer(sql(s"select * from $tableName where smallIntField = 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 order by floatField")) + checkAnswer(sql(s"select * from $tableName where intField >= 2 order by floatField"), sql(s"select * from $baseTableName where intField >= 2 order by floatField")) + checkAnswer(sql(s"select * from $tableName where smallIntField = 2 or intField >= 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 or intField >= 2 order by floatField")) + + // set input segments + (6 to 11).foreach { segment => + sql(s"set carbon.input.segments.default.$tableName=$segment").show(100, false) + sql(s"set carbon.input.segments.default.$baseTableName=$segment").show(100, false) + checkAnswer(sql(s"select * from $tableName where smallIntField = 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 order by floatField")) + } + sql(s"set carbon.input.segments.default.$tableName=*").show(100, false) + sql(s"set carbon.input.segments.default.$baseTableName=*").show(100, false) + + // compaction + sql(s"show segments for table $tableName").show(100, false) + sql(s"show segments for table $baseTableName").show(100, false) + sql(s"alter table $tableName compact 'minor'") + 
sql(s"alter table $baseTableName compact 'minor'") + sql(s"show segments for table $tableName").show(100, false) + sql(s"show segments for table $baseTableName").show(100, false) + checkAnswer(sql(s"select count(*) from $tableName"), sql(s"select count(*) from $baseTableName")) + checkAnswer(sql(s"select * from $tableName where smallIntField = 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 order by floatField")) + checkAnswer(sql(s"select * from $tableName where intField >= 2 order by floatField"), sql(s"select * from $baseTableName where intField >= 2 order by floatField")) + checkAnswer(sql(s"select * from $tableName where smallIntField = 2 or intField >= 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 or intField >= 2 order by floatField")) + } + + test("range column") { + val tableName = "alter_sc_range_column" + val baseTableName = "alter_sc_range_column_base" + loadData(tableName, baseTableName) + sql(s"alter table $tableName set tblproperties('sort_scope'='local_sort', 'sort_columns'='smallIntField, charField')") + loadData(tableName, baseTableName) + + checkAnswer(sql(s"select count(*) from $tableName"), sql(s"select count(*) from $baseTableName")) + checkAnswer(sql(s"select * from $tableName order by floatField"), sql(s"select * from $baseTableName order by floatField")) + checkAnswer(sql(s"select * from $tableName where smallIntField = 2 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 order by floatField")) + } + + test("add/drop column for sort_columns") { + val tableName = "alter_sc_add_column" + val baseTableName = "alter_sc_add_column_base" + loadData(tableName, baseTableName) + sql(s"alter table $tableName set tblproperties('sort_scope'='local_sort', 'sort_columns'='smallIntField, stringField')") + loadData(tableName, baseTableName) + // add column + sql(s"alter table $tableName add columns( varcharField varchar(10), charField char(10))") + sql(s"alter table $baseTableName add columns( varcharField varchar(10), charField char(10))") + loadData(tableName, baseTableName) + + checkAnswer(sql(s"select count(*) from $tableName"), sql(s"select count(*) from $baseTableName")) + checkAnswer(sql(s"select * from $tableName order by floatField, charField"), sql(s"select * from $baseTableName order by floatField, charField")) + checkAnswer(sql(s"select * from $tableName where smallIntField = 2 order by floatField, charField"), sql(s"select * from $baseTableName where smallIntField = 2 order by floatField, charField")) + checkAnswer(sql(s"select * from $tableName where smallIntField = 2 and charField is null order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 and charField is null order by floatField")) + checkAnswer(sql(s"select * from $tableName where smallIntField = 2 and charField is not null order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 and charField is not null order by floatField")) + + // add new column to sort_columns + sql(s"alter table $tableName set tblproperties('sort_scope'='local_sort', 'sort_columns'='smallIntField, charField')") + loadData(tableName, baseTableName) + checkAnswer(sql(s"select count(*) from $tableName"), sql(s"select count(*) from $baseTableName")) + checkAnswer(sql(s"select * from $tableName order by floatField, charField"), sql(s"select * from $baseTableName order by floatField, charField")) + checkAnswer(sql(s"select * from $tableName where smallIntField = 2 order by floatField, 
charField"), sql(s"select * from $baseTableName where smallIntField = 2 order by floatField, charField")) + checkAnswer(sql(s"select * from $tableName where smallIntField = 2 and charField is null order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 and charField is null order by floatField")) + checkAnswer(sql(s"select * from $tableName where smallIntField = 2 and charField is not null order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 and charField is not null order by floatField")) + + // drop column of old sort_columns + sql(s"alter table $tableName drop columns(stringField)") + sql(s"alter table $baseTableName drop columns(stringField)") + loadData(tableName, baseTableName) + checkAnswer(sql(s"select count(*) from $tableName"), sql(s"select count(*) from $baseTableName")) + checkAnswer(sql(s"select * from $tableName order by floatField, charField"), sql(s"select * from $baseTableName order by floatField, charField")) + checkAnswer(sql(s"select * from $tableName where smallIntField = 2 order by floatField, charField"), sql(s"select * from $baseTableName where smallIntField = 2 order by floatField, charField")) + checkAnswer(sql(s"select * from $tableName where smallIntField = 2 and charField is null order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 and charField is null order by floatField")) + checkAnswer(sql(s"select * from $tableName where smallIntField = 2 and charField is not null order by floatField"), sql(s"select * from $baseTableName where smallIntField = 2 and charField is not null order by floatField")) + } + + test("bloom filter") { + val tableName = "alter_sc_bloom" + val dataMapName = "alter_sc_bloom_dm1" + val baseTableName = "alter_sc_bloom_base" + loadData(tableName, baseTableName) + checkExistence(sql(s"SHOW DATAMAP ON TABLE $tableName"), true, "bloomfilter", dataMapName) + checkExistence(sql(s"EXPLAIN SELECT * FROM $tableName WHERE smallIntField = 3"), true, "bloomfilter", dataMapName) + checkAnswer(sql(s"select * from $tableName where smallIntField = 3 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 3 order by floatField")) + + sql(s"alter table $tableName set tblproperties('sort_scope'='global_sort', 'sort_columns'='smallIntField, charField')") + loadData(tableName, baseTableName) + checkExistence(sql(s"EXPLAIN SELECT * FROM $tableName WHERE smallIntField = 3"), true, "bloomfilter", dataMapName) + checkAnswer(sql(s"select * from $tableName where smallIntField = 3 order by floatField"), sql(s"select * from $baseTableName where smallIntField = 3 order by floatField")) + } + + test("pre-aggregate") { + val tableName = "alter_sc_agg" + val dataMapName = "alter_sc_agg_dm1" + val baseTableName = "alter_sc_agg_base" + loadData(tableName, baseTableName) + sql(s"SHOW DATAMAP ON TABLE $tableName").show(100, false) + checkExistence(sql(s"SHOW DATAMAP ON TABLE $tableName"), true, "preaggregate", dataMapName) + checkExistence(sql(s"EXPLAIN select stringField,sum(intField) as sum from $tableName where stringField = 'abc2' group by stringField"), true, "preaggregate", dataMapName) + checkAnswer(sql(s"select stringField,sum(intField) as sum from $tableName where stringField = 'abc2' group by stringField"), sql(s"select stringField,sum(intField) as sum from $baseTableName where stringField = 'abc2' group by stringField")) + + sql(s"alter table $tableName set tblproperties('sort_scope'='global_sort', 'sort_columns'='smallIntField, charField')") + 
loadData(tableName, baseTableName) + sql(s"EXPLAIN select stringField,max(intField) as sum from $tableName where stringField = 'abc2' group by stringField").show(100, false) + checkExistence(sql(s"EXPLAIN select stringField,max(intField) as sum from $tableName where stringField = 'abc2' group by stringField"), true, "preaggregate", dataMapName) + checkAnswer(sql(s"select stringField,max(intField) as sum from $tableName where stringField = 'abc2' group by stringField"), sql(s"select stringField,max(intField) as sum from $baseTableName where stringField = 'abc2' group by stringField")) + } +} diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index 6cee8dc74eb..d0ed815ddb0 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -232,7 +232,11 @@ class CarbonScanRDD[T: ClassTag]( statistic.addStatistics(QueryStatisticsConstants.BLOCK_ALLOCATION, System.currentTimeMillis) statisticRecorder.recordStatisticsForDriver(statistic, queryId) statistic = new QueryStatistic() - val carbonDistribution = if (directFill) { + // When the table has column drift, different blocks may have different schemas. + // A query cannot scan blocks with different schemas within a single task, + // so when the table has column drift, CARBON_TASK_DISTRIBUTION_MERGE_FILES and + // CARBON_TASK_DISTRIBUTION_CUSTOM cannot be used. + val carbonDistribution = if (directFill && !tableInfo.hasColumnDrift) { CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES } else { CarbonProperties.getInstance().getProperty( @@ -260,7 +264,7 @@ class CarbonScanRDD[T: ClassTag]( CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION, "false").toBoolean || carbonDistribution.equalsIgnoreCase(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_CUSTOM) - if (useCustomDistribution) { + if (useCustomDistribution && !tableInfo.hasColumnDrift) { // create a list of block based on split val blockList = splits.asScala.map(_.asInstanceOf[Distributable]) @@ -297,7 +301,7 @@ class CarbonScanRDD[T: ClassTag]( val partition = new CarbonSparkPartition(id, splitWithIndex._2, multiBlockSplit) result.add(partition) } - } else if (carbonDistribution.equalsIgnoreCase( + } else if (!tableInfo.hasColumnDrift && carbonDistribution.equalsIgnoreCase( CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES)) { // sort blocks in reverse order of length diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala index d90c6b2b67e..2e8fa387f1e 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala @@ -17,7 +17,6 @@ package org.apache.carbondata.spark.util - import java.io.File import java.math.BigDecimal import java.text.SimpleDateFormat @@ -47,16 +46,16 @@ import org.apache.carbondata.core.metadata.CarbonMetadata import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes} import org.apache.carbondata.core.metadata.schema.PartitionInfo import org.apache.carbondata.core.metadata.schema.partition.PartitionType +import 
org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.scan.partition.PartitionUtil import org.apache.carbondata.core.statusmanager.SegmentStatusManager -import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, ThreadLocalTaskInfo} +import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil, ThreadLocalTaskInfo} import org.apache.carbondata.core.util.comparator.Comparator import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.util.CarbonDataProcessorUtil - object CommonUtil { private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) @@ -794,6 +793,7 @@ object CommonUtil { } storeLocation } + /** * This method will validate the cache level * @@ -909,6 +909,80 @@ object CommonUtil { } } + def isDataTypeSupportedForSortColumn(columnDataType: String): Boolean = { + val dataTypes = Array("array", "struct", "map", "double", "float", "decimal") + dataTypes.exists(x => x.equalsIgnoreCase(columnDataType)) + } + + def validateSortScope(newProperties: Map[String, String]): Unit = { + val sortScopeOption = newProperties.get(CarbonCommonConstants.SORT_SCOPE) + if (sortScopeOption.isDefined) { + if (!CarbonUtil.isValidSortOption(sortScopeOption.get)) { + throw new MalformedCarbonCommandException( + s"Invalid SORT_SCOPE ${ sortScopeOption.get }, " + + s"valid SORT_SCOPE are 'NO_SORT', 'BATCH_SORT', 'LOCAL_SORT' and 'GLOBAL_SORT'") + } + } + } + + def validateSortColumns( + sortKey: Array[String], + fields: Seq[(String, String)], + varcharCols: Seq[String] + ): Unit = { + if (sortKey.diff(sortKey.distinct).length > 0 || + (sortKey.length > 1 && sortKey.contains(""))) { + throw new MalformedCarbonCommandException( + "SORT_COLUMNS Either having duplicate columns : " + + sortKey.diff(sortKey.distinct).mkString(",") + " or it contains illegal argumnet.") + } + + sortKey.foreach { column => + if (!fields.exists(x => x._1.equalsIgnoreCase(column))) { + val errorMsg = "sort_columns: " + column + + " does not exist in table. Please check the create table statement." 
+ throw new MalformedCarbonCommandException(errorMsg) + } else { + val dataType = fields.find(x => + x._1.equalsIgnoreCase(column)).get._2 + if (isDataTypeSupportedForSortColumn(dataType)) { + val errorMsg = s"sort_columns is unsupported for $dataType datatype column: " + column + throw new MalformedCarbonCommandException(errorMsg) + } + if (varcharCols.exists(x => x.equalsIgnoreCase(column))) { + throw new MalformedCarbonCommandException( + s"sort_columns is unsupported for long string datatype column: $column") + } + } + } + } + + def validateSortColumns(carbonTable: CarbonTable, newProperties: Map[String, String]): Unit = { + val fields = carbonTable.getCreateOrderColumn(carbonTable.getTableName).asScala + val tableProperties = carbonTable.getTableInfo.getFactTable.getTableProperties + var sortKeyOption = newProperties.get(CarbonCommonConstants.SORT_COLUMNS) + val varcharColsString = tableProperties.get(CarbonCommonConstants.LONG_STRING_COLUMNS) + val varcharCols: Seq[String] = if (varcharColsString == null) { + Seq.empty[String] + } else { + varcharColsString.split(",").map(_.trim) + } + + if (!sortKeyOption.isDefined) { + // default no columns are selected for sorting in no_sort scope + sortKeyOption = Some("") + } + val sortKeyString = CarbonUtil.unquoteChar(sortKeyOption.get).trim + if (!sortKeyString.isEmpty) { + val sortKey = sortKeyString.split(',').map(_.trim) + validateSortColumns( + sortKey, + fields.map { field => (field.getColName, field.getDataType.getName) }, + varcharCols + ) + } + } + def bytesToDisplaySize(size: Long): String = bytesToDisplaySize(BigDecimal.valueOf(size)) // This method converts the bytes count to display size upto 2 decimal places diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala index 3e80ea64420..d978128588f 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala @@ -760,32 +760,11 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { var sortKeyDimsTmp: Seq[String] = Seq[String]() if (!sortKeyString.isEmpty) { val sortKey = sortKeyString.split(',').map(_.trim) - if (sortKey.diff(sortKey.distinct).length > 0 || - (sortKey.length > 1 && sortKey.contains(""))) { - throw new MalformedCarbonCommandException( - "SORT_COLUMNS Either having duplicate columns : " + - sortKey.diff(sortKey.distinct).mkString(",") + " or it contains illegal argumnet.") - } - - sortKey.foreach { column => - if (!fields.exists(x => x.column.equalsIgnoreCase(column))) { - val errorMsg = "sort_columns: " + column + - " does not exist in table. Please check the create table statement." 
- throw new MalformedCarbonCommandException(errorMsg) - } else { - val dataType = fields.find(x => - x.column.equalsIgnoreCase(column)).get.dataType.get - if (isDataTypeSupportedForSortColumn(dataType)) { - val errorMsg = s"sort_columns is unsupported for $dataType datatype column: " + column - throw new MalformedCarbonCommandException(errorMsg) - } - if (varcharCols.exists(x => x.equalsIgnoreCase(column))) { - throw new MalformedCarbonCommandException( - s"sort_columns is unsupported for long string datatype column: $column") - } - } - } - + CommonUtil.validateSortColumns( + sortKey, + fields.map { field => (field.column, field.dataType.get) }, + varcharCols + ) sortKey.foreach { dimension => if (!sortKeyDimsTmp.exists(dimension.equalsIgnoreCase)) { fields.foreach { field => diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala index 1dc562dc418..99bc8639cde 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala @@ -23,6 +23,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.ListBuffer +import org.apache.commons.lang3.StringUtils import org.apache.spark.SparkConf import org.apache.spark.sql.{CarbonEnv, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier @@ -32,13 +33,14 @@ import org.apache.spark.sql.hive.HiveExternalCatalog._ import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.constants.SortScopeOptions.SortScope import org.apache.carbondata.core.datamap.DataMapStoreManager import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.exception.InvalidConfigurationException import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier} -import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl +import org.apache.carbondata.core.metadata.converter.{SchemaConverter, ThriftWrapperSchemaConverterImpl} import org.apache.carbondata.core.metadata.datatype.DataTypes import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema @@ -100,6 +102,76 @@ object AlterTableUtil { } } + /** + * update the schema when SORT_COLUMNS is changed + */ + private def updateSchemaForSortColumns( + thriftTable: TableInfo, + lowerCasePropertiesMap: mutable.Map[String, String], + schemaConverter: SchemaConverter + ) = { + val sortColumnsOption = lowerCasePropertiesMap.get(CarbonCommonConstants.SORT_COLUMNS) + if (sortColumnsOption.isDefined) { + val sortColumnsString = CarbonUtil.unquoteChar(sortColumnsOption.get).trim + val columns = thriftTable.getFact_table.getTable_columns + // remove old sort_columns property from ColumnSchema + val columnSeq = + columns + .asScala + .map { column => + val columnProperties = column.getColumnProperties + if (columnProperties != null) { + columnProperties.remove(CarbonCommonConstants.SORT_COLUMNS) + } + column + } + .zipWithIndex + if 
(!sortColumnsString.isEmpty) { + val newSortColumns = sortColumnsString.split(',').map(_.trim) + // map each sort column to its index in the column list + val sortColumnsIndexMap = newSortColumns + .zipWithIndex + .map { entry => + val column = columnSeq.find(_._1.getColumn_name.equalsIgnoreCase(entry._1)).get + var columnProperties = column._1.getColumnProperties + if (columnProperties == null) { + columnProperties = new util.HashMap[String, String]() + column._1.setColumnProperties(columnProperties) + } + // a measure moved into sort_columns becomes a dimension (column drift) + if (!column._1.isDimension) { + column._1.setDimension(true) + columnProperties.put(CarbonCommonConstants.COLUMN_DRIFT, "true") + } + // add sort_columns property + columnProperties.put(CarbonCommonConstants.SORT_COLUMNS, "true") + (column._2, entry._2) + } + .toMap + var index = newSortColumns.length + // re-order all columns, moving sort_columns to the head of the column list + val newColumns = columnSeq + .map { entry => + val sortColumnIndexOption = sortColumnsIndexMap.get(entry._2) + val newIndex = if (sortColumnIndexOption.isDefined) { + sortColumnIndexOption.get + } else { + val tempIndex = index + index += 1 + tempIndex + } + (newIndex, entry._1) + } + .sortWith(_._1 < _._1) + .map(_._2) + .asJava + // use new columns + columns.clear() + columns.addAll(newColumns) + } + } + } + /** + * @param carbonTable + * @param schemaEvolutionEntry @@ -361,9 +433,10 @@ object AlterTableUtil { // validate the range column properties validateRangeColumnProperties(carbonTable, lowerCasePropertiesMap) - // validate the Sort Scope - validateSortScopeProperty(carbonTable, lowerCasePropertiesMap) - + // validate the Sort Scope and Sort Columns + validateSortScopeAndSortColumnsProperties(carbonTable, lowerCasePropertiesMap) + // if SORT_COLUMNS is changed, move the sort columns to the head of the column list + updateSchemaForSortColumns(thriftTable, lowerCasePropertiesMap, schemaConverter) + // below map will be used for cache invalidation. 
As tblProperties map is getting modified // in the next few steps the original map need to be retained for any decision making val existingTablePropertiesMap = mutable.Map(tblPropertiesMap.toSeq: _*) @@ -394,9 +467,13 @@ object AlterTableUtil { if (propKey.equalsIgnoreCase(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE)) { tblPropertiesMap .put(propKey.toLowerCase, CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT) - } else if (propKey.equalsIgnoreCase("sort_scope")) { + } else if (propKey.equalsIgnoreCase(CarbonCommonConstants.SORT_SCOPE)) { tblPropertiesMap .put(propKey.toLowerCase, CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT) + } else if (propKey.equalsIgnoreCase(CarbonCommonConstants.SORT_COLUMNS)) { + val errorMessage = "Error: Invalid option(s): " + propKey + + ", please set SORT_COLUMNS as empty instead of unset" + throw new MalformedCarbonCommandException(errorMessage) } else { tblPropertiesMap.remove(propKey.toLowerCase) } @@ -440,7 +517,8 @@ object AlterTableUtil { "LOCAL_DICTIONARY_EXCLUDE", "LOAD_MIN_SIZE_INMB", "RANGE_COLUMN", - "SORT_SCOPE") + "SORT_SCOPE", + "SORT_COLUMNS") supportedOptions.contains(propKey.toUpperCase) } @@ -542,18 +620,34 @@ object AlterTableUtil { } } - def validateSortScopeProperty(carbonTable: CarbonTable, + def validateSortScopeAndSortColumnsProperties(carbonTable: CarbonTable, propertiesMap: mutable.Map[String, String]): Unit = { - propertiesMap.foreach { property => - if (property._1.equalsIgnoreCase("SORT_SCOPE")) { - if (!CarbonUtil.isValidSortOption(property._2)) { - throw new MalformedCarbonCommandException( - s"Invalid SORT_SCOPE ${ property._2 }, valid SORT_SCOPE are 'NO_SORT', 'BATCH_SORT', " + - s"'LOCAL_SORT' and 'GLOBAL_SORT'") - } else if (!property._2.equalsIgnoreCase("NO_SORT") && - (carbonTable.getNumberOfSortColumns == 0)) { + CommonUtil.validateSortScope(propertiesMap) + CommonUtil.validateSortColumns(carbonTable, propertiesMap) + // match SORT_SCOPE and SORT_COLUMNS + val newSortScope = propertiesMap.get(CarbonCommonConstants.SORT_SCOPE) + val newSortColumns = propertiesMap.get(CarbonCommonConstants.SORT_COLUMNS) + if (newSortScope.isDefined) { + // 1. check SORT_COLUMNS when SORT_SCOPE is not changed to NO_SORT + if (!SortScope.NO_SORT.name().equalsIgnoreCase(newSortScope.get)) { + if (newSortColumns.isDefined) { + if (StringUtils.isBlank(CarbonUtil.unquoteChar(newSortColumns.get))) { + throw new InvalidConfigurationException( + s"Cannot set SORT_COLUMNS as empty when setting SORT_SCOPE as ${newSortScope.get} ") + } + } else { + if (carbonTable.getNumberOfSortColumns == 0) { + throw new InvalidConfigurationException( + s"Cannot set SORT_SCOPE as ${newSortScope.get} when table has no SORT_COLUMNS") + } + } + } + } else if (newSortColumns.isDefined) { + // 2. check SORT_SCOPE when SORT_COLUMNS is changed to empty + if (StringUtils.isBlank(CarbonUtil.unquoteChar(newSortColumns.get))) { + if (!SortScope.NO_SORT.equals(carbonTable.getSortScope)) { throw new InvalidConfigurationException( - s"Cannot set SORT_SCOPE as ${ property._2 } when table has no SORT_COLUMNS") + s"Cannot set SORT_COLUMNS as empty when SORT_SCOPE is ${carbonTable.getSortScope} ") } } }
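For readers tracing the new checks above, the following is a minimal, standalone sketch of the SORT_SCOPE / SORT_COLUMNS combination rules that validateSortScopeAndSortColumnsProperties enforces. It is illustrative only: the object and method names and the two boolean parameters describing the table's existing state are assumptions made for the example, not CarbonData APIs, and the blank-string handling approximates CarbonUtil.unquoteChar plus StringUtils.isBlank with a simple trim.

object SortPropertyRules {
  // Illustrative sketch: mirrors the ALTER TABLE SET TBLPROPERTIES checks above.
  // newSortScope / newSortColumns are the values being set by the command
  // (None when that property is not part of the command).
  def checkSortProperties(
      newSortScope: Option[String],
      newSortColumns: Option[String],
      tableHasSortColumns: Boolean,
      currentScopeIsNoSort: Boolean): Either[String, Unit] = {
    val newSortColumnsEmpty = newSortColumns.exists(_.trim.isEmpty)
    newSortScope match {
      case Some(scope) if !scope.equalsIgnoreCase("NO_SORT") =>
        if (newSortColumnsEmpty) {
          // setting a sorting scope while clearing the sort columns is contradictory
          Left(s"Cannot set SORT_COLUMNS as empty when setting SORT_SCOPE as $scope")
        } else if (newSortColumns.isEmpty && !tableHasSortColumns) {
          // a sorting scope needs at least one existing or newly supplied sort column
          Left(s"Cannot set SORT_SCOPE as $scope when table has no SORT_COLUMNS")
        } else {
          Right(())
        }
      case None if newSortColumnsEmpty && !currentScopeIsNoSort =>
        // clearing SORT_COLUMNS is only allowed when the current scope is already NO_SORT
        Left("Cannot set SORT_COLUMNS as empty when SORT_SCOPE is not NO_SORT")
      case _ =>
        Right(())
    }
  }
}

Under these rules, for example, setting 'sort_scope'='local_sort' together with 'sort_columns'='' is rejected, while clearing SORT_COLUMNS on a table whose current scope is already NO_SORT is accepted.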