I have some docs that look like this:
{"@id":"2014-10-02-client_(select(0)from(select(sleep(4)))v)/*\u0027+(select(0)from(select(sleep(4)))v)+\u0027\"+(select(0)from(select(sleep(4)))v)+\"*/-d6c81da88d482fa636e2cc5b19abb3a6","@key":"client_(select(0)from(select(sleep(4)))v)/*\u0027+(select(0)from(select(sleep(4)))v)+\u0027\"+(select(0)from(select(sleep(4)))v)+\"*/","@timestamp":"2014-10-02","@value":1}
{"@id":"2014-10-02-client_(select(0)from(select(sleep(9)))v)/*\u0027+(select(0)from(select(sleep(9)))v)+\u0027\"+(select(0)from(select(sleep(9)))v)+\"*/-caba33866b9b6c36c0791fd8f622802a","@key":"client_(select(0)from(select(sleep(9)))v)/*\u0027+(select(0)from(select(sleep(9)))v)+\u0027\"+(select(0)from(select(sleep(9)))v)+\"*/","@timestamp":"2014-10-02","@value":1}
{"@id":"2014-10-02-client_(select(0)from(select(sleep(15)))v)/*\u0027+(select(0)from(select(sleep(15)))v)+\u0027\"+(select(0)from(select(sleep(15)))v)+\"*/-a1ae7275a600d2b9b77c1a35cf2a2203","@key":"client_(select(0)from(select(sleep(15)))v)/*\u0027+(select(0)from(select(sleep(15)))v)+\u0027\"+(select(0)from(select(sleep(15)))v)+\"*/","@timestamp":"2014-10-02","@value":1}
They are actually garbage that some malware generated, but that's not the point. The point is that es-hadoop on top of Spark + Mesos breaks when I index these docs:
JavaEsSpark.saveJsonToEs(compactedRDD, output + "/" + type, ImmutableMap.of("es.mapping.id", "@id"));
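For context, here is roughly how the write is wired up (a minimal sketch; the app name, es.nodes value, and input path are placeholders, and I've substituted the wtf/events index/type from the curl test below for output + "/" + type; only the saveJsonToEs call and the es.mapping.id setting are the real ones):

import com.google.common.collect.ImmutableMap;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

public class Indexer {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("indexer")            // placeholder
                .set("es.nodes", "web245:9200");  // one of the ES nodes
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Each element is one raw JSON document, exactly like the docs above
        JavaRDD<String> compactedRDD = sc.textFile("hdfs:///path/to/docs");

        // Use each document's @id field as the Elasticsearch _id
        JavaEsSpark.saveJsonToEs(compactedRDD, "wtf/events",
                ImmutableMap.of("es.mapping.id", "@id"));
        sc.stop();
    }
}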
Indexing them one by one with curl does not produce any errors:
web245 ~ # curl -X PUT http://web245:9200/wtf/events/hey?pretty -d '{"@id":"2014-10-02-client_(select(0)from(select(sleep(4)))v)/*\u0027+(select(0)from(select(sleep(4)))v)+\u0027\"+(select(0)from(select(sleep(4)))v)+\"*/-d6c81da88d482fa636e2cc5b19abb3a6","@key":"client_(select(0)from(select(sleep(4)))v)/*\u0027+(select(0)from(select(sleep(4)))v)+\u0027\"+(select(0)from(select(sleep(4)))v)+\"*/","@timestamp":"2014-10-02","@value":1}'
{
"_index" : "wtf",
"_type" : "events",
"_id" : "hey",
"_version" : 1,
"created" : true
}
web245 ~ # curl -X PUT http://web245:9200/wtf/events/hey2?pretty -d '{"@id":"2014-10-02-client_(select(0)from(select(sleep(9)))v)/*\u0027+(select(0)from(select(sleep(9)))v)+\u0027\"+(select(0)from(select(sleep(9)))v)+\"*/-caba33866b9b6c36c0791fd8f622802a","@key":"client_(select(0)from(select(sleep(9)))v)/*\u0027+(select(0)from(select(sleep(9)))v)+\u0027\"+(select(0)from(select(sleep(9)))v)+\"*/","@timestamp":"2014-10-02","@value":1}'
{
"_index" : "wtf",
"_type" : "events",
"_id" : "hey2",
"_version" : 1,
"created" : true
}
web245 ~ # curl -X PUT http://web245:9200/wtf/events/hey3?pretty -d '{"@id":"2014-10-02-client_(select(0)from(select(sleep(15)))v)/*\u0027+(select(0)from(select(sleep(15)))v)+\u0027\"+(select(0)from(select(sleep(15)))v)+\"*/-a1ae7275a600d2b9b77c1a35cf2a2203","@key":"client_(select(0)from(select(sleep(15)))v)/*\u0027+(select(0)from(select(sleep(15)))v)+\u0027\"+(select(0)from(select(sleep(15)))v)+\"*/","@timestamp":"2014-10-02","@value":1}'
{
"_index" : "wtf",
"_type" : "events",
"_id" : "hey3",
"_version" : 1,
"created" : true
}
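Since each doc is valid JSON on its own, my guess (unverified) is that when es-hadoop builds the bulk request it extracts the @id value as an unescaped string for the action metadata line and splices it back in without re-escaping it, which would corrupt the bulk body. A minimal sketch of that failure mode with Jackson (the splicing logic here is my assumption, not es-hadoop's actual code):

import com.fasterxml.jackson.databind.ObjectMapper;

public class EscapeRepro {
    public static void main(String[] args) throws Exception {
        // The @id value after JSON unescaping: the unicode-escaped single
        // quotes are now ' and the escaped double quotes are bare "
        String unescapedId = "2014-10-02-client_(select(0)from(select(sleep(4)))v)"
                + "/*'+(select(0)from(select(sleep(4)))v)+'\""
                + "+(select(0)from(select(sleep(4)))v)+\"*/-d6c81da88d482fa636e2cc5b19abb3a6";

        // Splicing it into the action line unescaped yields invalid JSON:
        // the bare " closes the _id string early, and the '+' right after it
        // matches the "Unexpected character ('+' (code 43))" in the logs below.
        String actionLine = "{\"index\":{\"_id\":\"" + unescapedId + "\"}}";
        new ObjectMapper().readTree(actionLine); // throws JsonParseException
    }
}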
There is probably an error in the bulk-indexing code. The errors look like this:
14/10/26 14:18:15 WARN scheduler.TaskSetManager: Lost task 3.0 in stage 2.0 (TID 12, web365): org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: JsonParseException[Unexpected character ('+' (code 43)): was expecting comma to separate OBJECT entries
at [Source: [B@5b7a8bc; line: 1, column: 112]]; fragment[d":"2014-10-02-clien]
org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:322)
org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:299)
org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:149)
org.elasticsearch.hadoop.rest.RestRepository.tryFlush(RestRepository.java:199)
org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:223)
org.elasticsearch.hadoop.rest.RestRepository.close(RestRepository.java:236)
org.elasticsearch.hadoop.rest.RestService$PartitionWriter.close(RestService.java:124)
org.elasticsearch.spark.rdd.EsRDDWriter$$anonfun$write$1.apply$mcV$sp(EsRDDWriter.scala:33)
org.apache.spark.TaskContext$$anon$2.onTaskCompletion(TaskContext.scala:99)
org.apache.spark.TaskContext$$anonfun$markTaskCompleted$1.apply(TaskContext.scala:107)
org.apache.spark.TaskContext$$anonfun$markTaskCompleted$1.apply(TaskContext.scala:107)
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
org.apache.spark.TaskContext.markTaskCompleted(TaskContext.scala:107)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:64)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
14/10/26 14:18:16 INFO scheduler.TaskSetManager: Finished task 4.0 in stage 2.0 (TID 13) in 680 ms on web345 (3/5)
14/10/26 14:18:21 INFO scheduler.TaskSetManager: Starting task 3.1 in stage 2.0 (TID 14, web562, ANY, 997 bytes)
14/10/26 14:18:21 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 2.0 (TID 15, web169, ANY, 997 bytes)
14/10/26 14:18:29 INFO storage.BlockManagerMasterActor: Registering block manager web562:40494 with 1060.3 MB RAM
14/10/26 14:18:31 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on web562:40494 (size: 2.4 KB, free: 1060.3 MB)
14/10/26 14:18:32 INFO storage.BlockManagerMasterActor: Registering block manager web169:38585 with 1060.3 MB RAM
14/10/26 14:18:32 WARN scheduler.TaskSetManager: Lost task 3.1 in stage 2.0 (TID 14, web562): org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: JsonParseException[Unexpected character ('+' (code 43)): was expecting comma to separate OBJECT entries
at [Source: [B@ee220ae; line: 1, column: 112]]; fragment[d":"2014-10-02-clien]
[same stack trace as above]
14/10/26 14:18:32 INFO scheduler.TaskSetManager: Starting task 3.2 in stage 2.0 (TID 16, web305, ANY, 997 bytes)
14/10/26 14:18:32 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on web305:46567 (size: 2.4 KB, free: 1060.3 MB)
14/10/26 14:18:32 WARN scheduler.TaskSetManager: Lost task 3.2 in stage 2.0 (TID 16, web305): org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: JsonParseException[Unexpected character ('+' (code 43)): was expecting comma to separate OBJECT entries
at [Source: [B@33438f64; line: 1, column: 112]]; fragment[d":"2014-10-02-clien]
[same stack trace as above]
14/10/26 14:18:32 INFO scheduler.TaskSetManager: Starting task 3.3 in stage 2.0 (TID 17, web365, PROCESS_LOCAL, 997 bytes)
14/10/26 14:18:32 WARN scheduler.TaskSetManager: Lost task 3.3 in stage 2.0 (TID 17, web365): org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: JsonParseException[Unexpected character ('+' (code 43)): was expecting comma to separate OBJECT entries
at [Source: [B@356ce39f; line: 1, column: 112]]; fragment[d":"2014-10-02-clien]
[same stack trace as above]
14/10/26 14:18:32 ERROR scheduler.TaskSetManager: Task 3 in stage 2.0 failed 4 times; aborting job
14/10/26 14:18:32 INFO scheduler.TaskSchedulerImpl: Cancelling stage 2
14/10/26 14:18:32 INFO scheduler.DAGScheduler: Could not cancel tasks for stage 2
java.lang.UnsupportedOperationException
at org.apache.spark.scheduler.SchedulerBackend$class.killTask(SchedulerBackend.scala:32)
at org.apache.spark.scheduler.cluster.mesos.MesosSchedulerBackend.killTask(MesosSchedulerBackend.scala:41)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$3$$anonfun$apply$1.apply$mcVJ$sp(TaskSchedulerImpl.scala:194)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$3$$anonfun$apply$1.apply(TaskSchedulerImpl.scala:192)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$3$$anonfun$apply$1.apply(TaskSchedulerImpl.scala:192)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$3.apply(TaskSchedulerImpl.scala:192)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$cancelTasks$3.apply(TaskSchedulerImpl.scala:185)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.TaskSchedulerImpl.cancelTasks(TaskSchedulerImpl.scala:185)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages$1.apply$mcVI$sp(DAGScheduler.scala:1211)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages$1.apply(DAGScheduler.scala:1197)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages$1.apply(DAGScheduler.scala:1197)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1197)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
14/10/26 14:18:34 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on web169:38585 (size: 2.4 KB, free: 1060.3 MB)
14/10/26 14:18:34 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 2.0 (TID 15, web169): org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: JsonParseException[Unexpected character ('+' (code 43)): was expecting comma to separate OBJECT entries
at [Source: [B@350555c9; line: 1, column: 112]]; fragment[eep(4)))v)+'"+(selec]
[same stack trace as above]
14/10/26 14:18:34 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
and Spark hangs after that (it should exit, I suppose). Presumably that's because MesosSchedulerBackend.killTask throws UnsupportedOperationException (the "Could not cancel tasks" trace above), so the failed stage never actually gets cancelled.
This also reminds me of #217, because it is very hard to tell what is wrong just from the error messages.
I'm on Spark 1.1.0 and es-hadoop 2.1.Beta2.
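As a stopgap I can drop the offending docs before saving, along these lines (a sketch; the predicate is just my guess at what triggers the bug):

import com.google.common.collect.ImmutableMap;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

// ...inside the job, before the save:
JavaRDD<String> filtered = compactedRDD.filter(new Function<String, Boolean>() {
    @Override
    public Boolean call(String json) {
        // Skip docs whose raw JSON contains a unicode escape or an escaped
        // quote; this is a guess at the trigger, not a confirmed rule.
        return !(json.contains("\\u0027") || json.contains("\\\""));
    }
});
JavaEsSpark.saveJsonToEs(filtered, "wtf/events",
        ImmutableMap.of("es.mapping.id", "@id"));

Obviously that throws data away, so it is only a workaround until the escaping in the bulk writer is fixed.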