Skip to content

Commit

Permalink
Merge f8237a9 into fcca6c5
Browse files Browse the repository at this point in the history
  • Loading branch information
BJangir committed May 28, 2019
2 parents fcca6c5 + f8237a9 commit 726a1bc
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 0 deletions.
Expand Up @@ -334,4 +334,12 @@ public boolean isFallbackJob() {
public boolean isJobToClearDataMaps() {
return isJobToClearDataMaps;
}

public FilterResolverIntf getFilterResolverIntf() {
return filterResolverIntf;
}

public void setFilterResolverIntf(FilterResolverIntf filterResolverIntf) {
this.filterResolverIntf = filterResolverIntf;
}
}
Expand Up @@ -42,6 +42,8 @@
import org.apache.carbondata.core.scan.expression.conditional.StartsWithExpression;
import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
import org.apache.carbondata.core.scan.expression.logical.AndExpression;
import org.apache.carbondata.core.scan.expression.logical.OrExpression;
import org.apache.carbondata.core.scan.expression.logical.TrueExpression;
import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecutor;
import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
Expand Down Expand Up @@ -487,4 +489,45 @@ public static boolean isScanRequired(FilterExecuter filterExecuter, byte[][] max
return !bitSet.isEmpty();
}
}

/**
* Remove UnknownExpression and change to TrueExpression
*
* @param expressionTree
* @return expressionTree without UnknownExpression
*/
public Expression removeUnknownExpression(Expression expressionTree) {
ExpressionType filterExpressionType = expressionTree.getFilterExpressionType();
BinaryExpression currentExpression = null;
switch (filterExpressionType) {
case OR:
currentExpression = (BinaryExpression) expressionTree;
return new OrExpression(
removeUnknownExpression(currentExpression.getLeft()),
removeUnknownExpression(currentExpression.getRight())
);
case AND:
currentExpression = (BinaryExpression) expressionTree;
return new AndExpression(
removeUnknownExpression(currentExpression.getLeft()),
removeUnknownExpression(currentExpression.getRight())
);
case UNKNOWN:
return new TrueExpression(null);
default:
return expressionTree;
}
}

/**
* Change UnknownReslover to TrueExpression Reslover.
*
* @param tableIdentifier
* @return
*/
public FilterResolverIntf changeUnknownResloverToTrue(AbsoluteTableIdentifier tableIdentifier) {
return getFilterResolverBasedOnExpressionType(ExpressionType.TRUE, false,
new TrueExpression(null), tableIdentifier, new TrueExpression(null));

}
}
Expand Up @@ -26,6 +26,11 @@ import org.apache.spark.util.SizeEstimator
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.datamap.{AbstractDataMapJob, DistributableDataMapFormat}
import org.apache.carbondata.core.indexstore.ExtendedBlocklet
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.scan.expression.BinaryExpression
import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor
import org.apache.carbondata.core.scan.filter.intf.ExpressionType
import org.apache.carbondata.core.scan.filter.resolver.{FilterResolverIntf, LogicalFilterResolverImpl, RowLevelFilterResolverImpl}
import org.apache.carbondata.spark.util.CarbonScalaUtil.logTime

/**
Expand All @@ -42,11 +47,45 @@ class DistributedDataMapJob extends AbstractDataMapJob {
LOGGER.debug(s"Size of message sent to Index Server: $messageSize")
}
val (resonse, time) = logTime {
var filterInf = dataMapFormat.getFilterResolverIntf
val filterProcessor = new FilterExpressionProcessor
filterInf = removeSparkUnknown(filterInf,
dataMapFormat.getCarbonTable.getAbsoluteTableIdentifier, filterProcessor)
dataMapFormat.setFilterResolverIntf(filterInf)
IndexServer.getClient.getSplits(dataMapFormat).toList.asJava
}
LOGGER.info(s"Time taken to get response from server: $time ms")
resonse
}

/**
* Iterate over FiltersReslover,
* a. Change only RowLevelFilterResolverImpl because SparkUnkown is part of it
* and others FilterReslover like ConditionalFilterResolverImpl so directly return.
* b. Change SparkUnkownExpression to TrueExpression so that isScanRequired
* selects block/blocklet.
*
* @param filterInf FiltersReslover to be changed
* @param tableIdentifer AbsoluteTableIdentifier object
* @param filterProcessor changed FiltersReslover.
* @return
*/
def removeSparkUnknown(filterInf: FilterResolverIntf,
tableIdentifer: AbsoluteTableIdentifier,
filterProcessor: FilterExpressionProcessor): FilterResolverIntf = {
if (filterInf.isInstanceOf[LogicalFilterResolverImpl]) {
return new LogicalFilterResolverImpl(
removeSparkUnknown(filterInf.getLeft, tableIdentifer, filterProcessor),
removeSparkUnknown(filterInf.getRight, tableIdentifer, filterProcessor),
filterProcessor.removeUnknownExpression(filterInf.getFilterExpression).
asInstanceOf[BinaryExpression])
}
if (filterInf.isInstanceOf[RowLevelFilterResolverImpl] &&
filterInf.getFilterExecuterType == ExpressionType.UNKNOWN) {
return filterProcessor.changeUnknownResloverToTrue(tableIdentifer)
}
return filterInf;
}
}

/**
Expand Down

0 comments on commit 726a1bc

Please sign in to comment.