Skip to content

Commit

Permalink
[CARBONDATA-3217] Optimize implicit filter expression performance by …
Browse files Browse the repository at this point in the history
…removing extra serialization

Fixed a performance issue for the implicit filter column:
1. Removed serialization of all the implicit filter values in each task. Instead, values are serialized only for the blocks going to a particular task.
2. Removed the double deserialization of implicit filter values in the executor for each task; deserializing once is sufficient.

This closes #3039
  • Loading branch information
manishgupta88 authored and kumarvishal09 committed Jan 4, 2019
1 parent 9fa045d commit bc1e944
Show file tree
Hide file tree
Showing 12 changed files with 443 additions and 69 deletions.
Expand Up @@ -32,6 +32,7 @@
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.indexstore.AbstractMemoryDMStore;
import org.apache.carbondata.core.indexstore.BlockMetaInfo;
import org.apache.carbondata.core.indexstore.Blocklet;
Expand Down Expand Up @@ -485,7 +486,7 @@ protected String getFileNameWithFilePath(DataMapRow dataMapRow, String filePath)
String fileName = filePath + CarbonCommonConstants.FILE_SEPARATOR + new String(
dataMapRow.getByteArray(FILE_PATH_INDEX), CarbonCommonConstants.DEFAULT_CHARSET_CLASS)
+ CarbonTablePath.getCarbonDataExtension();
return fileName;
return FileFactory.getUpdatedFilePath(fileName);
}

private void addTaskSummaryRowToUnsafeMemoryStore(CarbonRowSchema[] taskSummarySchema,
Expand Down
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.core.scan.expression.conditional;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.expression.ExpressionResult;
import org.apache.carbondata.core.scan.expression.LiteralExpression;
import org.apache.carbondata.core.scan.expression.exception.FilterIllegalMemberException;
import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
import org.apache.carbondata.core.scan.filter.intf.RowIntf;

import org.apache.commons.lang.StringUtils;

/**
 * Custom class to handle filter values for Implicit filter.
 *
 * The filter values are held as a mapping from block id to the ids of the blocklets of that
 * block which are selected by the filter.
 */
public class ImplicitExpression extends Expression {

  /**
   * maximum number of characters of the mapping rendered by {@link #getString()} before the
   * output is cut off and terminated with "...)"
   */
  private static final int MAX_STRING_LENGTH = 100;

  /**
   * map that contains the mapping of block id to the valid blocklets in that block which contain
   * the data as per the applied filter
   */
  private Map<String, Set<Integer>> blockIdToBlockletIdMapping;

  /**
   * Builds the block id to blocklet ids mapping from a list of literal filter values.
   *
   * @param implicitFilterList list of {@link LiteralExpression}, each holding a value of the
   *                           form blockId + FILE_SEPARATOR + blockletId
   * @throws IllegalArgumentException if a value has no separator or its blocklet id part is
   *                                  not an integer
   */
  public ImplicitExpression(List<Expression> implicitFilterList) {
    // initialize map with half the size of filter list as one block id can contain
    // multiple blocklets
    blockIdToBlockletIdMapping = new HashMap<>(implicitFilterList.size() / 2);
    for (Expression value : implicitFilterList) {
      String blockletPath = ((LiteralExpression) value).getLiteralExpValue().toString();
      addBlockEntry(blockletPath);
    }
  }

  /**
   * Wraps an already prepared mapping. The given map is stored as is (no copy is made).
   *
   * @param blockIdToBlockletIdMapping block id to selected blocklet ids
   */
  public ImplicitExpression(Map<String, Set<Integer>> blockIdToBlockletIdMapping) {
    this.blockIdToBlockletIdMapping = blockIdToBlockletIdMapping;
  }

  /**
   * Splits the given value at its last file separator into block id and blocklet id and
   * registers the blocklet id under the block id.
   *
   * @param blockletPath value of the form blockId + FILE_SEPARATOR + blockletId
   */
  private void addBlockEntry(String blockletPath) {
    int separatorIndex = blockletPath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR);
    if (separatorIndex < 0) {
      // without this check substring(0, -1) fails with a message that hides the bad value
      throw new IllegalArgumentException(
          "Invalid implicit filter value, blocklet id separator not found in: " + blockletPath);
    }
    String blockId = blockletPath.substring(0, separatorIndex);
    Set<Integer> blockletIds = blockIdToBlockletIdMapping.get(blockId);
    if (null == blockletIds) {
      blockletIds = new HashSet<>();
      blockIdToBlockletIdMapping.put(blockId, blockletIds);
    }
    // everything after the separator is the blocklet id
    blockletIds.add(Integer.parseInt(blockletPath.substring(blockId.length() + 1)));
  }

  /**
   * Row level evaluation is not supported by this expression.
   *
   * @throws UnsupportedOperationException always
   */
  @Override public ExpressionResult evaluate(RowIntf value)
      throws FilterUnsupportedException, FilterIllegalMemberException {
    throw new UnsupportedOperationException("Operation not supported for Implicit expression");
  }

  /**
   * @return the internal block id to blocklet ids mapping (not a copy)
   */
  public Map<String, Set<Integer>> getBlockIdToBlockletIdMapping() {
    return blockIdToBlockletIdMapping;
  }

  @Override public ExpressionType getFilterExpressionType() {
    return ExpressionType.IMPLICIT;
  }

  /**
   * Intentionally does nothing.
   */
  @Override public void findAndSetChild(Expression oldExpr, Expression newExpr) {
  }

  /**
   * Renders the mapping as "ImplicitExpression(blockId --> id,id;...)".
   * Once the rendered text grows beyond {@link #MAX_STRING_LENGTH} characters it is truncated
   * to exactly that length and "..." is appended, so the result stays bounded even when a
   * single block contains a very large number of blocklets.
   */
  @Override public String getString() {
    StringBuilder value = new StringBuilder();
    value.append("ImplicitExpression(");
    for (Map.Entry<String, Set<Integer>> entry : blockIdToBlockletIdMapping.entrySet()) {
      value.append(entry.getKey()).append(" --> ");
      value.append(
          StringUtils.join(entry.getValue().toArray(new Integer[entry.getValue().size()]), ","))
          .append(";");
      // return maximum of MAX_STRING_LENGTH characters of the mapping in the getString method
      if (value.length() > MAX_STRING_LENGTH) {
        // a single entry can overshoot the limit by far, so cut the text back to the limit
        value.setLength(MAX_STRING_LENGTH);
        value.append("...");
        break;
      }
    }
    value.append(')');
    return value.toString();
  }

  @Override public String getStatement() {
    return getString();
  }
}
Expand Up @@ -18,12 +18,10 @@
package org.apache.carbondata.core.scan.filter;

