From 64270156e72b2ceefe6a7ace16fe28b557bad20c Mon Sep 17 00:00:00 2001
From: BJangir
Date: Fri, 2 Feb 2018 16:33:45 +0530
Subject: [PATCH] [CARBONDATA-1454] False expression handling and block pruning for wrong data

---
 .../filter/FilterExpressionProcessor.java     |  3 +-
 .../core/scan/filter/FilterUtil.java          |  3 +
 .../filter/executer/FalseFilterExecutor.java  | 60 +++++++++++++++
 .../scan/filter/intf/FilterExecuterType.java  |  2 +-
 .../FalseConditionalResolverImpl.java         | 61 +++++++++++++++
 .../filterexpr/FilterProcessorTestCase.scala  | 74 ++++++++++++++++++-
 .../spark/sql/CarbonBoundReference.scala      |  4 +
 .../CastExpressionOptimization.scala          | 60 ++++++++++++---
 .../strategy/CarbonLateDecodeStrategy.scala   |  2 +
 .../spark/sql/optimizer/CarbonFilters.scala   |  4 +
 10 files changed, 259 insertions(+), 14 deletions(-)
 create mode 100644 core/src/main/java/org/apache/carbondata/core/scan/filter/executer/FalseFilterExecutor.java
 create mode 100644 core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/FalseConditionalResolverImpl.java

diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
index 5a1b7df2668..3e23aa340e3 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
@@ -63,6 +63,7 @@
 import org.apache.carbondata.core.scan.filter.resolver.LogicalFilterResolverImpl;
 import org.apache.carbondata.core.scan.filter.resolver.RowLevelFilterResolverImpl;
 import org.apache.carbondata.core.scan.filter.resolver.RowLevelRangeFilterResolverImpl;
+import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.FalseConditionalResolverImpl;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.TrueConditionalResolverImpl;
 import org.apache.carbondata.core.scan.partition.PartitionUtil;
 import org.apache.carbondata.core.scan.partition.Partitioner;
@@ -398,7 +399,7 @@ private FilterResolverIntf getFilterResolverBasedOnExpressionType(
     ConditionalExpression condExpression = null;
     switch (filterExpressionType) {
       case FALSE:
-        return new RowLevelFilterResolverImpl(expression, false, false, tableIdentifier);
+        return new FalseConditionalResolverImpl(expression, false, false, tableIdentifier);
       case TRUE:
         return new TrueConditionalResolverImpl(expression, false, false, tableIdentifier);
       case EQUALS:
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
index 3268ca30d85..a08edc01b92 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
@@ -74,6 +74,7 @@
 import org.apache.carbondata.core.scan.filter.executer.DimColumnExecuterFilterInfo;
 import org.apache.carbondata.core.scan.filter.executer.ExcludeColGroupFilterExecuterImpl;
 import org.apache.carbondata.core.scan.filter.executer.ExcludeFilterExecuterImpl;
+import org.apache.carbondata.core.scan.filter.executer.FalseFilterExecutor;
 import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
 import org.apache.carbondata.core.scan.filter.executer.ImplicitIncludeFilterExecutorImpl;
 import org.apache.carbondata.core.scan.filter.executer.IncludeColGroupFilterExecuterImpl;
@@ -176,6 +177,8 @@ private static FilterExecuter createFilterExecuterTree(
         .getFilterRangeValues(segmentProperties), segmentProperties);
       case TRUE:
         return new TrueFilterExecutor();
+      case FALSE:
+        return new FalseFilterExecutor();
       case ROWLEVEL:
       default:
         return new RowLevelFilterExecuterImpl(
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/FalseFilterExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/FalseFilterExecutor.java
new file mode 100644
index 00000000000..2d2a15c5672
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/FalseFilterExecutor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.scan.filter.executer;
+
+import java.io.IOException;
+import java.util.BitSet;
+
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
+import org.apache.carbondata.core.scan.processor.BlocksChunkHolder;
+import org.apache.carbondata.core.util.BitSetGroup;
+
+/**
+ * FilterExecuter implementation for the FALSE expression.
+ *
+ * It selects no rows: every page gets an empty bitset, so any block covered
+ * by a FALSE filter is pruned without being scanned.
+ */
+public class FalseFilterExecutor implements FilterExecuter {
+
+  @Override
+  public BitSetGroup applyFilter(BlocksChunkHolder blocksChunkHolder, boolean useBitsetPipeline)
+      throws FilterUnsupportedException, IOException {
+    int numberOfPages = blocksChunkHolder.getDataBlock().numberOfPages();
+    BitSetGroup group = new BitSetGroup(numberOfPages);
+    for (int i = 0; i < numberOfPages; i++) {
+      BitSet set = new BitSet();
+      group.setBitSet(set, i);
+    }
+    return group;
+  }
+
+  @Override public boolean applyFilter(RowIntf value, int dimOrdinalMax)
+      throws FilterUnsupportedException, IOException {
+    return false;
+  }
+
+  @Override public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue) {
+
+    return new BitSet();
+  }
+
+  @Override public void readBlocks(BlocksChunkHolder blockChunkHolder) throws IOException {
+    // nothing to read: a FALSE filter never accesses column chunks
+  }
+}
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/FilterExecuterType.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/FilterExecuterType.java
index 42defc6d27c..d10b2e5be8a 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/FilterExecuterType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/FilterExecuterType.java
@@ -21,6 +21,6 @@ public enum FilterExecuterType implements Serializable {
 
   INCLUDE, EXCLUDE, OR, AND, RESTRUCTURE, ROWLEVEL, RANGE, ROWLEVEL_GREATERTHAN,
-  ROWLEVEL_GREATERTHAN_EQUALTO, ROWLEVEL_LESSTHAN_EQUALTO, ROWLEVEL_LESSTHAN, TRUE
+  ROWLEVEL_GREATERTHAN_EQUALTO, ROWLEVEL_LESSTHAN_EQUALTO, ROWLEVEL_LESSTHAN, TRUE, FALSE
 
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/FalseConditionalResolverImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/FalseConditionalResolverImpl.java
new file mode 100644
index 00000000000..eccda1e7214
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/resolver/resolverinfo/FalseConditionalResolverImpl.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.scan.filter.resolver.resolverinfo;
+
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.TableProvider;
+import org.apache.carbondata.core.scan.filter.intf.FilterExecuterType;
+import org.apache.carbondata.core.scan.filter.resolver.ConditionalFilterResolverImpl;
+
+/* A FALSE expression is resolved to an empty bitset, so it selects nothing. */
+public class FalseConditionalResolverImpl extends ConditionalFilterResolverImpl {
+
+  private static final long serialVersionUID = 4599541011924324041L;
+
+  public FalseConditionalResolverImpl(Expression exp, boolean isExpressionResolve,
+      boolean isIncludeFilter, AbsoluteTableIdentifier tableIdentifier) {
+    super(exp, isExpressionResolve, isIncludeFilter, tableIdentifier, false);
+  }
+
+  @Override public void resolve(AbsoluteTableIdentifier absoluteTableIdentifier,
+      TableProvider tableProvider) {
+  }
+
+  /**
+   * This method will provide the executer type to the callee in order to identify
+   * the executer type for the filter resolution. A FALSE expression will not execute
+   * anything; it simply returns an empty bitset.
+   */
+  @Override public FilterExecuterType getFilterExecuterType() {
+    return FilterExecuterType.FALSE;
+  }
+
+  /**
+   * Method will read the filter expression corresponding to the resolver.
+   * This method is required by the row level executer in order to evaluate the
+   * filter expression against Spark; as mentioned above, row level is a special
+   * type of filter resolver.
+   *
+   * @return Expression
+   */
+  public Expression getFilterExpresion() {
+    return exp;
+  }
+
+}
+
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/FilterProcessorTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/FilterProcessorTestCase.scala
index d54906f8acb..147756f6fff 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/FilterProcessorTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/FilterProcessorTestCase.scala
@@ -17,9 +17,9 @@
 package org.apache.carbondata.spark.testsuite.filterexpr
 
-import java.sql.Timestamp
+import java.sql.{Date, Timestamp}
 
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{DataFrame, Row}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -132,6 +132,11 @@ class FilterProcessorTestCase extends QueryTest with BeforeAndAfterAll {
     sql(s"""LOAD DATA INPATH '$resourcesPath/big_int_Decimal.csv' INTO TABLE big_int_basicc_1 options ('DELIMITER'=',', 'QUOTECHAR'='\"', 'COMPLEX_DELIMITER_LEVEL_1'='$$','COMPLEX_DELIMITER_LEVEL_2'=':', 'FILEHEADER'= '')""")
     sql(s"load data local inpath '$resourcesPath/big_int_Decimal.csv' into table big_int_basicc_Hive")
     sql(s"load data local inpath '$resourcesPath/big_int_Decimal.csv' into table big_int_basicc_Hive_1")
+
+    sql("create table if not exists date_test(name String, age int, dob date, doj timestamp) stored by 'carbondata' ")
+    sql("insert into date_test select 'name1',12,'2014-01-01','2014-01-01 00:00:00' ")
+    sql("insert into date_test select 'name2',13,'2015-01-01','2015-01-01 00:00:00' ")
+    sql("insert into date_test select 'name3',14,'2016-01-01','2016-01-01 00:00:00' ")
   }
 
   test("Is not null filter") {
@@ -287,6 +292,70 @@ class FilterProcessorTestCase extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists outofrange")
   }
 
+  test("check invalid date value") {
+    val df = sql("select * from date_test where dob=''")
+    assert(df.count() == 0, "Wrong data is displayed for an invalid date")
+  }
+
+  test("check invalid date with and filter value") {
+    val df = sql("select * from date_test where dob='' and age=13")
+    assert(df.count() == 0, "Wrong data is displayed for an invalid date")
+  }
+
+  test("check invalid date with or filter value") {
+    val df = sql("select * from date_test where dob='' or age=13")
+    checkAnswer(df, Seq(Row("name2", 13, Date.valueOf("2015-01-01"), Timestamp.valueOf("2015-01-01 00:00:00.0"))))
+  }
+
+  test("check invalid date Greaterthan filter value") {
+    val df = sql("select * from date_test where doj > '0' ")
+    checkAnswer(df, Seq(Row("name1", 12, Date.valueOf("2014-01-01"), Timestamp.valueOf("2014-01-01 00:00:00.0")),
+      Row("name2", 13, Date.valueOf("2015-01-01"), Timestamp.valueOf("2015-01-01 00:00:00.0")),
+      Row("name3", 14, Date.valueOf("2016-01-01"), Timestamp.valueOf("2016-01-01 00:00:00.0"))))
+  }
+  test("check invalid date Greaterthan and lessthan filter value") {
+    val df = sql("select * from date_test where doj > '0' and doj < '2015-01-01' ")
+    checkAnswer(df, Seq(Row("name1", 12, Date.valueOf("2014-01-01"), Timestamp.valueOf("2014-01-01 00:00:00.0"))))
+  }
+  test("check invalid date Greaterthan or lessthan filter value") {
+    val df = sql("select * from date_test where doj > '0' or doj < '2015-01-01' ")
+    checkAnswer(df, Seq(Row("name1", 12, Date.valueOf("2014-01-01"), Timestamp.valueOf("2014-01-01 00:00:00.0")),
Row("name2",13,Date.valueOf("2015-01-01"),Timestamp.valueOf("2015-01-01 00:00:00.0")), + Row("name3",14,Date.valueOf("2016-01-01"),Timestamp.valueOf("2016-01-01 00:00:00.0")))) + } + + test("check invalid timestamp value") { + val df=sql("select * from date_test where dob=''") + assert(df.count()==0,"Wrong data are displayed on invalid timestamp ") + } + + test("check invalid timestamp with and filter value ") { + val df=sql("select * from date_test where doj='' and age=13") + assert(df.count()==0,"Wrong data are displayed on invalid timestamp ") + } + + test("check invalid timestamp with or filter value ") { + val df=sql("select * from date_test where doj='' or age=13") + checkAnswer(df,Seq(Row("name2",13,Date.valueOf("2015-01-01"),Timestamp.valueOf("2015-01-01 00:00:00.0")))) + } + + test("check invalid timestamp Geaterthan filter value ") { + val df=sql("select * from date_test where doj > '0' ") + checkAnswer(df,Seq(Row("name1",12,Date.valueOf("2014-01-01"),Timestamp.valueOf("2014-01-01 00:00:00.0")), + Row("name2",13,Date.valueOf("2015-01-01"),Timestamp.valueOf("2015-01-01 00:00:00.0")), + Row("name3",14,Date.valueOf("2016-01-01"),Timestamp.valueOf("2016-01-01 00:00:00.0")))) + } + test("check invalid timestamp Geaterthan and lessthan filter value ") { + val df=sql("select * from date_test where doj > '0' and doj < '2015-01-01 00:00:00' ") + checkAnswer(df,Seq(Row("name1",12,Date.valueOf("2014-01-01"),Timestamp.valueOf("2014-01-01 00:00:00.0")))) + } + test("check invalid timestamp Geaterthan or lessthan filter value ") { + val df=sql("select * from date_test where doj > '0' or doj < '2015-01-01 00:00:00' ") + checkAnswer(df,Seq(Row("name1",12,Date.valueOf("2014-01-01"),Timestamp.valueOf("2014-01-01 00:00:00.0")), + Row("name2",13,Date.valueOf("2015-01-01"),Timestamp.valueOf("2015-01-01 00:00:00.0")), + Row("name3",14,Date.valueOf("2016-01-01"),Timestamp.valueOf("2016-01-01 00:00:00.0")))) + } + test("like% test case with restructure") { sql("drop table if exists like_filter") sql( @@ -319,6 +388,7 @@ class FilterProcessorTestCase extends QueryTest with BeforeAndAfterAll { sql("DROP TABLE IF EXISTS filtertestTablesWithNullJoin") sql("drop table if exists like_filter") CompactionSupportGlobalSortBigFileTest.deleteFile(file1) + sql("drop table if exists date_test") CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy") } diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala index a0433421b90..aa650e0b7b5 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonBoundReference.scala @@ -29,6 +29,10 @@ case class CastExpr(expr: Expression) extends Filter { override def references: Array[String] = null } +case class FalseExpr() extends Filter { + override def references: Array[String] = null +} + case class CarbonBoundReference(colExp: ColumnExpression, dataType: DataType, nullable: Boolean) extends LeafExpression with NamedExpression with CodegenFallback { diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala index 2ff8c428814..2de3fe69368 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala +++ 
@@ -25,6 +25,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.catalyst.expressions.{Attribute, EmptyRow, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, In, LessThan, LessThanOrEqual, Literal, Not}
 import org.apache.spark.sql.CastExpr
+import org.apache.spark.sql.FalseExpr
 import org.apache.spark.sql.sources
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast}
@@ -48,7 +49,7 @@ object CastExpressionOptimization {
         CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
       parser.setTimeZone(TimeZone.getTimeZone("GMT"))
     } else {
-      throw new UnsupportedOperationException ("Unsupported DataType being evaluated.")
+      throw new UnsupportedOperationException("Unsupported DataType being evaluated.")
     }
     try {
       val value = parser.parse(v.toString).getTime() * 1000L
@@ -123,6 +124,7 @@ object CastExpressionOptimization {
       tempList.asScala
     }
   }
+
   /**
    * This routines tries to apply rules on Cast Filter Predicates and if the rules applied and the
    * values can be toss back to native datatypes the cast is removed. Current two rules are applied
@@ -238,7 +240,7 @@ object CastExpressionOptimization {
       case c@GreaterThan(Cast(a: Attribute, _), Literal(v, t)) =>
         a.dataType match {
           case ts@(_: DateType | _: TimestampType) if t.sameType(StringType) =>
-            updateFilterForTimeStamp(v, c, ts)
+            updateFilterForNonEqualTimeStamp(v, c, updateFilterForTimeStamp(v, c, ts))
           case i: IntegerType if t.sameType(DoubleType) =>
             updateFilterForInt(v, c)
           case s: ShortType if t.sameType(IntegerType) =>
@@ -248,7 +250,7 @@ object CastExpressionOptimization {
       case c@GreaterThan(Literal(v, t), Cast(a: Attribute, _)) =>
         a.dataType match {
           case ts@(_: DateType | _: TimestampType) if t.sameType(StringType) =>
-            updateFilterForTimeStamp(v, c, ts)
+            updateFilterForNonEqualTimeStamp(v, c, updateFilterForTimeStamp(v, c, ts))
           case i: IntegerType if t.sameType(DoubleType) =>
             updateFilterForInt(v, c)
           case s: ShortType if t.sameType(IntegerType) =>
@@ -258,7 +260,7 @@ object CastExpressionOptimization {
      case c@LessThan(Cast(a: Attribute, _), Literal(v, t)) =>
         a.dataType match {
           case ts@(_: DateType | _: TimestampType) if t.sameType(StringType) =>
-            updateFilterForTimeStamp(v, c, ts)
+            updateFilterForNonEqualTimeStamp(v, c, updateFilterForTimeStamp(v, c, ts))
           case i: IntegerType if t.sameType(DoubleType) =>
             updateFilterForInt(v, c)
           case s: ShortType if t.sameType(IntegerType) =>
@@ -268,7 +270,7 @@ object CastExpressionOptimization {
       case c@LessThan(Literal(v, t), Cast(a: Attribute, _)) =>
         a.dataType match {
           case ts@(_: DateType | _: TimestampType) if t.sameType(StringType) =>
-            updateFilterForTimeStamp(v, c, ts)
+            updateFilterForNonEqualTimeStamp(v, c, updateFilterForTimeStamp(v, c, ts))
           case i: IntegerType if t.sameType(DoubleType) =>
             updateFilterForInt(v, c)
           case s: ShortType if t.sameType(IntegerType) =>
@@ -278,7 +280,7 @@ object CastExpressionOptimization {
       case c@GreaterThanOrEqual(Cast(a: Attribute, _), Literal(v, t)) =>
         a.dataType match {
           case ts@(_: DateType | _: TimestampType) if t.sameType(StringType) =>
-            updateFilterForTimeStamp(v, c, ts)
+            updateFilterForNonEqualTimeStamp(v, c, updateFilterForTimeStamp(v, c, ts))
           case i: IntegerType if t.sameType(DoubleType) =>
             updateFilterForInt(v, c)
           case s: ShortType if t.sameType(IntegerType) =>
@@ -288,7 +290,7 @@ object CastExpressionOptimization {
       case c@GreaterThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
         a.dataType match {
           case ts@(_: DateType | _: TimestampType) if t.sameType(StringType) =>
-            updateFilterForTimeStamp(v, c, ts)
+            updateFilterForNonEqualTimeStamp(v, c, updateFilterForTimeStamp(v, c, ts))
           case i: IntegerType if t.sameType(DoubleType) =>
             updateFilterForInt(v, c)
           case s: ShortType if t.sameType(IntegerType) =>
@@ -298,7 +300,7 @@ object CastExpressionOptimization {
       case c@LessThanOrEqual(Cast(a: Attribute, _), Literal(v, t)) =>
         a.dataType match {
           case ts@(_: DateType | _: TimestampType) if t.sameType(StringType) =>
-            updateFilterForTimeStamp(v, c, ts)
+            updateFilterForNonEqualTimeStamp(v, c, updateFilterForTimeStamp(v, c, ts))
           case i: IntegerType if t.sameType(DoubleType) =>
             updateFilterForInt(v, c)
           case s: ShortType if t.sameType(IntegerType) =>
@@ -308,7 +310,7 @@ object CastExpressionOptimization {
       case c@LessThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
         a.dataType match {
           case ts@(_: DateType | _: TimestampType) if t.sameType(StringType) =>
-            updateFilterForTimeStamp(v, c, ts)
+            updateFilterForNonEqualTimeStamp(v, c, updateFilterForTimeStamp(v, c, ts))
           case i: IntegerType if t.sameType(DoubleType) =>
             updateFilterForInt(v, c)
           case s: ShortType if t.sameType(IntegerType) =>
@@ -320,6 +322,7 @@ object CastExpressionOptimization {
 
   /**
    * the method removes the cast for short type columns
+   *
    * @param actualValue
    * @param exp
    * @return
@@ -349,6 +352,41 @@ object CastExpressionOptimization {
     }
   }
 
+  /**
+   * Handles non-equality (>, >=, <, <=) filters on timestamp/date columns.
+   * @param actualValue actual value of the filter
+   * @param exp expression
+   * @param filter the filter resolved so far
+   * @return CastExpr if the literal is numeric and must be left to Spark, else the same filter
+   */
+  def updateFilterForNonEqualTimeStamp(actualValue: Any, exp: Expression, filter: Option[Filter]):
+  Option[sources.Filter] = {
+    filter.get match {
+      case FalseExpr() if validTimeComparisionForSpark(actualValue) =>
+        Some(CastExpr(exp))
+      case _ =>
+        filter
+    }
+  }
+
+  /**
+   * Spark may also compare a timestamp column against a numeric literal as double.
+   * E.g. select * ...where time > '0' returns all data,
+   * so such filters are better handed back to Spark as a Cast expression.
+   *
+   * @param numericTimeValue the literal being compared
+   * @return true if the value parses as a double, else false
+   */
+  def validTimeComparisionForSpark(numericTimeValue: Any): Boolean = {
+    try {
+      numericTimeValue.toString.toDouble
+      true
+    } catch {
+      case _: NumberFormatException => false
+    }
+  }
+
+
   /**
    * the method removes the cast for timestamp type columns
    *
@@ -362,10 +400,12 @@ object CastExpressionOptimization {
     if (!newValue.equals(actualValue)) {
       updateFilterBasedOnFilterType(exp, newValue)
     } else {
-      Some(CastExpr(exp))
+      Some(FalseExpr())
     }
+
   }
 
+
   /**
    * the method removes the cast for the respective filter type
    *
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 4b1d11b7c2c..544c4942b84 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -616,6 +616,8 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         Some(CarbonEndsWith(c))
       case c@Contains(a: Attribute, Literal(v, t)) =>
         Some(CarbonContainsWith(c))
+      case c@Literal(v, t) if v == null =>
+        Some(FalseExpr())
       case others => None
     }
   }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index 4d913757cc7..c7767cebe56 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -135,6 +135,8 @@ object CarbonFilters {
         }))
       case CastExpr(expr: Expression) =>
         Some(transformExpression(expr))
+      case FalseExpr() =>
+        Some(new FalseExpression(null))
       case _ => None
     }
   }
@@ -269,6 +271,8 @@ object CarbonFilters {
         Some(CarbonContainsWith(c))
       case c@Cast(a: Attribute, _) =>
         Some(CastExpr(c))
+      case c@Literal(v, t) if v == null =>
+        Some(FalseExpr())
       case others =>
         if (!or) {
           others.collect {
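
The decision flow added to CastExpressionOptimization above is easier to see outside the diff. The following self-contained Scala sketch restates it under simplifying assumptions: PushedFilter, FalseFilter, KeepAsCast and NativeFilter are illustrative stand-ins rather than CarbonData types, and the timestamp format is assumed to be yyyy-MM-dd HH:mm:ss in GMT as in the patched code.

import java.text.{ParseException, SimpleDateFormat}
import java.util.TimeZone

sealed trait PushedFilter
case object FalseFilter extends PushedFilter   // analogue of FalseExpr: prune every block
case object KeepAsCast extends PushedFilter    // analogue of CastExpr: let Spark evaluate it
final case class NativeFilter(micros: Long) extends PushedFilter // pushed down with a decoded value

object FilterDecision {

  // Mirrors the GMT-based parsing in CastExpressionOptimization.
  private def parseTimestamp(literal: String): Option[Long] = {
    val parser = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    parser.setTimeZone(TimeZone.getTimeZone("GMT"))
    try Some(parser.parse(literal).getTime * 1000L)
    catch { case _: ParseException => None }
  }

  // Mirrors validTimeComparisionForSpark: numeric strings compare as double in Spark.
  private def isNumeric(literal: String): Boolean =
    try { literal.toDouble; true } catch { case _: NumberFormatException => false }

  /** isEquality is true for `=` filters, false for >, >=, <, <=. */
  def decide(literal: String, isEquality: Boolean): PushedFilter =
    parseTimestamp(literal) match {
      case Some(micros) =>
        NativeFilter(micros)
      case None if !isEquality && isNumeric(literal) =>
        // e.g. where doj > '0': Spark compares as double and matches all rows,
        // so the filter must stay a cast instead of being short-circuited to FALSE.
        KeepAsCast
      case None =>
        // e.g. where dob = '': no stored value can ever match, so push FALSE
        // and let the empty bitsets prune every block.
        FalseFilter
    }
}

Against the tests above, decide("", isEquality = true) yields FalseFilter (the dob='' tests expect zero rows), while decide("0", isEquality = false) yields KeepAsCast (the doj > '0' tests expect all rows back).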
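
On the scan side, the reason FalseFilterExecutor prunes work is mechanical: both of its entry points hand back empty bitsets, and the scanner only visits pages whose bitsets have set bits. A compressed sketch of that contract, with simplified stand-in types rather than the real BlocksChunkHolder/BitSetGroup API:

import java.util.BitSet

object FalseScanSketch {

  // FALSE filter: one empty bitset per page, so nothing is ever selected.
  def applyFalseFilter(numberOfPages: Int): Seq[BitSet] =
    Seq.fill(numberOfPages)(new BitSet())

  // isScanRequired analogue: an empty bitset tells the caller that no
  // blocklet needs scanning, so the block is skipped before any chunk I/O.
  def isScanRequired(): BitSet = new BitSet()

  // A scanner honouring the contract therefore reads zero rows.
  def rowsToScan(pageBitsets: Seq[BitSet]): Int =
    pageBitsets.map(_.cardinality()).sum
}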