From 73390e08aad788698dd9f12e3513a4a4814afd73 Mon Sep 17 00:00:00 2001 From: manishgupta88 Date: Thu, 27 Dec 2018 15:18:07 +0530 Subject: [PATCH] Fixed performance issue for Implicit filter column 1. Removed serialization all the implicit filter values in each task. Instead serialized values only for the blocks going to particular task 2. Removed 2 times deserialization of implicit filter values in executor for each task. 1 time is sufficient --- .../blockletindex/BlockDataMap.java | 3 +- .../conditional/ImplicitExpression.java | 109 ++++++++++++++++ .../core/scan/filter/ColumnFilterInfo.java | 43 ++----- .../core/scan/filter/FilterUtil.java | 73 +++++++++-- .../ImplicitIncludeFilterExecutorImpl.java | 23 +++- .../core/scan/filter/intf/ExpressionType.java | 3 +- .../visitor/ImplicitColumnVisitor.java | 24 ++-- .../carbondata/hadoop/CarbonInputSplit.java | 28 +++++ .../hadoop/api/CarbonInputFormat.java | 43 ++++++- .../TestImplicitFilterExpression.scala | 117 ++++++++++++++++++ .../carbondata/spark/rdd/CarbonScanRDD.scala | 31 ++++- .../spark/sql/optimizer/CarbonFilters.scala | 15 ++- 12 files changed, 443 insertions(+), 69 deletions(-) create mode 100644 core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/ImplicitExpression.java create mode 100644 integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/TestImplicitFilterExpression.scala diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java index 6b04cf73e4c..e29dfefcbf9 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java @@ -32,6 +32,7 @@ import org.apache.carbondata.core.datastore.block.SegmentProperties; import 
org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder; import org.apache.carbondata.core.datastore.block.TableBlockInfo; +import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.indexstore.AbstractMemoryDMStore; import org.apache.carbondata.core.indexstore.BlockMetaInfo; import org.apache.carbondata.core.indexstore.Blocklet; @@ -485,7 +486,7 @@ protected String getFileNameWithFilePath(DataMapRow dataMapRow, String filePath) String fileName = filePath + CarbonCommonConstants.FILE_SEPARATOR + new String( dataMapRow.getByteArray(FILE_PATH_INDEX), CarbonCommonConstants.DEFAULT_CHARSET_CLASS) + CarbonTablePath.getCarbonDataExtension(); - return fileName; + return FileFactory.getUpdatedFilePath(fileName); } private void addTaskSummaryRowToUnsafeMemoryStore(CarbonRowSchema[] taskSummarySchema, diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/ImplicitExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/ImplicitExpression.java new file mode 100644 index 00000000000..eab564e7555 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/conditional/ImplicitExpression.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.scan.expression.conditional; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.scan.expression.ExpressionResult; +import org.apache.carbondata.core.scan.expression.LiteralExpression; +import org.apache.carbondata.core.scan.expression.exception.FilterIllegalMemberException; +import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; +import org.apache.carbondata.core.scan.filter.intf.ExpressionType; +import org.apache.carbondata.core.scan.filter.intf.RowIntf; + +import org.apache.commons.lang.StringUtils; + +/** + * Custom class to handle filter values for Implicit filter + */ +public class ImplicitExpression extends Expression { + + /** + * map that contains the mapping of block id to the valid blocklets in that block which contain + * the data as per the applied filter + */ + private Map> blockIdToBlockletIdMapping; + + public ImplicitExpression(List implicitFilterList) { + // initialize map with half the size of filter list as one block id can contain + // multiple blocklets + blockIdToBlockletIdMapping = new HashMap<>(implicitFilterList.size() / 2); + for (Expression value : implicitFilterList) { + String blockletPath = ((LiteralExpression) value).getLiteralExpValue().toString(); + addBlockEntry(blockletPath); + } + } + + public ImplicitExpression(Map> blockIdToBlockletIdMapping) { + this.blockIdToBlockletIdMapping = blockIdToBlockletIdMapping; + } + + private void addBlockEntry(String blockletPath) { + String blockId = + blockletPath.substring(0, blockletPath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR)); + Set blockletIds = 
blockIdToBlockletIdMapping.get(blockId); + if (null == blockletIds) { + blockletIds = new HashSet<>(); + blockIdToBlockletIdMapping.put(blockId, blockletIds); + } + blockletIds.add(Integer.parseInt(blockletPath.substring(blockId.length() + 1))); + } + + @Override public ExpressionResult evaluate(RowIntf value) + throws FilterUnsupportedException, FilterIllegalMemberException { + throw new UnsupportedOperationException("Operation not supported for Implicit expression"); + } + + public Map> getBlockIdToBlockletIdMapping() { + return blockIdToBlockletIdMapping; + } + + @Override public ExpressionType getFilterExpressionType() { + return ExpressionType.IMPLICIT; + } + + @Override public void findAndSetChild(Expression oldExpr, Expression newExpr) { + } + + @Override public String getString() { + StringBuilder value = new StringBuilder(); + value.append("ImplicitExpression("); + for (Map.Entry> entry : blockIdToBlockletIdMapping.entrySet()) { + value.append(entry.getKey()).append(" --> "); + value.append( + StringUtils.join(entry.getValue().toArray(new Integer[entry.getValue().size()]), ",")) + .append(";"); + // return maximum of 100 characters in the getString method + if (value.length() > 100) { + value.append("..."); + break; + } + } + value.append(')'); + return value.toString(); + } + + @Override public String getStatement() { + return getString(); + } +} \ No newline at end of file diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/ColumnFilterInfo.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/ColumnFilterInfo.java index 8677a2d2966..bf7694a18d3 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/ColumnFilterInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/ColumnFilterInfo.java @@ -18,12 +18,10 @@ package org.apache.carbondata.core.scan.filter; import java.io.Serializable; -import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; -import 
org.apache.carbondata.core.constants.CarbonCommonConstants; - public class ColumnFilterInfo implements Serializable { private static final long serialVersionUID = 8181578747306832771L; @@ -34,9 +32,9 @@ public class ColumnFilterInfo implements Serializable { /** * Implicit column filter values to be used for block and blocklet pruning + * Contains block id to its blocklet mapping */ - private Set implicitColumnFilterList; - private transient Set implicitDriverColumnFilterList; + private Map> implicitColumnFilterBlockToBlockletsMap; private List excludeFilterList; /** * maintain the no dictionary filter values list. @@ -85,15 +83,16 @@ public List getExcludeFilterList() { public void setExcludeFilterList(List excludeFilterList) { this.excludeFilterList = excludeFilterList; } - public Set getImplicitColumnFilterList() { - return implicitColumnFilterList; + public Map> getImplicitColumnFilterBlockToBlockletsMap() { + return implicitColumnFilterBlockToBlockletsMap; } - public void setImplicitColumnFilterList(List implicitColumnFilterList) { + public void setImplicitColumnFilterBlockToBlockletsMap( + Map> implicitColumnFilterBlockToBlockletsMap) { // this is done to improve the query performance. 
As the list of size increases time taken to // search in list will increase as list contains method uses equals check internally but set // will be very fast as it will directly use the has code to find the bucket and search - this.implicitColumnFilterList = new HashSet<>(implicitColumnFilterList); + this.implicitColumnFilterBlockToBlockletsMap = implicitColumnFilterBlockToBlockletsMap; } public List getMeasuresFilterValuesList() { @@ -103,30 +102,4 @@ public List getMeasuresFilterValuesList() { public void setMeasuresFilterValuesList(List measuresFilterValuesList) { this.measuresFilterValuesList = measuresFilterValuesList; } - - public Set getImplicitDriverColumnFilterList() { - // this list is required to be populated only n case of driver, so in executor this check will - // avoid unnecessary loading of the driver filter list - if (implicitDriverColumnFilterList != null) { - return implicitDriverColumnFilterList; - } - synchronized (this) { - if (null == implicitDriverColumnFilterList) { - // populate only once. 
(can be called in multi-thread) - implicitDriverColumnFilterList = populateBlockIdListForDriverBlockPruning(); - } - } - return implicitDriverColumnFilterList; - } - - private Set populateBlockIdListForDriverBlockPruning() { - Set columnFilterList = new HashSet<>(implicitColumnFilterList.size()); - String blockId; - for (String blockletId : implicitColumnFilterList) { - blockId = - blockletId.substring(0, blockletId.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR)); - columnFilterList.add(blockId); - } - return columnFilterList; - } } diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java index f382f0b29be..15b8cbaa04c 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java @@ -71,6 +71,7 @@ import org.apache.carbondata.core.scan.expression.ExpressionResult; import org.apache.carbondata.core.scan.expression.LiteralExpression; import org.apache.carbondata.core.scan.expression.conditional.ConditionalExpression; +import org.apache.carbondata.core.scan.expression.conditional.ImplicitExpression; import org.apache.carbondata.core.scan.expression.conditional.InExpression; import org.apache.carbondata.core.scan.expression.conditional.ListExpression; import org.apache.carbondata.core.scan.expression.exception.FilterIllegalMemberException; @@ -1996,16 +1997,16 @@ public static void updateIndexOfColumnExpression(Expression exp, int dimOridnalM * This method will get the no dictionary data based on filters and same * will be in DimColumnFilterInfo * - * @param evaluateResultListFinal + * @param implicitColumnFilterList * @param isIncludeFilter * @return */ - public static ColumnFilterInfo getImplicitColumnFilterList(List evaluateResultListFinal, - boolean isIncludeFilter) { + public static ColumnFilterInfo getImplicitColumnFilterList( + Map> 
implicitColumnFilterList, boolean isIncludeFilter) { ColumnFilterInfo columnFilterInfo = new ColumnFilterInfo(); columnFilterInfo.setIncludeFilter(isIncludeFilter); - if (null != evaluateResultListFinal) { - columnFilterInfo.setImplicitColumnFilterList(evaluateResultListFinal); + if (null != implicitColumnFilterList) { + columnFilterInfo.setImplicitColumnFilterBlockToBlockletsMap(implicitColumnFilterList); } return columnFilterInfo; } @@ -2019,6 +2020,44 @@ public static ColumnFilterInfo getImplicitColumnFilterList(List evaluate * @param expression */ public static void removeInExpressionNodeWithPositionIdColumn(Expression expression) { + if (null != getImplicitFilterExpression(expression)) { + setTrueExpressionAsRightChild(expression); + } + } + + /** + * This method will check for ColumnExpression with column name positionID and if found will + * replace the InExpression with true expression. This is done to stop serialization of List + * expression which is right children of InExpression as it can impact the query performance + * as the size of list grows bigger. 
+ * + * @param expression + */ + public static void setTrueExpressionAsRightChild(Expression expression) { + setNewExpressionForRightChild(expression, new TrueExpression(null)); + } + + /** + * Method to remove right child of the AND expression and set new expression for right child + * + * @param expression + * @param rightChild + */ + public static void setNewExpressionForRightChild(Expression expression, Expression rightChild) { + // Remove the right expression node and point the expression to left node expression + expression.findAndSetChild(((AndExpression) expression).getRight(), rightChild); + LOGGER.info("In expression removed from the filter expression list to prevent it from" + + " serializing on executor"); + } + + /** + * This methdd will check if ImplictFilter is present or not + * if it is present then return that ImplicitFilterExpression + * + * @param expression + * @return + */ + public static Expression getImplicitFilterExpression(Expression expression) { ExpressionType filterExpressionType = expression.getFilterExpressionType(); if (ExpressionType.AND == filterExpressionType) { Expression rightExpression = ((AndExpression) expression).getRight(); @@ -2030,14 +2069,30 @@ public static void removeInExpressionNodeWithPositionIdColumn(Expression express if (childExpression instanceof ColumnExpression && ((ColumnExpression) childExpression) .getColumnName().equalsIgnoreCase(CarbonCommonConstants.POSITION_ID)) { // Remove the right expression node and point the expression to left node expression - expression - .findAndSetChild(((AndExpression) expression).getRight(), new TrueExpression(null)); - LOGGER.info("In expression removed from the filter expression list to prevent it from" - + " serializing on executor"); + // if 1st children is implict column positionID then 2nd children will be + // implicit filter list + return children.get(1); } } } } + return null; + } + + /** + * This method will create implicit expression and set as right child in the 
current expression + * + * @param expression + * @param blockIdToBlockletIdMapping + */ + public static void createImplicitExpressionAndSetAsRightChild(Expression expression, + Map> blockIdToBlockletIdMapping) { + ColumnExpression columnExpression = + new ColumnExpression(CarbonCommonConstants.POSITION_ID, DataTypes.STRING); + ImplicitExpression implicitExpression = new ImplicitExpression(blockIdToBlockletIdMapping); + InExpression inExpression = new InExpression(columnExpression, implicitExpression); + setNewExpressionForRightChild(expression, inExpression); + LOGGER.info("Implicit expression added to the filter expression"); } /** diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ImplicitIncludeFilterExecutorImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ImplicitIncludeFilterExecutorImpl.java index fd93e5913a7..c9325043d5a 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ImplicitIncludeFilterExecutorImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/ImplicitIncludeFilterExecutorImpl.java @@ -19,7 +19,9 @@ import java.io.IOException; import java.util.BitSet; +import java.util.Set; +import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; import org.apache.carbondata.core.scan.filter.intf.RowIntf; import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo; @@ -85,13 +87,24 @@ public BitSet isFilterValuesPresentInBlockOrBlocklet(byte[][] maxValue, byte[][] boolean isScanRequired = false; String shortBlockId = CarbonTablePath.getShortBlockId(uniqueBlockPath); if (uniqueBlockPath.endsWith(".carbondata")) { - if (dimColumnEvaluatorInfo.getFilterValues().getImplicitDriverColumnFilterList() - .contains(shortBlockId)) { + if (dimColumnEvaluatorInfo.getFilterValues().getImplicitColumnFilterBlockToBlockletsMap() + 
.containsKey(shortBlockId)) { isScanRequired = true; } - } else if (dimColumnEvaluatorInfo.getFilterValues().getImplicitColumnFilterList() - .contains(shortBlockId)) { - isScanRequired = true; + } else { + // in case of CACHE_LEVEL = BLOCKLET, shortBlockId contains both block id and blocklet id + // so separating out block id for look up in implicit filter + String blockId = + shortBlockId.substring(0, shortBlockId.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR)); + Set blockletIds = + dimColumnEvaluatorInfo.getFilterValues().getImplicitColumnFilterBlockToBlockletsMap() + .get(blockId); + if (null != blockletIds) { + int idInUniqueBlockPath = Integer.parseInt(shortBlockId.substring(blockId.length() + 1)); + if (blockletIds.contains(idInUniqueBlockPath)) { + isScanRequired = true; + } + } } if (isScanRequired) { bitSet.set(0); diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java index d66c5b41ed1..a89a84f432a 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java @@ -43,5 +43,6 @@ public enum ExpressionType { STARTSWITH, ENDSWITH, CONTAINSWITH, - TEXT_MATCH + TEXT_MATCH, + IMPLICIT } diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/visitor/ImplicitColumnVisitor.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/visitor/ImplicitColumnVisitor.java index 32a3b12f1a5..1653a4a0d93 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/visitor/ImplicitColumnVisitor.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/visitor/ImplicitColumnVisitor.java @@ -16,10 +16,10 @@ */ package org.apache.carbondata.core.scan.filter.resolver.resolverinfo.visitor; -import 
java.io.IOException; -import java.util.List; +import java.util.Map; +import java.util.Set; -import org.apache.carbondata.core.scan.expression.exception.FilterIllegalMemberException; +import org.apache.carbondata.core.scan.expression.conditional.ImplicitExpression; import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; import org.apache.carbondata.core.scan.filter.ColumnFilterInfo; import org.apache.carbondata.core.scan.filter.FilterUtil; @@ -43,18 +43,18 @@ public class ImplicitColumnVisitor implements ResolvedFilterInfoVisitorIntf { */ @Override public void populateFilterResolvedInfo(ColumnResolvedFilterInfo visitableObj, - FilterResolverMetadata metadata) throws FilterUnsupportedException, IOException { + FilterResolverMetadata metadata) throws FilterUnsupportedException { if (visitableObj instanceof DimColumnResolvedFilterInfo) { ColumnFilterInfo resolvedFilterObject = null; - List evaluateResultListFinal; - try { - evaluateResultListFinal = metadata.getExpression().evaluate(null).getListAsString(); - } catch (FilterIllegalMemberException e) { - throw new FilterUnsupportedException(e); + if (metadata.getExpression() instanceof ImplicitExpression) { + Map> blockIdToBlockletIdMapping = + ((ImplicitExpression) metadata.getExpression()).getBlockIdToBlockletIdMapping(); + resolvedFilterObject = FilterUtil + .getImplicitColumnFilterList(blockIdToBlockletIdMapping, metadata.isIncludeFilter()); + ((DimColumnResolvedFilterInfo) visitableObj).setFilterValues(resolvedFilterObject); + } else { + throw new FilterUnsupportedException("Expression not an instance of implicit expression"); } - resolvedFilterObject = FilterUtil - .getImplicitColumnFilterList(evaluateResultListFinal, metadata.isIncludeFilter()); - ((DimColumnResolvedFilterInfo)visitableObj).setFilterValues(resolvedFilterObject); } } } diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java index 405ff532afa..de2451b9336 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java @@ -23,8 +23,10 @@ import java.nio.charset.Charset; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datamap.Segment; @@ -89,6 +91,11 @@ public class CarbonInputSplit extends FileSplit private FileFormat fileFormat = FileFormat.COLUMNAR_V3; private String dataMapWritePath; + /** + * validBlockletIds will contain the valid blocklted ids for a given block that contains the data + * after pruning from driver. These will be used in executor for further pruning of blocklets + */ + private Set validBlockletIds; public CarbonInputSplit() { segment = null; @@ -252,6 +259,11 @@ public Segment getSegment() { if (dataMapWriterPathExists) { dataMapWritePath = in.readUTF(); } + int validBlockletIdCount = in.readShort(); + validBlockletIds = new HashSet<>(validBlockletIdCount); + for (int i = 0; i < validBlockletIdCount; i++) { + validBlockletIds.add((int) in.readShort()); + } } @Override public void write(DataOutput out) throws IOException { @@ -278,6 +290,10 @@ public Segment getSegment() { if (dataMapWritePath != null) { out.writeUTF(dataMapWritePath); } + out.writeShort(getValidBlockletIds().size()); + for (Integer blockletId : getValidBlockletIds()) { + out.writeShort(blockletId); + } } public List getInvalidSegments() { @@ -444,4 +460,16 @@ public void setFormat(FileFormat fileFormat) { public Blocklet makeBlocklet() { return new Blocklet(getPath().getName(), blockletId); } + + public Set getValidBlockletIds() { + if (null == validBlockletIds) { + validBlockletIds = new HashSet<>(); + } + return validBlockletIds; 
+ } + + public void setValidBlockletIds(Set validBlockletIds) { + this.validBlockletIds = validBlockletIds; + } + } diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java index 9b43877abf3..24691f248fe 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java @@ -48,6 +48,7 @@ import org.apache.carbondata.core.profiler.ExplainCollector; import org.apache.carbondata.core.readcommitter.ReadCommittedScope; import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.scan.filter.FilterUtil; import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; import org.apache.carbondata.core.scan.model.QueryModel; import org.apache.carbondata.core.scan.model.QueryModelBuilder; @@ -611,7 +612,8 @@ private CarbonInputSplit convertToCarbonInputSplit(ExtendedBlocklet blocklet) th @Override public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { Configuration configuration = taskAttemptContext.getConfiguration(); - QueryModel queryModel = createQueryModel(inputSplit, taskAttemptContext); + QueryModel queryModel = createQueryModel(inputSplit, taskAttemptContext, + getFilterPredicates(taskAttemptContext.getConfiguration())); CarbonReadSupport readSupport = getReadSupportClass(configuration); return new CarbonRecordReader(queryModel, readSupport, taskAttemptContext.getConfiguration()); @@ -619,6 +621,12 @@ private CarbonInputSplit convertToCarbonInputSplit(ExtendedBlocklet blocklet) th public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException { + return createQueryModel(inputSplit, taskAttemptContext, + getFilterPredicates(taskAttemptContext.getConfiguration())); + } 
+ + public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext, + Expression filterExpression) throws IOException { Configuration configuration = taskAttemptContext.getConfiguration(); CarbonTable carbonTable = getOrCreateCarbonTable(configuration); @@ -630,9 +638,10 @@ public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext tas } else { projectColumns = new String[]{}; } + checkAndAddImplicitExpression(filterExpression, inputSplit); QueryModel queryModel = new QueryModelBuilder(carbonTable) .projectColumns(projectColumns) - .filterExpression(getFilterPredicates(configuration)) + .filterExpression(filterExpression) .dataConverter(getDataTypeConverter(configuration)) .build(); @@ -652,6 +661,36 @@ public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext tas return queryModel; } + /** + * This method will create an Implict Expression and set it as right child in the given + * expression + * + * @param expression + * @param inputSplit + */ + private void checkAndAddImplicitExpression(Expression expression, InputSplit inputSplit) { + if (inputSplit instanceof CarbonMultiBlockSplit) { + CarbonMultiBlockSplit split = (CarbonMultiBlockSplit) inputSplit; + List splits = split.getAllSplits(); + // iterate over all the splits and create block to bblocklet mapping + Map> blockIdToBlockletIdMapping = new HashMap<>(); + for (CarbonInputSplit carbonInputSplit : splits) { + Set validBlockletIds = carbonInputSplit.getValidBlockletIds(); + if (null != validBlockletIds && !validBlockletIds.isEmpty()) { + String uniqueBlockPath = carbonInputSplit.getPath().toString(); + String shortBlockPath = CarbonTablePath + .getShortBlockId(uniqueBlockPath.substring(uniqueBlockPath.lastIndexOf("/Part") + 1)); + blockIdToBlockletIdMapping.put(shortBlockPath, validBlockletIds); + } + } + if (!blockIdToBlockletIdMapping.isEmpty()) { + // create implicit expression and set as right child + FilterUtil + 
.createImplicitExpressionAndSetAsRightChild(expression, blockIdToBlockletIdMapping); + } + } + } + public CarbonReadSupport getReadSupportClass(Configuration configuration) { String readSupportClass = configuration.get(CARBON_READ_SUPPORT); //By default it uses dictionary decoder read class diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/TestImplicitFilterExpression.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/TestImplicitFilterExpression.scala new file mode 100644 index 00000000000..e28eaad637d --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/TestImplicitFilterExpression.scala @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.spark.testsuite.filterexpr + +import java.util + +import org.apache.spark.sql.{CarbonEnv, DataFrame, Row} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan +import org.apache.spark.sql.hive.CarbonRelation +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.scan.expression.logical.{AndExpression, TrueExpression} +import org.apache.carbondata.core.scan.filter.FilterUtil +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.spark.rdd.CarbonScanRDD + +/** + * test class to verify the functionality of Implicit filter expression + */ +class TestImplicitFilterExpression extends QueryTest with BeforeAndAfterAll { + + override def beforeAll(): Unit = { + sql("drop table if exists implicit_test") + sql( + "create table implicit_test(firstname string, lastname string, age int) stored by " + + "'carbondata'") + sql("insert into implicit_test select 'bob','marshall',35") + } + + test("test implicit filter expression for data pruning with valid implicit filter value") { + val query: DataFrame = sql("select count(*) from implicit_test where lastname='marshall'") + // 1 row should be returned for blockletId 0 + verifyResultWithImplicitFilter(query, 1, 0) + } + + test("test implicit filter expression for data pruning with invalid implicit filter value") { + val query: DataFrame = sql("select count(*) from implicit_test where lastname='marshall'") + // No row should be returned for blockletId 1 + verifyResultWithImplicitFilter(query, 0, 1) + } + + private def verifyResultWithImplicitFilter(query: DataFrame, + expectedResultCount: Int, + blockletId: Int): Unit = { + // from the plan extract the CarbonScanRDD + val scanRDD = 
query.queryExecution.sparkPlan.collect { + case scan: CarbonDataSourceScan if (scan.rdd.isInstanceOf[CarbonScanRDD[InternalRow]]) => + scan.rdd.asInstanceOf[CarbonScanRDD[InternalRow]] + }.head + // get carbon relation + val relation: CarbonRelation = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetaStore + .lookupRelation(Some("default"), "implicit_test")(sqlContext.sparkSession) + .asInstanceOf[CarbonRelation] + // get carbon table from carbon relation + val carbonTable = relation.carbonTable + // get the segment path + val segmentPath = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, "0") + // list carbondata files from the segment path + val files = FileFactory.getCarbonFile(segmentPath).listFiles(new CarbonFileFilter { + override def accept(file: CarbonFile): Boolean = { + if (file.getName.endsWith(CarbonTablePath.getCarbonDataExtension)) { + true + } else { + false + } + } + }) + // assert that only 1 carbondata file exists + assert(files.length == 1) + // get the carbondata file complete path and name + val carbondataFileName = FileFactory.getUpdatedFilePath(files.head.getPath) + // get the shourt unique name for the carbondata file + // Example: complete file name will be as below + // /opt/db/implicit_test/Fact/Part0/Segment_0/part-0-0_batchno0-0-0-1545986389020.carbondata + // short file name: 0/0/0-0_batchno0-0-0-1545986389020 + val carbondataFileShortName = CarbonTablePath + .getShortBlockId(carbondataFileName.substring(carbondataFileName.lastIndexOf("/Part") + 1)) + // create block to blocklet mapping indicating which all blocklets for a given block + // contain the data + val blockToBlockletMap = new util.HashMap[String, util.Set[Integer]]() + val blockletList = new util.HashSet[Integer]() + // add blocklet Id 0 to the list + blockletList.add(blockletId) + blockToBlockletMap.put(carbondataFileShortName, blockletList) + // create a new AND expression with True expression as right child + val filterExpression = new 
AndExpression(scanRDD.filterExpression, new TrueExpression(null)) + // create implicit expression which will replace the right child (True expression) + FilterUtil.createImplicitExpressionAndSetAsRightChild(filterExpression, blockToBlockletMap) + // update the filter expression + scanRDD.filterExpression = filterExpression + // execute the query and get the result count + checkAnswer(query.toDF(), Seq(Row(expectedResultCount))) + } + + override def afterAll(): Unit = { + sql("drop table if exists implicit_test") + } + +} diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index 28049b506fd..a32a8de729f 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -50,11 +50,13 @@ import org.apache.carbondata.core.indexstore.PartitionSpec import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.schema.table.TableInfo import org.apache.carbondata.core.scan.expression.Expression +import org.apache.carbondata.core.scan.expression.conditional.ImplicitExpression import org.apache.carbondata.core.scan.filter.FilterUtil import org.apache.carbondata.core.scan.model.QueryModel import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants} import org.apache.carbondata.core.statusmanager.FileFormat import org.apache.carbondata.core.util._ +import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.hadoop._ import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat} import org.apache.carbondata.hadoop.api.CarbonTableInputFormat @@ -416,7 +418,7 @@ class CarbonScanRDD[T: ClassTag]( TaskMetricsMap.getInstance().registerThreadCallback() 
inputMetricsStats.initBytesReadCallback(context, inputSplit) val iterator = if (inputSplit.getAllSplits.size() > 0) { - val model = format.createQueryModel(inputSplit, attemptContext) + val model = format.createQueryModel(inputSplit, attemptContext, filterExpression) // one query id per table model.setQueryId(queryId) // get RecordReader by FileFormat @@ -687,6 +689,33 @@ class CarbonScanRDD[T: ClassTag]( if (identifiedPartitions.nonEmpty && !checkForBlockWithoutBlockletInfo(identifiedPartitions)) { FilterUtil.removeInExpressionNodeWithPositionIdColumn(filterExpression) + } else if (identifiedPartitions.nonEmpty) { + // the below piece of code will serialize only the required blocklet ids + val filterValues = FilterUtil.getImplicitFilterExpression(filterExpression) + if (null != filterValues) { + val implicitExpression = filterValues.asInstanceOf[ImplicitExpression] + identifiedPartitions.foreach { partition => + // for each partition get the list of input splits + val inputSplit = partition.asInstanceOf[CarbonSparkPartition].split.value + val splitList = if (inputSplit.isInstanceOf[CarbonMultiBlockSplit]) { + inputSplit.asInstanceOf[CarbonMultiBlockSplit].getAllSplits + } else { + new java.util.ArrayList().add(inputSplit.asInstanceOf[CarbonInputSplit]) + }.asInstanceOf[java.util.List[CarbonInputSplit]] + // for each split and given block path set all the valid blocklet ids + splitList.asScala.map { split => + val uniqueBlockPath = split.getPath.toString + val shortBlockPath = CarbonTablePath + .getShortBlockId(uniqueBlockPath + .substring(uniqueBlockPath.lastIndexOf("/Part") + 1)) + val blockletIds = implicitExpression.getBlockIdToBlockletIdMapping.get(shortBlockPath) + split.setValidBlockletIds(blockletIds) + } + } + // remove the right child of the expression here to prevent serialization of + // implicit filter values to executor + FilterUtil.setTrueExpressionAsRightChild(filterExpression) + } } } } diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala index b96b6a76af8..98359383967 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala @@ -350,9 +350,18 @@ object CarbonFilters { ) } case In(left, right) if (isCarbonSupportedDataTypes(left)) => - new InExpression(transformExpression(left), - new ListExpression(convertToJavaList(right.filter(_ != null).filter(!isNullLiteral(_)) - .map(transformExpression)))) + left match { + case left: AttributeReference if (left.name + .equalsIgnoreCase(CarbonCommonConstants.POSITION_ID)) => + new InExpression(transformExpression(left), + new ImplicitExpression(convertToJavaList(right.filter(_ != null) + .filter(!isNullLiteral(_)) + .map(transformExpression)))) + case _ => + new InExpression(transformExpression(left), + new ListExpression(convertToJavaList(right.filter(_ != null).filter(!isNullLiteral(_)) + .map(transformExpression)))) + } case InSet(left, right) if (isCarbonSupportedDataTypes(left)) => val validData = right.filter(_ != null).map { x => val e = Literal(x.toString)