Exception in SparkSQL when es.read.metadata=true #408

Closed
difin opened this issue Mar 30, 2015 · 6 comments

difin commented Mar 30, 2015

Hi,

elasticsearch-hadoop 1.2.0.BUILD_SNAPSHOT
spark 1.2.0

I would like to include documents' metadata (more specifically the _id field) in a Spark SQL SELECT query. I couldn't find in the Elasticsearch documentation how exactly to do that with Spark SQL, but I've read that to include document metadata fields in the response one needs to set es.read.metadata=true. After doing that, the following exception is thrown:

2015-03-30 10:53:20,046 ERROR [task-result-getter-0] scheduler.TaskSetManager (Logging.scala:logError(75)) - Task 1 in stage 0.0 failed 1 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost): java.util.NoSuchElementException: key not found: _index
    at scala.collection.MapLike$class.default(MapLike.scala:228)
    at scala.collection.AbstractMap.default(Map.scala:58)
    at scala.collection.MapLike$class.apply(MapLike.scala:141)
    at scala.collection.AbstractMap.apply(Map.scala:58)
    at org.elasticsearch.spark.sql.RowValueReader$class.addToBuffer(RowValueReader.scala:32)
    at org.elasticsearch.spark.sql.ScalaRowValueReader.addToBuffer(ScalaRowValueReader.scala:9)
    at org.elasticsearch.spark.sql.ScalaRowValueReader.addToMap(ScalaRowValueReader.scala:16)
    at org.elasticsearch.hadoop.serialization.ScrollReader.readHitAsMap(ScrollReader.java:316)
    at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:290)
    at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:186)
    at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:165)
    at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:403)
    at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76)
    at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:46)
    at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.hasNext(InMemoryColumnarTableScan.scala:138)
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.sql.SchemaRDD.compute(SchemaRDD.scala:120)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

The code:

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.SparkContext._
import org.elasticsearch.spark._
import org.elasticsearch.spark.sql._

object Percolator
{
  def main(args: Array[String]) 
  {  
    val year = 2015;
    val month = 1;
    val day = 14;
    val hour = 17;
    val interval = 3;

    val sparkConf = new SparkConf().setAppName("Percolator")
    sparkConf.set("es.read.metadata", "true")

    val sc =  new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)

    sqlContext.sql("CREATE TEMPORARY TABLE INTERVALS    " +
                   "USING org.elasticsearch.spark.sql " +
                   "OPTIONS (resource 'events/intervals') " )

    sqlContext.cacheTable("INTERVALS")        

    val intervSchemaRDD = 
      sqlContext.sql("SELECT User, Year, Month, Day, Hour, DataStore, DbName, SchemaName, TableName, ColumnName, " +
                     "       0 as ResponseCount " +
                     "FROM INTERVALS " +
                     "WHERE Year     = " + year +
                     "AND   Month    = " + month +
                     "AND   Day      = " + day +
                     "AND   Hour     = " + hour +
                     "AND   Interval = " + interval )

    val mapRDD = intervSchemaRDD.map { x => Map( "User"          -> x(0), 
                                                 "Year"          -> x(1),
                                                 "Month"         -> x(2),
                                                 "Day"           -> x(3),
                                                 "Hour"          -> x(4),
                                                 "DataStore"     -> x(5),
                                                 "DbName"        -> x(6),
                                                 "SchemaName"    -> x(7),
                                                 "TableName"     -> x(8),
                                                 "ColumnName"    -> x(9),
                                                 "ResponseCount" -> x(10))}

    mapRDD.foreach(elem => print(elem + "\n\n"))
    mapRDD.saveToEs("events/new")
  }
}

Can you please advise whether this is a bug, or whether I am going about getting the documents' metadata fields into the Spark SQL query output the wrong way? What is the correct way to do that?

Also, can you please advise where to find the Mailing List for asking questions?

Thanks,
Dmitriy Fingerman

costin commented Mar 30, 2015

I'm not sure what causes the exception, so it does look like a bug.
The document id is read anyway, since the returned RDD is a PairRDD with the id as the key and the document as its value. Which one of your selects fails? Does the presence of es.read.metadata influence the way the queries run?
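
To illustrate (a minimal sketch of the native RDD read path via org.elasticsearch.spark._, reusing the events/intervals resource from your report - not the Spark SQL path):

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._   // adds esRDD to SparkContext

val sc = new SparkContext(new SparkConf().setAppName("EsPairRddSketch"))

// esRDD returns a pair RDD: the key is the document _id, the value is the document as a Map
val pairs = sc.esRDD("events/intervals")
pairs.take(5).foreach { case (id, doc) => println("_id = " + id + " -> " + doc) }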

As for the mailing list, you can find it on the resources page in the docs; I'll update the project README so it can be found more easily.

difin commented Mar 30, 2015

Thanks for your response.
I am not sure which line of my program causes the exception because the stack trace doesn't show it; however, there is only one SELECT statement in the program. The exception occurs only when the following line is present: sparkConf.set("es.read.metadata", "true")

You mentioned that the document id is read anyway since the returned RDD is a PairRDD, but the result of the SELECT statement is a SchemaRDD[Row]. Can you please clarify what you mean by the PairRDD?
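
For reference, this is roughly what I get back today - a SchemaRDD of Rows with only the selected columns and no _id anywhere (a rough sketch based on the code above):

// The SELECT result is a SchemaRDD[Row]; rows are accessed positionally and carry no metadata
val rows = sqlContext.sql("SELECT User, Year FROM INTERVALS")
rows.collect().foreach(row => println(row(0) + " / " + row(1)))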

Thanks,
Dmitriy Fingerman

costin commented Apr 27, 2015

Hi,

Fixed this in master for Spark 1.3. Spark 1.2 will follow shortly.

costin added a commit that referenced this issue Apr 27, 2015

costin commented Apr 27, 2015

Addressed 1.2 as well. Just published a nightly build - please try it out.
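
In case it helps, a minimal sbt sketch for pulling a snapshot; the repository URL, artifact id and version below are assumptions - adjust them to whatever coordinates the nightly build is actually published under:

// build.sbt sketch - these coordinates are placeholders, not the authoritative ones
resolvers += "es-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"
libraryDependencies += "org.elasticsearch" % "elasticsearch-hadoop" % "2.1.0.BUILD-SNAPSHOT"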

costin commented Apr 28, 2015

Marking this as closed. Please open a new issue if the problem persists.

costin closed this as completed Apr 28, 2015

LeeYongL commented Nov 6, 2015

"java.util.NoSuchElementException: key not found: 1",this problem still occurs in spark 1.5.1. . I encountered it when using graph.