import java.io.Serializable;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.carbondata.core.constants.CarbonCommonConstants;

public class ColumnFilterInfo implements Serializable {

private static final long serialVersionUID = 8181578747306832771L;
Expand All @@ -34,9 +32,9 @@ public class ColumnFilterInfo implements Serializable {

/**
* Implicit column filter values to be used for block and blocklet pruning
* Contains block id to its blocklet mapping
*/
private Set<String> implicitColumnFilterList;
private transient Set<String> implicitDriverColumnFilterList;
private Map<String, Set<Integer>> implicitColumnFilterBlockToBlockletsMap;
private List<Integer> excludeFilterList;
/**
* maintain the no dictionary filter values list.
Expand Down Expand Up @@ -85,15 +83,16 @@ public List<Integer> getExcludeFilterList() {
public void setExcludeFilterList(List<Integer> excludeFilterList) {
this.excludeFilterList = excludeFilterList;
}
public Set<String> getImplicitColumnFilterList() {
return implicitColumnFilterList;
public Map<String, Set<Integer>> getImplicitColumnFilterBlockToBlockletsMap() {
return implicitColumnFilterBlockToBlockletsMap;
}

public void setImplicitColumnFilterList(List<String> implicitColumnFilterList) {
public void setImplicitColumnFilterBlockToBlockletsMap(
Map<String, Set<Integer>> implicitColumnFilterBlockToBlockletsMap) {
// this is done to improve the query performance. As the list of size increases time taken to
// search in list will increase as list contains method uses equals check internally but set
// will be very fast as it will directly use the has code to find the bucket and search
this.implicitColumnFilterList = new HashSet<>(implicitColumnFilterList);
this.implicitColumnFilterBlockToBlockletsMap = implicitColumnFilterBlockToBlockletsMap;
}

public List<Object> getMeasuresFilterValuesList() {
Expand All @@ -103,30 +102,4 @@ public List<Object> getMeasuresFilterValuesList() {
public void setMeasuresFilterValuesList(List<Object> measuresFilterValuesList) {
this.measuresFilterValuesList = measuresFilterValuesList;
}

public Set<String> getImplicitDriverColumnFilterList() {
// this list is required to be populated only n case of driver, so in executor this check will
// avoid unnecessary loading of the driver filter list
if (implicitDriverColumnFilterList != null) {
return implicitDriverColumnFilterList;
}
synchronized (this) {
if (null == implicitDriverColumnFilterList) {
// populate only once. (can be called in multi-thread)
implicitDriverColumnFilterList = populateBlockIdListForDriverBlockPruning();
}
}
return implicitDriverColumnFilterList;
}

private Set<String> populateBlockIdListForDriverBlockPruning() {
Set<String> columnFilterList = new HashSet<>(implicitColumnFilterList.size());
String blockId;
for (String blockletId : implicitColumnFilterList) {
blockId =
blockletId.substring(0, blockletId.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR));
columnFilterList.add(blockId);
}
return columnFilterList;
}
}
Expand Up @@ -71,6 +71,7 @@
import org.apache.carbondata.core.scan.expression.ExpressionResult;
import org.apache.carbondata.core.scan.expression.LiteralExpression;
import org.apache.carbondata.core.scan.expression.conditional.ConditionalExpression;
import org.apache.carbondata.core.scan.expression.conditional.ImplicitExpression;
import org.apache.carbondata.core.scan.expression.conditional.InExpression;
import org.apache.carbondata.core.scan.expression.conditional.ListExpression;
import org.apache.carbondata.core.scan.expression.exception.FilterIllegalMemberException;
Expand Down Expand Up @@ -1996,16 +1997,16 @@ public static void updateIndexOfColumnExpression(Expression exp, int dimOridnalM
* This method will get the no dictionary data based on filters and same
* will be in DimColumnFilterInfo
*
* @param evaluateResultListFinal
* @param implicitColumnFilterList
* @param isIncludeFilter
* @return
*/
public static ColumnFilterInfo getImplicitColumnFilterList(List<String> evaluateResultListFinal,
boolean isIncludeFilter) {
public static ColumnFilterInfo getImplicitColumnFilterList(
Map<String, Set<Integer>> implicitColumnFilterList, boolean isIncludeFilter) {
ColumnFilterInfo columnFilterInfo = new ColumnFilterInfo();
columnFilterInfo.setIncludeFilter(isIncludeFilter);
if (null != evaluateResultListFinal) {
columnFilterInfo.setImplicitColumnFilterList(evaluateResultListFinal);
if (null != implicitColumnFilterList) {
columnFilterInfo.setImplicitColumnFilterBlockToBlockletsMap(implicitColumnFilterList);
}
return columnFilterInfo;
}
Expand All @@ -2019,6 +2020,44 @@ public static ColumnFilterInfo getImplicitColumnFilterList(List<String> evaluate
* @param expression
*/
public static void removeInExpressionNodeWithPositionIdColumn(Expression expression) {
  // the right child is only rewritten when getImplicitFilterExpression finds an implicit filter
  Expression implicitFilter = getImplicitFilterExpression(expression);
  if (implicitFilter == null) {
    return;
  }
  setTrueExpressionAsRightChild(expression);
}

/**
 * This method will unconditionally replace the right child of the given AND expression with a
 * true expression. It does not itself check for the positionID column; callers are expected
 * to do that check first. This is done to stop serialization of List
 * expression which is right children of InExpression as it can impact the query performance
 * as the size of list grows bigger.
 *
 * @param expression expression whose right child is replaced; it is cast to AndExpression
 */
public static void setTrueExpressionAsRightChild(Expression expression) {
  setNewExpressionForRightChild(expression, new TrueExpression(null));
  // logged here and not in setNewExpressionForRightChild, because that method is also used
  // to add an expression, where a "removed" message would be misleading
  LOGGER.info("In expression removed from the filter expression list to prevent it from"
      + " serializing on executor");
}

/**
 * Method to remove right child of the AND expression and set new expression for right child
 *
 * @param expression expression whose right child is replaced; it is cast to AndExpression, so
 *                   any other expression type fails with a ClassCastException
 * @param rightChild expression to be set in place of the current right child
 */
public static void setNewExpressionForRightChild(Expression expression, Expression rightChild) {
  // swap the current right child of the AND node with the given expression
  expression.findAndSetChild(((AndExpression) expression).getRight(), rightChild);
}

/**
* This method will check whether an implicit filter is present or not;
* if it is present then it returns that implicit filter expression
*
* @param expression
* @return
*/
public static Expression getImplicitFilterExpression(Expression expression) {
ExpressionType filterExpressionType = expression.getFilterExpressionType();
if (ExpressionType.AND == filterExpressionType) {
Expression rightExpression = ((AndExpression) expression).getRight();
Expand All @@ -2030,14 +2069,30 @@ public static void removeInExpressionNodeWithPositionIdColumn(Expression express
if (childExpression instanceof ColumnExpression && ((ColumnExpression) childExpression)
.getColumnName().equalsIgnoreCase(CarbonCommonConstants.POSITION_ID)) {
// Remove the right expression node and point the expression to left node expression
expression
.findAndSetChild(((AndExpression) expression).getRight(), new TrueExpression(null));
LOGGER.info("In expression removed from the filter expression list to prevent it from"
+ " serializing on executor");
// if 1st children is implict column positionID then 2nd children will be
// implicit filter list
return children.get(1);
}
}
}
}
return null;
}

/**
 * Builds an IN expression on the positionID column whose value side is an implicit expression
 * wrapping the given mapping, and installs it as the right child of the given expression.
 *
 * @param expression expression whose right child is replaced
 * @param blockIdToBlockletIdMapping block id to blocklet ids mapping carried by the new
 *                                   implicit expression
 */
public static void createImplicitExpressionAndSetAsRightChild(Expression expression,
    Map<String, Set<Integer>> blockIdToBlockletIdMapping) {
  // left side: positionID column, right side: the block to blocklets mapping
  InExpression positionIdInFilter = new InExpression(
      new ColumnExpression(CarbonCommonConstants.POSITION_ID, DataTypes.STRING),
      new ImplicitExpression(blockIdToBlockletIdMapping));
  setNewExpressionForRightChild(expression, positionIdInFilter);
  LOGGER.info("Implicit expression added to the filter expression");
}

/**
Expand Down
Expand Up @@ -19,7 +19,9 @@

import java.io.IOException;
import java.util.BitSet;
import java.util.Set;

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
import org.apache.carbondata.core.scan.filter.intf.RowIntf;
import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
Expand Down Expand Up @@ -85,13 +87,24 @@ public BitSet isFilterValuesPresentInBlockOrBlocklet(byte[][] maxValue, byte[][]
boolean isScanRequired = false;
String shortBlockId = CarbonTablePath.getShortBlockId(uniqueBlockPath);
if (uniqueBlockPath.endsWith(".carbondata")) {
if (dimColumnEvaluatorInfo.getFilterValues().getImplicitDriverColumnFilterList()
.contains(shortBlockId)) {
if (dimColumnEvaluatorInfo.getFilterValues().getImplicitColumnFilterBlockToBlockletsMap()
.containsKey(shortBlockId)) {
isScanRequired = true;
}
} else if (dimColumnEvaluatorInfo.getFilterValues().getImplicitColumnFilterList()
.contains(shortBlockId)) {
isScanRequired = true;
} else {
// in case of CACHE_LEVEL = BLOCKLET, shortBlockId contains both block id and blocklet id
// so separating out block id for look up in implicit filter
String blockId =
shortBlockId.substring(0, shortBlockId.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR));
Set<Integer> blockletIds =
dimColumnEvaluatorInfo.getFilterValues().getImplicitColumnFilterBlockToBlockletsMap()
.get(blockId);
if (null != blockletIds) {
int idInUniqueBlockPath = Integer.parseInt(shortBlockId.substring(blockId.length() + 1));
if (blockletIds.contains(idInUniqueBlockPath)) {
isScanRequired = true;
}
}
}
if (isScanRequired) {
bitSet.set(0);
Expand Down
Expand Up @@ -43,5 +43,6 @@ public enum ExpressionType {
STARTSWITH,
ENDSWITH,
CONTAINSWITH,
TEXT_MATCH
TEXT_MATCH,
IMPLICIT
}

0 comments on commit bc1e944

Please sign in to comment.