
Commit

[SPARK] Polishing plus tests
closes elastic#569
relates elastic#572
costin committed Oct 14, 2015
1 parent 00bd1ed commit a7d027a
Showing 2 changed files with 19 additions and 6 deletions.
@@ -465,6 +465,20 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
assertEquals("feb", filter.select("tag").take(1)(0)(0))
}

  @Test
  def testDataSourcePushDown015NullSafeEqualTo() {
    val df = esDataSource("pd_nullsafeequalto")
    val filter = df.filter(df("airport").eqNullSafe("OTP"))
    if (strictPushDown) {
      assertEquals(0, filter.count())
      // however, if the argument were lower-cased, it would be Spark itself filtering out the data
      return
    }

    assertEquals(1, filter.count())
    assertEquals("feb", filter.select("tag").take(1)(0)(0))
  }
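  // Why the strict branch above expects 0 hits (assumption: the test index maps "airport" as an
  // analyzed string, so its value is indexed as the lower-cased token "otp"): strict push-down
  // sends the literal verbatim as a non-analyzed term query, which finds nothing, while the
  // lenient path sends a match query whose input is analyzed and therefore matches.
  // A hedged sketch mirroring the translation in ElasticsearchRelation further below; the helper
  // name is purely illustrative and not part of the code base.
  def sketchTranslateEquality(attribute: String, value: String, strict: Boolean): String =
    if (strict) s"""{"term":{"$attribute":"$value"}}"""       // "OTP" vs indexed "otp" -> no hits
    else s"""{"query":{"match":{"$attribute":"$value"}}}"""   // query text is analyzed -> matches "otp"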

  @Test
  def testDataSourcePushDown02GT() {
    val df = esDataSource("pd_gt")
@@ -148,6 +148,11 @@ private[sql] case class ElasticsearchRelation(parameters: Map[String, String], @
    filter match {

      case EqualTo(attribute, value) => {
        // if we get a null, translate it into a missing query (we're extra careful - Spark should translate the equals into isMissing anyway)
        if (value == null || value == None || value == Unit) {
          return s"""{"missing":{"field":"$attribute"}}"""
        }

        if (strictPushDown) s"""{"term":{"$attribute":${extract(value)}}}"""
        else s"""{"query":{"match":{"$attribute":${extract(value)}}}}"""
      }
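      // For illustration (the attribute/value pair is an example; the JSON shapes come from the
      // strings above and assume extract(...) quotes string values):
      //   EqualTo("airport", null)                      -> {"missing":{"field":"airport"}}
      //   EqualTo("airport", "OTP") with strictPushDown -> {"term":{"airport":"OTP"}}
      //   EqualTo("airport", "OTP") otherwise           -> {"query":{"match":{"airport":"OTP"}}}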
@@ -205,12 +210,6 @@ private[sql] case class ElasticsearchRelation(parameters: Map[String, String], @

      //
      // [[EqualNullSafe]] Filter notes:
      //
      // For this branch to be fully self-contained, a null-safety check should be added that
      // returns a "missing" query (identical to [[IsNull]]). In practice, however, Spark does
      // not pass an [[EqualNullSafe]] filter carrying a null value - it sends [[IsNull]]
      // instead. To be safe, we might still want to add that null-safety check here as well.
      // For now, this branch behaves identically to [[EqualTo]] (see the sketch after this case).

      case f:Product if isClass(f, "org.apache.spark.sql.sources.EqualNullSafe") => {
        var arg = extract(f.productElement(1))
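The [[EqualNullSafe]] branch above is truncated in this view, so here is a minimal, hedged sketch of the null-safety guard the notes describe. It reuses the surrounding helpers (isClass, extract, strictPushDown) and assumes productElement(0) and productElement(1) hold the attribute and value; the helper name translateEqualNullSafe is purely illustrative and not necessarily what the full branch in this commit does.

  // Sketch only: mirror the EqualTo branch, treating a null-safe comparison against
  // null like IsNull and translating it into a "missing" query.
  def translateEqualNullSafe(f: Product): String = {
    val attribute = f.productElement(0)
    val value = f.productElement(1)
    if (value == null || value == None || value == Unit) s"""{"missing":{"field":"$attribute"}}"""
    else if (strictPushDown) s"""{"term":{"$attribute":${extract(value)}}}"""
    else s"""{"query":{"match":{"$attribute":${extract(value)}}}}"""
  }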
