ArrayIndexOutOfBoundsException on Spark SQL with 2.1.0.rc1 #482

Closed
sjoerdmulder opened this issue Jun 23, 2015 · 15 comments


@sjoerdmulder

Using ES-hadoop 2.1.0.rc1, Spark 1.4.0. Elasticsearch 1.6.0

The ES index that we use contains various events with a variety of fields, but the (custom) schema that we defined only has the "common" fields that the SQL query will use.

Somehow it still tries to map a field that is neither in the schema nor used in the SQL query, causing an ArrayIndexOutOfBoundsException since indexOf returns -1:

2015-06-23T17:09:28.060+0200 ERROR Executor.logError() - Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.ArrayIndexOutOfBoundsException: -1
    at scala.collection.mutable.ResizableArray$class.update(ResizableArray.scala:49)
    at scala.collection.mutable.ArrayBuffer.update(ArrayBuffer.scala:48)
    at org.elasticsearch.spark.sql.RowValueReader$class.addToBuffer(RowValueReader.scala:29)
    at org.elasticsearch.spark.sql.ScalaRowValueReader.addToBuffer(ScalaEsRowValueReader.scala:13)
    at org.elasticsearch.spark.sql.ScalaRowValueReader.addToMap(ScalaEsRowValueReader.scala:39)
    at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:636)
    at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:559)
    at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:636)
    at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:559)
    at org.elasticsearch.hadoop.serialization.ScrollReader.readHitAsMap(ScrollReader.java:358)
    at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:293)
    at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:188)
    at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:167)
    at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:403)
    at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76)
    at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:46)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:413)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:369)
    at org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$7.apply(Aggregate.scala:154)
    at org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$7.apply(Aggregate.scala:149)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
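
For illustration, here is a minimal sketch of the failure mode (not the connector's actual code; the column list is made up): looking up a field that is not among the known columns yields -1, and using that as a position in the row buffer throws exactly the exception in the trace above.

    import scala.collection.mutable.ArrayBuffer

    // Columns known from the index mapping (hypothetical)
    val rowColumns = Seq("foo", "nested.what")
    val row = ArrayBuffer.fill[Any](rowColumns.size)(null)

    // Field present in the document but absent from the mapping
    val pos = rowColumns.indexOf("nested.bar")   // -1
    row.update(pos, "value")                     // java.lang.ArrayIndexOutOfBoundsException: -1
                                                 // (with the Scala 2.10/2.11 collections shipped with Spark 1.x)
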
@costin
Member

costin commented Jun 23, 2015

@sjoerdmulder I'm having issues following the report. Can you please provide an actual reproducible example?
What are you trying to do in Spark and what is the custom mapping you are mentioning here? Do note that the connector doesn't use the schema defined in Spark but rather the one in Elasticsearch.
In other words, if you define a "common" schema (whatever that means) in Spark SQL, that is used by Spark alone, not by the connector, which simply relies on the actual index mapping.
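
For example (a minimal sketch, assuming an already running sc: SparkContext and an existing index spark/mappingtest), the schema Spark SQL reports for an ES-backed DataFrame comes straight from the index mapping:

    import org.apache.spark.sql.SQLContext

    // The DataFrame columns are derived from the Elasticsearch index mapping,
    // not from any schema declared on the Spark side.
    val sqlContext = new SQLContext(sc)
    val df = sqlContext.read
      .format("org.elasticsearch.spark.sql")
      .load("spark/mappingtest")
    df.printSchema()   // mirrors the fields present in the mapping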

@costin
Member

costin commented Jun 23, 2015

Oh, and please turn logging (see the docs) up to TRACE for the rest package and upload the logs somewhere; this provides useful information for diagnosing the problem.
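
One way to do that in a local test (a sketch, assuming the log4j 1.x setup bundled with Spark 1.x; the package is the connector's REST layer visible in the stack trace):

    import org.apache.log4j.{Level, Logger}

    // Raise the connector's REST layer to TRACE before creating the SparkContext
    Logger.getLogger("org.elasticsearch.hadoop.rest").setLevel(Level.TRACE)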

@sjoerdmulder
Author

@costin Thanks for the quick response; ignore the initial mapping story :) I cannot provide you with a full example, but I have investigated the problem:

In our data the field in question was always set to an empty array. That doesn't update the mapping in ES, but the field does exist in the document, so ES Spark then tries to look up the mapping for a field that has no mapping in ES.

When a value is added to the array, the mapping is updated and the ES Spark query works.
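
In other words (a sketch of the observation above; index/type names are illustrative and an active SparkContext sc is assumed), an empty array carries no type information, so Elasticsearch never adds the field to the mapping, while a non-empty one does:

    import org.elasticsearch.spark._

    // "bar" is only ever an empty array: no type information, so no mapping entry
    sc.makeRDD(Seq("""{"bar": []}""")).saveJsonToEs("spark/emptyarray")

    // "bar" now carries a value, so dynamic mapping picks the field up
    sc.makeRDD(Seq("""{"bar": [1]}""")).saveJsonToEs("spark/nonemptyarray")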

@costin
Member

costin commented Jun 23, 2015

I see - so if I understand correctly, you have a DataFrame with a schema that is not fully available in Elasticsearch, right? If that's the case then yes, things will not work, simply because the connector expects the mapping in ES to exist before any query is run.

I'd still like to understand how you ended up in this situation, so even if you cannot replicate your environment, could you provide a sample Spark script just so I understand the situation better?
Thanks,

@sjoerdmulder
Author

Here you go. Note that nested.bar is only ever an empty array, so it never gets a mapping in Elasticsearch:

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

object ESMappingIssue {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ESMappingIssue").setMaster("local[1]")

    val sc = new SparkContext(conf)

    val json = """{"foo" : 5, "nested": { "bar" : [], "what": "now" } }"""

    sc.makeRDD(Seq(json)).saveJsonToEs("spark/mappingtest")

    val df = new SQLContext(sc).read.format("org.elasticsearch.spark.sql").load("spark/mappingtest")
    df.collect().foreach(println)

    sc.stop()
  }

}

@jeffsteinmetz

The java.lang.IndexOutOfBoundsException also occurs if the "bar" array contains valid JSON objects, a pattern we use in many of our documents in production.

val json = """{"foo" : 5, "nested": { "bar" : [{"test":"1", "age":20},{"test":"2", "age":21}], "what": "now" } }"""

https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-array-type.html

@amajumdarinfa

Hi Costin,

We are getting the same java.lang.ArrayIndexOutOfBoundsException. We are using elasticsearch-hadoop-2.1.0.jar along with Spark 1.3 and Elasticsearch 1.4.4.

It turned out that one of the keys inside a nested object in the JSON document had an empty string as its value, and that caused the issue.

For example, the document was something like:

    {
      "EventUid": "-581640745",
      "TimeOfEventUTC": "2015-09-18-03-09-49",
      "User": {
        "User": "asmith@info.com",
        "UserGroup": "",
        "Location": "San Francisco",
        "Department": "Sales"
      }
    }

Note that User.UserGroup was empty. If it is non-empty, the Spark SQL query works fine. This is similar to the empty array (bar) in the example given by sjoerdmulder.

As a workaround we are keeping a default value for all the keys, but my question is:

Is there any known solution or workaround for this?
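
As an illustration only (a hypothetical sketch; the helper name and placeholder value are made up), the default-value workaround boils down to filling empty values before indexing so that every field carries a concrete value:

    // Hypothetical sketch of the default-value workaround: replace empty string
    // values with a placeholder before indexing, recursing into nested objects.
    def withDefaults(doc: Map[String, Any], default: String = "unknown"): Map[String, Any] =
      doc.map {
        case (k, nested: Map[_, _]) => k -> withDefaults(nested.asInstanceOf[Map[String, Any]], default)
        case (k, "")                => k -> default
        case (k, v)                 => k -> v
      }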

@jeffsteinmetz

Checking in to see if this was replicated. We've been unable to use SQLContext due to the issues outlined by the helpful @sjoerdmulder and others.

@jeffsteinmetz

Is this related to #484?

@costin
Member

costin commented Oct 15, 2015

It looks like it might be, yes. Have you tried the latest dev builds?

@jeffsteinmetz

Just tried the latest dev build. The ArrayIndexOutOfBoundsException no longer occurs.
However, it is now throwing this error:

scala.MatchError: [Buffer([20,2015-01-01 00:00:00.0], [20,2015-01-01 00:00:00.0]),now] (of class org.elasticsearch.spark.sql.ScalaEsRow)

Here is the Scala test that I'm running:

    val conf = new SparkConf()
      .setAppName("essql")
      .setMaster("local")
    conf.set("es.nodes", "localhost")
    conf.set("es.input.json", "true")
    conf.set("es.index.auto.create", "true")
    conf.set("es.field.read.as.array.include", "nested.bar")

    val sc = new SparkContext(conf)
    val json = """{"foo" : 5, "nested": { "bar" : [{"date":"2015-01-01", "age":20},{"date":"2015-01-01", "age":20}], "what": "now" } }"""
    sc.makeRDD(Seq(json)).saveJsonToEs("spark/mappingtest")
    val df = new SQLContext(sc).read.format("org.elasticsearch.spark.sql").load("spark/mappingtest")
    println(df.collect().toList)
    sc.stop()

@jeffsteinmetz

btw, this works

conf.set("es.field.read.as.array.include", "bar")

However, this can be broken as well. Fully qualified field names using dot notation, similar to the field name filters, would be more robust and would avoid ambiguous field names, since the following then breaks it:

val json = """{"bar" : 5, "nested": { "bar" : [{"date":"2015-01-01", "age":20},{"date":"2015-01-01", "age":20}], "what": "now" } }"""
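
In other words, it would be more robust to be able to write something like the following and have it match only the nested field:

    // Desired behaviour: the absolute name targets only nested.bar, not the top-level "bar"
    conf.set("es.field.read.as.array.include", "nested.bar")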

costin added a commit that referenced this issue Oct 18, 2015
Refactor logic to use absolute field names as the relative names are
prone to overwrites
Improve field filtering to allow partial or strict matching

relates #482 #484
@costin
Member

costin commented Oct 18, 2015

@jeffsteinmetz I've refactored the logic and pushed the updates into master and also published the new dev builds.
Can you please try them out and report back? Thanks!

costin added the v2.1.2 label Oct 18, 2015
costin added a commit that referenced this issue Oct 18, 2015
Refactor logic to use absolute field names as the relative names are
prone to overwrites
Improve field filtering to allow partial or strict matching

relates #482 #484

(cherry picked from commit 6222363)
@jeffsteinmetz

Confirmed. Thank you. This worked (I tried multiple array fields):

    val sc = new SparkContext(conf)
    val cfg = collection.mutable.Map("es.field.read.as.array.include" -> "nested.bar, foo")
    val json = """{"foo" : [5,6], "nested": { "bar" : [{"date":"2015-01-01", "age":20},{"date":"2015-01-01", "age":20}], "what": "now" } }"""
    sc.makeRDD(Seq(json)).saveJsonToEs("spark/mappingtest")
    val df = new SQLContext(sc).read
      .options(cfg)
      .format("org.elasticsearch.spark.sql")
      .load("spark/mappingtest")
    println(df.collect().toList)

@costin
Member

costin commented Oct 28, 2015

Closing the issue...
