Issue with Spark 1.5.1: es-hadoop "Connection error (check network and/or proxy settings)- all nodes failed" #591

Closed
ArcTheMaster opened this Issue Nov 4, 2015 · 16 comments


ArcTheMaster commented Nov 4, 2015

Hi,
I compiled Spark 1.5.1 with Hive and SparkR with the following command:

mvn -Pyarn -Phive -Phive-thriftserver -PsparkR -DskipTests -X clean package

After the installation, the file "hive-site.xml" was added to Spark's conf directory (not as a hard copy, but as a symbolic link). I also created a new directory in Spark which contains the elasticsearch-hadoop.jar connector ($SPARK_HOME/elastic/jar/elasticsearch-hadoop-2.1.1.jar).

In order to start the spark-shell with the connector, I'm using the following command:

spark-shell -v --jars /opt/application/Spark/current/elastic/jar/elasticsearch-hadoop-2.1.1.jar --name elasticsearch-hadoop --master yarn-client --conf spark.es.net.ssl=true --conf spark.es.net.http.auth.user=username --conf spark.es.net.http.auth.pass=password --conf spark.es.nodes=loadbalancer.fr --conf spark.es.port=9200 --conf spark.es.field.read.empty.as.null=true --conf spark.es.resource=index/type
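
For completeness, the same es-hadoop settings can also be set programmatically on the SparkConf instead of on the command line. The sketch below reuses the placeholder hostname and credentials from the command above, and assumes the connector's documented behaviour of reading any "es."-prefixed entry from the SparkConf (the "spark." prefix is only needed to satisfy spark-shell/spark-submit and is stripped by the connector):

```scala
// Minimal sketch: configure es-hadoop via SparkConf instead of --conf flags.
// Hostname and credentials are the placeholders from the command above.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("elasticsearch-hadoop")
  .set("es.nodes", "loadbalancer.fr")
  .set("es.port", "9200")
  .set("es.net.ssl", "true")
  .set("es.net.http.auth.user", "username")
  .set("es.net.http.auth.pass", "password")
  .set("es.resource", "index/type")

val sc = new SparkContext(conf)
```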

Once Spark has fully loaded and the SparkContext has been properly created, I can successfully run:

scala> import org.elasticsearch.spark._
import org.elasticsearch.spark._

scala> val RDD = sc.esRDD("index/type/", "?q=Ge*")
RDD: org.apache.spark.rdd.RDD[(String, scala.collection.Map[String,AnyRef])] = ScalaEsRDD[0] at RDD at AbstractEsRDD.scala:17

scala> RDD.count
res1: Long = 39

The result returned by "RDD.count" is correct and matches the one I get with a curl request:

curl -XGET -k "https://loadbalancer.fr:9200/orange/bank/_search?q=Ge*&pretty" -u username:password

Nevertheless, if I try to retrieve the document sources through the es-hadoop connector, I get the following error:

scala> RDD.collect().foreach(println)
[Stage 0:>                                                          (0 + 2) / 5]15/11/04 09:43:17 ERROR TaskSetManager: Task 1 in stage 0.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 9, node07.fr): org.apache.spark.util.TaskCompletionListenerException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[node01.fr:9200, loadbalancer.fr:9200, node02.fr:9200, node03.fr:9200, node04.fr:9200, node05.fr:9200, node06.fr:9200, node07.fr:9200]]
        at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:90)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1835)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1848)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1919)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:905)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:904)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:21)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:26)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:28)
        at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
        at $iwC$$iwC$$iwC$$iwC.<init>(<console>:32)
        at $iwC$$iwC$$iwC.<init>(<console>:34)
        at $iwC$$iwC.<init>(<console>:36)
        at $iwC.<init>(<console>:38)
        at <init>(<console>:40)
        at .<init>(<console>:44)
        at .<clinit>(<console>)
        at .<init>(<console>:7)
        at .<clinit>(<console>)
        at $print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
        at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
        at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.spark.util.TaskCompletionListenerException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[node01.fr:9200, loadbalancer.fr:9200, node02.fr:9200, node03.fr:9200, node04.fr:9200, node05.fr:9200, node06.fr:9200, node07.fr:9200]]
        at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:90)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

This issue also happens when "hive-site.xml" is deleted, and it is reproducible 100% of the time. The only way to retrieve the document sources is to deactivate SSL and the Search Guard authentication, which, as you will understand, is not an acceptable option.

Also important: there is nothing related to this error in the logs, neither on the load balancer nor on the nodes.
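
To rule out basic connectivity, each node's HTTPS endpoint can be probed directly from a worker host. This is only a sketch; the hostnames and credentials are the examples used above:

```shell
#!/bin/sh
# Sketch: probe every ES node's HTTP port directly from a Spark worker.
# Hostnames and credentials are the examples used in this report.
for node in node01.fr node02.fr node03.fr node04.fr node05.fr node06.fr node07.fr; do
  curl -sk -o /dev/null -w "$node:9200 -> HTTP %{http_code}\n" \
       -u username:password "https://$node:9200/"
done
```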

Note: I also tried to retrieve the sources with the latest beta connector and the result is the same.

Here are the specs of my environment:

  • Red Hat ES 6.7 x86_64
  • Spark 1.5.1, Scala 2.10.4, Java 1.7.0_85, Hive 1.2.1
  • Global authentication done through Kerberos
  • Elasticsearch 1.6.2, Lucene 4.10.4
  • SSL activated and authentication done through searchguard

vanjor commented Nov 6, 2015

Same here, using Hive 1.2.1 to load data from Hive into ES.

The Hadoop job throws an error and stops:

org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[10.1.23.134:9200, es.op.koudai.com:9200, 10.1.23.132:9200, 10.1.23.131:9200, 10.1.23.130:9200, 10.1.23.133:9200]]

2015-11-06 15:40:55,230 ERROR [main] org.elasticsearch.hadoop.rest.NetworkClient: Node [Read timed out] failed (10.1.23.133:9200); no other nodes left - aborting...
2015-11-06 15:40:55,259 FATAL [main] org.apache.hadoop.hive.ql.exec.mr.ExecMapper: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row {"user_id":"492923825","is_register":null,"register_time":null"}
    at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:518)
    at org.apache.hadoop.hive.ql.exec.mr.ExecMapper.map(ExecMapper.java:163)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:430)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)
Caused by: org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[10.1.23.134:9200, es.op.koudai.com:9200, 10.1.23.132:9200, 10.1.23.131:9200, 10.1.23.130:9200, 10.1.23.133:9200]] 
    at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:142)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:317)
    at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:313)
    at org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:150)
    at org.elasticsearch.hadoop.rest.RestRepository.tryFlush(RestRepository.java:209)
    at org.elasticsearch.hadoop.rest.RestRepository.flush(RestRepository.java:232)
    at org.elasticsearch.hadoop.rest.RestRepository.doWriteToIndex(RestRepository.java:185)
    at org.elasticsearch.hadoop.rest.RestRepository.writeProcessedToIndex(RestRepository.java:164)
    at org.elasticsearch.hadoop.hive.EsHiveOutputFormat$EsHiveRecordWriter.write(EsHiveOutputFormat.java:63)
    at org.apache.hadoop.hive.ql.exec.FileSinkOperator.process(FileSinkOperator.java:753)
    at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:837)
    at org.apache.hadoop.hive.ql.exec.SelectOperator.process(SelectOperator.java:88)
    at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:837)
    at org.apache.hadoop.hive.ql.exec.TableScanOperator.process(TableScanOperator.java:97)
    at org.apache.hadoop.hive.ql.exec.MapOperator$MapOpCtx.forward(MapOperator.java:162)
    at org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:508)
    ... 9 more

2015-11-06 15:40:55,259 INFO [main] org.apache.hadoop.hive.ql.exec.MapOperator: 3 finished. closing... 
2015-11-06 15:40:55,259 INFO [main] org.apache.hadoop.hive.ql.exec.MapOperator: RECORDS_IN:879999
2015-11-06 15:40:55,259 INFO [main] org.apache.hadoop.hive.ql.exec.MapOperator: DESERIALIZE_ERRORS:0


ArcTheMaster commented Nov 6, 2015

@vanjor, for my part the ES connector is working with Hive; I only have the issue with Spark.
Here is what I did to make it work with Hive:

  • create a symbolic link in the Hive conf directory to "spark-defaults.conf" (eg: spark-defaults.conf -> /opt/application/Spark/current/conf/spark-defaults.conf)
  • create a directory in Hive which holds the ES connector jar (eg: /opt/application/Hive/current/elastic/jar/elasticsearch-hadoop-2.1.1.jar)
  • start hive
  • import the ES connector jar as below:
hive (yolo)> add jar /opt/application/Hive/current/elastic/jar/elasticsearch-hadoop-2.1.1.jar;
  • create a Hive table which will act as the bridge between Hive and ES (it's very close to the Hive/HBase integration). Caution: you must declare table columns that match the mapping used in the index. For example:

Hive parameter1 = account_number BIGINT
Hive parameter2 = firstname STRING

ES mapping1 = account_number LONG
ES mapping2 = firstname STRING

hive (yolo) > CREATE EXTERNAL TABLE elastic_table_1(account_number BIGINT, balance BIGINT, firstname STRING, lastname STRING, age BIGINT, gender STRING, address STRING, employer STRING, email STRING, city STRING, state STRING) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' TBLPROPERTIES ("es.net.ssl" = "true", "es.net.http.auth.user" = "username", "es.net.http.auth.pass" = "password", "es.nodes" = "node01.fr", "es.resource" = "index/type", "es.index.auto.create" = "false", "es.query" = "?q=*");
  • then you will be able to query your data:
hive (yolo) > SELECT * FROM elastic_table_1;

I hope this will help you.


vanjor commented Nov 9, 2015

@ArcTheMaster thanks for your help. I succeeded in loading billions of documents into ES using Hive, but when I try to update the data in ES, the job fails after several hours.
Briefly: loading data into an empty index through Hive is fine, but updating that large-scale ES data through Hive fails midway.

My mapping config is:

CREATE EXTERNAL TABLE es.buyer_es (
  user_id  string,
  is_register int,
  register_time  string,
  xxx
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.resource' = 'xxx/buyers','es.nodes'= 'xxx',
              'es.port'= '9200','es.mapping.id' = 'user_id','es.index.auto.create' = 'true',
              'es.batch.size.entries' = '1000','es.batch.write.retry.count' = '10000','es.batch.write.retry.wait' = '10s',
              'es.batch.write.refresh' = 'false','es.nodes.discovery' = 'true','es.nodes.client.only' = 'false'
             );

My insert & update script is:

INSERT OVERWRITE TABLE es.buyer_es 
select * from xxx


costin commented Nov 9, 2015 (Member)

@ArcTheMaster First off, thanks for the detailed ticket - appreciated.
Interesting to see that count works (that's because it's a straight call to ES) while iterating does not. Can you try 2.1.2 or potentially 2.2.0-beta1? If the issue still occurs, please enable TRACE logging on the REST package and upload the output as a gist somewhere.
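
For reference, raising that package to TRACE is typically a one-line change in the log4j configuration picked up by Spark (a sketch; the exact file location depends on the deployment, e.g. $SPARK_HOME/conf/log4j.properties):

```properties
# Sketch: enable verbose REST tracing for the es-hadoop connector.
log4j.logger.org.elasticsearch.hadoop.rest=TRACE
```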

Likely this has something to do with the SSL/load balancer combination. By using a load balancer (not sure why you need one, but that's a different discussion), you require all clients to go through that 'check point'. ES-Hadoop makes direct connections to each shard based on the number of running tasks, so likely after discovering the shards, the connector tries to access them directly but fails (as they are not exposed).

This scenario is supported only in 2.2, through the WAN feature as explained here. Do note that this will impact performance significantly.
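
Concretely, the setting behind that feature is es.nodes.wan.only, which restricts the connector to the declared nodes instead of the discovered ones. Passed through spark-shell it would look like the sketch below (flags reused from the original report; jar name assumed):

```shell
spark-shell --jars elasticsearch-hadoop-2.2.0-beta1.jar \
  --conf spark.es.nodes=loadbalancer.fr \
  --conf spark.es.port=9200 \
  --conf spark.es.nodes.wan.only=true
```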

@vanjor You are experiencing a typical case of cluster overload. You define a retry count of 10K with a 10s wait in between - no wonder your jobs take hours. Increasing retries simply postpones the job failure if the cluster lags behind, and there are several reasons why this can happen, from too many map/reduce tasks to too few resources on the ES side.
There's a dedicated chapter on the topic.
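
As an illustration only (the numbers below are placeholders, not recommendations), a more conservative variant of the table properties would cap retries and shrink batches so that back-pressure surfaces early instead of after hours:

```sql
-- Sketch with placeholder values; tune against the cluster's actual capacity.
ALTER TABLE es.buyer_es SET TBLPROPERTIES (
  'es.batch.size.entries' = '500',
  'es.batch.write.retry.count' = '3',
  'es.batch.write.retry.wait' = '30s'
);
```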
Last but not least, please open a different issue, or better yet a topic on the forum, if you'd like to continue the conversation. I'd like to keep this ticket focused on @ArcTheMaster's topic.

@vanjor @ArcTheMaster Guys, I'm glad to see folks helping each other out, yet at the same time it looks like two different tickets are being discussed in the same thread. Would you mind opening a new ticket, or potentially discussing it over the forums? It reduces confusion for everyone involved.


ArcTheMaster commented Nov 10, 2015

@costin Thank you for your reply and for all the input you provided.

I did a lot of tests today with multiple ES-Hadoop versions, and I enabled TRACE logging on the REST package as you suggested.
I also disabled SSL while keeping the Search Guard authentication: I wanted to inspect the network traces, particularly the authentication process between the driver and the executors, which would not be possible with SSL activated.

Here are my global results for each ES-Hadoop connector:

  • elasticsearch-hadoop-2.1.1.jar (ES-Hadoop 2.1.1)
val RDD2 = sc.esRDD("my_index/my_type", "?q=Ge*")
RDD2.count ===> functional
RDD2.collect().foreach(println) ===> not functional
  • elasticsearch-hadoop-2.1.2.jar (ES-Hadoop 2.1.2)
val RDD2 = sc.esRDD("my_index/my_type", "?q=Ge*")
RDD2.count ===> not functional
RDD2.collect().foreach(println) ===> not functional
  • elasticsearch-hadoop-2.2.0-beta1.jar (ES-Hadoop 2.2.0 Beta)
val RDD2 = sc.esRDD("my_index/my_type", "?q=Ge*")
RDD2.count ===> not functional
RDD2.collect().foreach(println) ===> not functional

As you can see, the behaviour differs between version 2.1.1 and versions 2.1.2/2.2.0: the count was functional before, which is no longer the case.
In all cases, when the request is not functional I get the following logs:

scala> import org.elasticsearch.spark._
import org.elasticsearch.spark._

scala> val RDD2 = sc.esRDD("my_index/my_type", "?q=Ge*")
RDD2: org.apache.spark.rdd.RDD[(String, scala.collection.Map[String,AnyRef])] = ScalaEsRDD[0] at RDD at AbstractEsRDD.scala:17

scala> RDD2.collect().foreach(println)
15/11/10 15:24:13 INFO CommonsHttpTransport: Using detected HTTP Auth credentials...
15/11/10 15:24:13 TRACE CommonsHttpTransport: Opening HTTP transport to loadbalancer01.fr:9200
15/11/10 15:24:13 TRACE CommonsHttpTransport: Tx [GET]@[loadbalancer01.fr:9200][] w/ payload [null]
15/11/10 15:24:14 TRACE CommonsHttpTransport: Rx @[192.168.200.208] [200-OK] [{
  "status" : 200,
  "name" : "loadbalancer01.fr",
  "cluster_name" : "MYCLUSTER.PEI",
  "version" : {
    "number" : "1.6.2",
    "build_hash" : "622039121e53e5f520b5ff8720fdbd3d0cb5326b",
    "build_timestamp" : "2015-07-29T09:24:47Z",
    "build_snapshot" : false,
    "lucene_version" : "4.10.4"
  },
  "tagline" : "You Know, for Search"
}
]
15/11/10 15:24:14 TRACE CommonsHttpTransport: Closing HTTP transport to loadbalancer01.fr:9200
15/11/10 15:24:14 INFO CommonsHttpTransport: Using detected HTTP Auth credentials...
15/11/10 15:24:14 TRACE CommonsHttpTransport: Opening HTTP transport to loadbalancer01.fr:9200
15/11/10 15:24:14 TRACE CommonsHttpTransport: Tx [GET]@[loadbalancer01.fr:9200][_nodes/transport] w/ payload [null]
15/11/10 15:24:15 TRACE CommonsHttpTransport: Rx @[192.168.200.208] [200-OK] [{"cluster_name":"ELASTIC.PEI","nodes":{"innll3d8QMuo_iYqQCBm_Q":{"name":"loadbalancer01.fr","transport_address":"inet[loadbalancer01.fr/192.168.200.213:9300]","host":"loadbalancer01.fr","ip":"192.168.200.213","version":"1.6.2","build":"6220391","http_address":"inet[loadbalancer01.fr/192.168.200.213:9200]","attributes":{"data":"false","master":"false"},"transport":{"bound_address":"inet[/0.0.0.0:9300]","publish_address":"inet[loadbalancer01.fr/192.168.200.213:9300]","profiles":{}}},"HuZ-Vpy0QwSTzRkclkMpRg":{"name":"node05.fr","transport_address":"inet[/192.168.200.29:9300]","host":"node05.fr","ip":"192.168.200.29","version":"1.6.2","build":"6220391","http_address":"inet[node05.fr/192.168.200.29:9200]","attributes":{"master":"false"},"transport":{"bound_address":"inet[/192.168.200.29:9300]","publish_address":"inet[/192.168.200.29:9300]","profiles":{}}},"qOaZ0CXsRL-80-LnbEHk1Q":{"name":"uabigname01.fr","transport_address":"inet[/192.168.200.23:9300]","host":"uabigname01.fr","ip":"192.168.200.23","version":"1.6.2","build":"6220391","attributes":{"data":"false","master":"true"},"transport":{"bound_address":"inet[/192.168.200.23:9300]","publish_address":"inet[/192.168.200.23:9300]","profiles":{}}},"1RKHMmjcQgGyl9n_6VsDuw":{"name":"node06.fr","transport_address":"inet[/192.168.200.30:9300]","host":"node06.fr","ip":"192.168.200.30","version":"1.6.2","build":"6220391","http_address":"inet[node06.fr/192.168.200.30:9200]","attributes":{"master":"false"},"transport":{"bound_address":"inet[/192.168.200.30:9300]","publish_address":"inet[/192.168.200.30:9300]","profiles":{}}},"VQEqVFm4RUGUVtJufmJdAQ":{"name":"node01.fr","transport_address":"inet[/192.168.200.25:9300]","host":"node01.fr","ip":"192.168.200.25","version":"1.6.2","build":"6220391","http_address":"inet[node01.fr/192.168.200.25:9200]","attributes":{"master":"false"},"transport":{"bound_address":"inet[/192.168.200.25:9300]","publish_address":"i
net[/192.168.200.25:9300]","profiles":{}}},"MwZk6zsfRX65xkc7vnNj-A":{"name":"node04.fr","transport_address":"inet[/192.168.200.28:9300]","host":"node04.fr","ip":"192.168.200.28","version":"1.6.2","build":"6220391","http_address":"inet[node04.fr/192.168.200.28:9200]","attributes":{"master":"false"},"transport":{"bound_address":"inet[/192.168.200.28:9300]","publish_address":"inet[/192.168.200.28:9300]","profiles":{}}},"ujZ7vRpZTyKcWyBsrI72vg":{"name":"node03.fr","transport_address":"inet[/192.168.200.27:9300]","host":"node03.fr","ip":"192.168.200.27","version":"1.6.2","build":"6220391","http_address":"inet[node03.fr/192.168.200.27:9200]","attributes":{"master":"false"},"transport":{"bound_address":"inet[/192.168.200.27:9300]","publish_address":"inet[/192.168.200.27:9300]","profiles":{}}}}}]
15/11/10 15:24:15 TRACE CommonsHttpTransport: Closing HTTP transport to loadbalancer01.fr:9200
15/11/10 15:24:15 INFO CommonsHttpTransport: Using detected HTTP Auth credentials...
15/11/10 15:24:15 TRACE CommonsHttpTransport: Opening HTTP transport to 192.168.200.29:9200
15/11/10 15:24:15 TRACE CommonsHttpTransport: Tx [GET]@[192.168.200.29:9200][_nodes/http] w/ payload [null]
15/11/10 15:24:16 TRACE CommonsHttpTransport: Rx @[192.168.200.208] [200-OK] [{"cluster_name":"ELASTIC.PEI","nodes":{"innll3d8QMuo_iYqQCBm_Q":{"name":"loadbalancer01.fr","transport_address":"inet[/192.168.200.213:9300]","host":"loadbalancer01.fr","ip":"192.168.200.213","version":"1.6.2","build":"6220391","http_address":"inet[loadbalancer01.fr/192.168.200.213:9200]","attributes":{"data":"false","master":"false"},"http":{"bound_address":"inet[/0.0.0.0:9200]","publish_address":"inet[/192.168.200.213:9200]","max_content_length_in_bytes":104857600}},"HuZ-Vpy0QwSTzRkclkMpRg":{"name":"node05.fr","transport_address":"inet[node05.fr/192.168.200.29:9300]","host":"node05.fr","ip":"192.168.200.29","version":"1.6.2","build":"6220391","http_address":"inet[node05.fr/192.168.200.29:9200]","attributes":{"master":"false"},"http":{"bound_address":"inet[/192.168.200.29:9200]","publish_address":"inet[node05.fr/192.168.200.29:9200]","max_content_length_in_bytes":104857600}},"qOaZ0CXsRL-80-LnbEHk1Q":{"name":"uabigname01.fr","transport_address":"inet[/192.168.200.23:9300]","host":"uabigname01.fr","ip":"192.168.200.23","version":"1.6.2","build":"6220391","attributes":{"data":"false","master":"true"}},"1RKHMmjcQgGyl9n_6VsDuw":{"name":"node06.fr","transport_address":"inet[/192.168.200.30:9300]","host":"node06.fr","ip":"192.168.200.30","version":"1.6.2","build":"6220391","http_address":"inet[node06.fr/192.168.200.30:9200]","attributes":{"master":"false"},"http":{"bound_address":"inet[/192.168.200.30:9200]","publish_address":"inet[/192.168.200.30:9200]","max_content_length_in_bytes":104857600}},"VQEqVFm4RUGUVtJufmJdAQ":{"name":"node01.fr","transport_address":"inet[/192.168.200.25:9300]","host":"node01.fr","ip":"192.168.200.25","version":"1.6.2","build":"6220391","http_address":"inet[node01.fr/192.168.200.25:9200]","attributes":{"master":"false"},"http":{"bound_address":"inet[/192.168.200.25:9200]","publish_address":"inet[/192.168.200.25:9200]","max_content_length_in_bytes":104857600}},"MwZk6zsf
RX65xkc7vnNj-A":{"name":"node04.fr","transport_address":"inet[/192.168.200.28:9300]","host":"node04.fr","ip":"192.168.200.28","version":"1.6.2","build":"6220391","http_address":"inet[node04.fr/192.168.200.28:9200]","attributes":{"master":"false"},"http":{"bound_address":"inet[/192.168.200.28:9200]","publish_address":"inet[/192.168.200.28:9200]","max_content_length_in_bytes":104857600}},"ujZ7vRpZTyKcWyBsrI72vg":{"name":"node03.fr","transport_address":"inet[/192.168.200.27:9300]","host":"node03.fr","ip":"192.168.200.27","version":"1.6.2","build":"6220391","http_address":"inet[node03.fr/192.168.200.27:9200]","attributes":{"master":"false"},"http":{"bound_address":"inet[/192.168.200.27:9200]","publish_address":"inet[/192.168.200.27:9200]","max_content_length_in_bytes":104857600}}}}]
15/11/10 15:24:16 TRACE CommonsHttpTransport: Closing HTTP transport to 192.168.200.29:9200
15/11/10 15:24:16 INFO CommonsHttpTransport: Using detected HTTP Auth credentials...
15/11/10 15:24:16 TRACE CommonsHttpTransport: Opening HTTP transport to 192.168.200.30:9200
15/11/10 15:24:16 TRACE CommonsHttpTransport: Tx [HEAD]@[192.168.200.30:9200][my_index/my_type] w/ payload [null]
15/11/10 15:24:17 TRACE CommonsHttpTransport: Rx @[192.168.200.208] [200-OK] [null]
15/11/10 15:24:17 TRACE CommonsHttpTransport: Tx [GET]@[192.168.200.30:9200][my_index/_search_shards] w/ payload [null]
15/11/10 15:24:17 TRACE CommonsHttpTransport: Rx @[192.168.200.208] [200-OK] [{"nodes":{"MwZk6zsfRX65xkc7vnNj-A":{"name":"node04.fr","transport_address":"inet[/192.168.200.28:9300]","attributes":{"master":"false"}},"1RKHMmjcQgGyl9n_6VsDuw":{"name":"node06.fr","transport_address":"inet[/192.168.200.30:9300]","attributes":{"master":"false"}},"ujZ7vRpZTyKcWyBsrI72vg":{"name":"node03.fr","transport_address":"inet[/192.168.200.27:9300]","attributes":{"master":"false"}},"HuZ-Vpy0QwSTzRkclkMpRg":{"name":"node05.fr","transport_address":"inet[/192.168.200.29:9300]","attributes":{"master":"false"}},"VQEqVFm4RUGUVtJufmJdAQ":{"name":"node01.fr","transport_address":"inet[/192.168.200.25:9300]","attributes":{"master":"false"}}},"shards":[[{"state":"STARTED","primary":false,"node":"HuZ-Vpy0QwSTzRkclkMpRg","relocating_node":null,"shard":0,"index":"my_index"},{"state":"STARTED","primary":true,"node":"1RKHMmjcQgGyl9n_6VsDuw","relocating_node":null,"shard":0,"index":"my_index"}],[{"state":"STARTED","primary":false,"node":"VQEqVFm4RUGUVtJufmJdAQ","relocating_node":null,"shard":1,"index":"my_index"},{"state":"STARTED","primary":true,"node":"ujZ7vRpZTyKcWyBsrI72vg","relocating_node":null,"shard":1,"index":"my_index"}],[{"state":"STARTED","primary":false,"node":"VQEqVFm4RUGUVtJufmJdAQ","relocating_node":null,"shard":2,"index":"my_index"},{"state":"STARTED","primary":true,"node":"MwZk6zsfRX65xkc7vnNj-A","relocating_node":null,"shard":2,"index":"my_index"}],[{"state":"STARTED","primary":false,"node":"HuZ-Vpy0QwSTzRkclkMpRg","relocating_node":null,"shard":3,"index":"my_index"},{"state":"STARTED","primary":true,"node":"ujZ7vRpZTyKcWyBsrI72vg","relocating_node":null,"shard":3,"index":"my_index"}],[{"state":"STARTED","primary":false,"node":"1RKHMmjcQgGyl9n_6VsDuw","relocating_node":null,"shard":4,"index":"my_index"},{"state":"STARTED","primary":true,"node":"MwZk6zsfRX65xkc7vnNj-A","relocating_node":null,"shard":4,"index":"my_index"}]]}]
15/11/10 15:24:17 TRACE CommonsHttpTransport: Tx [GET]@[192.168.200.30:9200][_nodes/http] w/ payload [null]
15/11/10 15:24:18 TRACE CommonsHttpTransport: Rx @[192.168.200.208] [200-OK] [{"cluster_name":"ELASTIC.PEI","nodes":{"innll3d8QMuo_iYqQCBm_Q":{"name":"loadbalancer01.fr","transport_address":"inet[/192.168.200.213:9300]","host":"loadbalancer01.fr","ip":"192.168.200.213","version":"1.6.2","build":"6220391","http_address":"inet[loadbalancer01.fr/192.168.200.213:9200]","attributes":{"data":"false","master":"false"},"http":{"bound_address":"inet[/0.0.0.0:9200]","publish_address":"inet[/192.168.200.213:9200]","max_content_length_in_bytes":104857600}},"HuZ-Vpy0QwSTzRkclkMpRg":{"name":"node05.fr","transport_address":"inet[/192.168.200.29:9300]","host":"node05.fr","ip":"192.168.200.29","version":"1.6.2","build":"6220391","http_address":"inet[node05.fr/192.168.200.29:9200]","attributes":{"master":"false"},"http":{"bound_address":"inet[/192.168.200.29:9200]","publish_address":"inet[/192.168.200.29:9200]","max_content_length_in_bytes":104857600}},"qOaZ0CXsRL-80-LnbEHk1Q":{"name":"uabigname01.fr","transport_address":"inet[/192.168.200.23:9300]","host":"uabigname01.fr","ip":"192.168.200.23","version":"1.6.2","build":"6220391","attributes":{"data":"false","master":"true"}},"1RKHMmjcQgGyl9n_6VsDuw":{"name":"node06.fr","transport_address":"inet[node06.fr/192.168.200.30:9300]","host":"node06.fr","ip":"192.168.200.30","version":"1.6.2","build":"6220391","http_address":"inet[node06.fr/192.168.200.30:9200]","attributes":{"master":"false"},"http":{"bound_address":"inet[/192.168.200.30:9200]","publish_address":"inet[node06.fr/192.168.200.30:9200]","max_content_length_in_bytes":104857600}},"VQEqVFm4RUGUVtJufmJdAQ":{"name":"node01.fr","transport_address":"inet[/192.168.200.25:9300]","host":"node01.fr","ip":"192.168.200.25","version":"1.6.2","build":"6220391","http_address":"inet[node01.fr/192.168.200.25:9200]","attributes":{"master":"false"},"http":{"bound_address":"inet[/192.168.200.25:9200]","publish_address":"inet[/192.168.200.25:9200]","max_content_length_in_bytes":104857600}},"MwZk6zsf
RX65xkc7vnNj-A":{"name":"node04.fr","transport_address":"inet[/192.168.200.28:9300]","host":"node04.fr","ip":"192.168.200.28","version":"1.6.2","build":"6220391","http_address":"inet[node04.fr/192.168.200.28:9200]","attributes":{"master":"false"},"http":{"bound_address":"inet[/192.168.200.28:9200]","publish_address":"inet[/192.168.200.28:9200]","max_content_length_in_bytes":104857600}},"ujZ7vRpZTyKcWyBsrI72vg":{"name":"node03.fr","transport_address":"inet[/192.168.200.27:9300]","host":"node03.fr","ip":"192.168.200.27","version":"1.6.2","build":"6220391","http_address":"inet[node03.fr/192.168.200.27:9200]","attributes":{"master":"false"},"http":{"bound_address":"inet[/192.168.200.27:9200]","publish_address":"inet[/192.168.200.27:9200]","max_content_length_in_bytes":104857600}}}}]
15/11/10 15:24:18 TRACE CommonsHttpTransport: Tx [GET]@[192.168.200.30:9200][my_index/_aliases] w/ payload [null]
15/11/10 15:24:18 TRACE CommonsHttpTransport: Rx @[192.168.200.208] [200-OK] [{"my_index":{"aliases":{}}}]
15/11/10 15:24:18 TRACE CommonsHttpTransport: Tx [GET]@[192.168.200.30:9200][my_index/my_type/_mapping] w/ payload [null]
15/11/10 15:24:18 TRACE CommonsHttpTransport: Rx @[192.168.200.208] [200-OK] [{"my_index":{"mappings":{"my_type":{"properties":{"account_number":{"type":"long"},"address":{"type":"string"},"age":{"type":"long"},"balance":{"type":"long"},"city":{"type":"string"},"email":{"type":"string"},"employer":{"type":"string"},"firstname":{"type":"string"},"gender":{"type":"string"},"lastname":{"type":"string"},"state":{"type":"string"}}}}}}]
15/11/10 15:24:18 DEBUG RestRepository: Closing repository and connection to Elasticsearch ...
15/11/10 15:24:18 DEBUG RestRepository: Sending batch of [0] bytes/[0] entries
15/11/10 15:24:18 TRACE CommonsHttpTransport: Closing HTTP transport to 192.168.200.30:9200
[Stage 0:>                                                          (0 + 2) / 5]15/11/10 15:24:20 ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 7, node03.fr): org.apache.spark.util.TaskCompletionListenerException: [POST] on [my_index/my_type/_search?search_type=scan&scroll=5&size=50&preference=_shards:0;_only_node:HuZ-Vpy0QwSTzRkclkMpRg] failed; server[192.168.200.29:9200] returned [401|Unauthorized:]
        at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:90)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1835)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1848)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1919)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:905)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:904)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:27)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:32)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:38)
        at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:40)
        at $iwC$$iwC$$iwC$$iwC.<init>(<console>:42)
        at $iwC$$iwC$$iwC.<init>(<console>:44)
        at $iwC$$iwC.<init>(<console>:46)
        at $iwC.<init>(<console>:48)
        at <init>(<console>:50)
        at .<init>(<console>:54)
        at .<clinit>(<console>)
        at .<init>(<console>:7)
        at .<clinit>(<console>)
        at $print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
        at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
        at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.spark.util.TaskCompletionListenerException: [POST] on [my_index/my_type/_search?search_type=scan&scroll=5&size=50&preference=_shards:0;_only_node:HuZ-Vpy0QwSTzRkclkMpRg] failed; server[192.168.200.29:9200] returned [401|Unauthorized:]
        at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:90)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

The error is not the same as the one I posted at the beginning (maybe there was a leftover "export http_proxy" still set in my environment). Now this is a 401 Unauthorized error.
I checked the network traces and I saw the following:

  1. The Spark client machine (which is also the Spark driver) uses the basic auth credentials given when spark-shell was started (--conf spark.es.net.http.auth.user=username --conf spark.es.net.http.auth.pass=password).
  2. The driver is talking with the client node (which I called the load balancer in my first post). The basic auth is used during this step.
REQUEST
GET / HTTP/1.1
Authorization: Basic c2ltbzEyMzQ6QWRtaW4xMjMt
User-Agent: Jakarta Commons-HttpClient/3.1
Host: loadbalancer01.fr:9200

RESPONSE
HTTP/1.1 200 OK
Content-Type: application/json; charset=UTF-8
Content-Length: 359

Three HTTP GET requests are visible. All of them are accepted with a 200 OK.

  • GET /
  • GET /_nodes/transport
  • GET /_nodes/http
  3. The driver is talking with a data node (node04.fr). The basic auth is also used here.
REQUEST
HEAD /my_index/my_type HTTP/1.1
Authorization: Basic c2ltbzEyMzQ6QWRtaW4xMjMt
User-Agent: Jakarta Commons-HttpClient/3.1
Host: node04.fr:9200

RESPONSE
HTTP/1.1 200 OK
Content-Type: text/plain; charset=UTF-8
Content-Length: 0

Five HTTP requests are visible. All of them are accepted with a 200 OK.

  • HEAD /my_index/my_type
  • GET /my_index/_search_shards
  • GET /_nodes/http
  • GET /my_index/_aliases
  • GET /my_index/my_type/_mapping
  4. Now the executors are working. For example, node07 is talking with node06. In this step, the basic auth is not sent.

Several HTTP POST requests are sent repeatedly. All of them are refused with a 401 Unauthorized.

REQUEST
POST /my_index/my_type/_search?search_type=scan&scroll=5&size=50&preference=_shards:0;_only_node:1RKHMmjcQgGyl9n_6VsDuw HTTP/1.1
User-Agent: Jakarta Commons-HttpClient/3.1
Host: 192.168.200.30:9200
Content-Length: 44
Content-Type: application/json; charset=UTF-8

RESPONSE
HTTP/1.1 401 Unauthorized
WWW-Authenticate: Basic realm="Search Guard"
Content-Type: text/plain; charset=UTF-8
Content-Length: 0

I think this is the origin of the issue: the ES-Hadoop connector doesn't seem to transmit the credentials to the executors, which is why I get the 401.
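One way to test this hypothesis is to pass the auth settings in the per-RDD configuration map (in addition to the --conf spark.es.* flags), so that they are part of the job configuration Spark ships to the executors. This is only a sketch: it assumes the esRDD(resource, query, cfg) overload available in these connector versions, "username"/"password" are placeholders, and whether this map actually reaches the executors with the credentials intact is precisely what is in question here.

```scala
import org.elasticsearch.spark._

// Hypothetical workaround sketch: supply the ES connection and auth settings
// directly in the per-query configuration map rather than relying only on
// the spark.es.* --conf flags picked up by the driver.
val cfg = Map(
  "es.nodes"              -> "loadbalancer.fr",
  "es.port"               -> "9200",
  "es.net.ssl"            -> "true",
  "es.net.http.auth.user" -> "username",   // placeholder credentials
  "es.net.http.auth.pass" -> "password"
)

val rdd = sc.esRDD("my_index/my_type", "?q=Ge*", cfg)
rdd.collect().foreach(println)
```

If the 401 disappears with this form but not with the --conf flags alone, that would point at the settings not being propagated from the driver to the tasks.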

ArcTheMaster commented Nov 10, 2015

@costin Thank you for your reply and all the input you provided.

I did a lot of tests today with multiple ES-Hadoop versions, and I enabled logging on the REST package as you suggested.
I also disabled SSL while keeping authentication through Search Guard enabled: I wanted to inspect the network traces, particularly the authentication exchange between the driver and the executors, which would not be possible with SSL activated.

Here are my global results for each ES-Hadoop connector:

  • elasticsearch-hadoop-2.1.1.jar (ES-Hadoop 2.1.1)
val RDD2 = sc.esRDD("my_index/my_type", "?q=Ge*")
RDD2.count ===> functional
RDD2.collect().foreach(println) ===> not functional
  • elasticsearch-hadoop-2.1.2.jar (ES-Hadoop 2.1.2)
val RDD2 = sc.esRDD("my_index/my_type", "?q=Ge*")
RDD2.count ===> not functional
RDD2.collect().foreach(println) ===> not functional
  • elasticsearch-hadoop-2.2.0-beta1.jar (ES-Hadoop 2.2.0 Beta)
val RDD2 = sc.esRDD("my_index/my_type", "?q=Ge*")
RDD2.count ===> not functional
RDD2.collect().foreach(println) ===> not functional

As you can see, the behaviour differs between version 2.1.1 and versions 2.1.2/2.2.0: the count was functional before, which is no longer the case.
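When switching connector jars like this, it can be worth double-checking which elasticsearch-hadoop jar the running shell actually loaded, to rule out a stale jar winning the classpath. A small sketch (it assumes the connector's EsSpark object is on the classpath and that its class loader exposes a code source, which is not guaranteed for every class loader):

```scala
import org.elasticsearch.spark.rdd.EsSpark

// Ask the JVM where the loaded connector class actually comes from; the
// resulting URL points at the jar that was picked up, e.g.
// file:/opt/application/Spark/current/elastic/jar/elasticsearch-hadoop-2.1.1.jar
val jarLocation = EsSpark.getClass.getProtectionDomain.getCodeSource.getLocation
println(jarLocation)
```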
In all cases, when the request is not functional I have the following logs:

scala> import org.elasticsearch.spark._
import org.elasticsearch.spark._

scala> val RDD2 = sc.esRDD("my_index/my_type", "?q=Ge*")
RDD2: org.apache.spark.rdd.RDD[(String, scala.collection.Map[String,AnyRef])] = ScalaEsRDD[0] at RDD at AbstractEsRDD.scala:17

scala> RDD2.collect().foreach(println)
15/11/10 15:24:13 INFO CommonsHttpTransport: Using detected HTTP Auth credentials...
15/11/10 15:24:13 TRACE CommonsHttpTransport: Opening HTTP transport to loadbalancer01.fr:9200
15/11/10 15:24:13 TRACE CommonsHttpTransport: Tx [GET]@[loadbalancer01.fr:9200][] w/ payload [null]
15/11/10 15:24:14 TRACE CommonsHttpTransport: Rx @[192.168.200.208] [200-OK] [{
  "status" : 200,
  "name" : "loadbalancer01.fr",
  "cluster_name" : "MYCLUSTER.PEI",
  "version" : {
    "number" : "1.6.2",
    "build_hash" : "622039121e53e5f520b5ff8720fdbd3d0cb5326b",
    "build_timestamp" : "2015-07-29T09:24:47Z",
    "build_snapshot" : false,
    "lucene_version" : "4.10.4"
  },
  "tagline" : "You Know, for Search"
}
]
15/11/10 15:24:14 TRACE CommonsHttpTransport: Closing HTTP transport to loadbalancer01.fr:9200
15/11/10 15:24:14 INFO CommonsHttpTransport: Using detected HTTP Auth credentials...
15/11/10 15:24:14 TRACE CommonsHttpTransport: Opening HTTP transport to loadbalancer01.fr:9200
15/11/10 15:24:14 TRACE CommonsHttpTransport: Tx [GET]@[loadbalancer01.fr:9200][_nodes/transport] w/ payload [null]
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 7, node03.fr): org.apache.spark.util.TaskCompletionListenerException: [POST] on [my_index/my_type/_search?search_type=scan&scroll=5&size=50&preference=_shards:0;_only_node:HuZ-Vpy0QwSTzRkclkMpRg] failed; server[192.168.200.29:9200] returned [401|Unauthorized:]
        at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:90)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1835)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1848)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1919)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:905)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:904)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:27)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:32)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:38)
        at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:40)
        at $iwC$$iwC$$iwC$$iwC.<init>(<console>:42)
        at $iwC$$iwC$$iwC.<init>(<console>:44)
        at $iwC$$iwC.<init>(<console>:46)
        at $iwC.<init>(<console>:48)
        at <init>(<console>:50)
        at .<init>(<console>:54)
        at .<clinit>(<console>)
        at .<init>(<console>:7)
        at .<clinit>(<console>)
        at $print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
        at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
        at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.spark.util.TaskCompletionListenerException: [POST] on [my_index/my_type/_search?search_type=scan&scroll=5&size=50&preference=_shards:0;_only_node:HuZ-Vpy0QwSTzRkclkMpRg] failed; server[192.168.200.29:9200] returned [401|Unauthorized:]
        at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:90)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

The error is not the same as the one I posted at the beginning (perhaps a stale `export http_proxy` was still set in my environment). It is now a 401 Unauthorized error.
I checked the network traces and I saw the following:

  1. The spark client machine (which is the Spark Driver) uses the basic auth given when spark-shell was started (--conf spark.es.net.http.auth.user=username --conf spark.es.net.http.auth.pass=password)
  2. The Driver is talking with the client node (I called it loadbalancer in my first post). The basic auth is used during this step.
REQUEST
GET / HTTP/1.1
Authorization: Basic c2ltbzEyMzQ6QWRtaW4xMjMt
User-Agent: Jakarta Commons-HttpClient/3.1
Host: loadbalancer01.fr:9200

RESPONSE
HTTP/1.1 200 OK
Content-Type: application/json; charset=UTF-8
Content-Length: 359

Three HTTP GET requests are visible. All of them are accepted with a 200 OK.

  • GET HTTP /
  • GET HTTP /_nodes/_transport
  • GET HTTP /_nodes/http
  3. The Driver is talking with a data node (node04.fr). The basic auth is also used here.
REQUEST
HEAD /my_index/my_type HTTP/1.1
Authorization: Basic c2ltbzEyMzQ6QWRtaW4xMjMt
User-Agent: Jakarta Commons-HttpClient/3.1
Host: node04.fr:9200

RESPONSE
HTTP/1.1 200 OK
Content-Type: text/plain; charset=UTF-8
Content-Length: 0

Five HTTP requests are visible. All of them are accepted with a 200 OK.

  • HEAD HTTP /my_index/my_type
  • GET HTTP /my_index/_search_shards
  • GET HTTP /_nodes/http
  • GET HTTP /my_index/_aliases
  • GET HTTP /my_index/my_type/_mapping
  4. Now the executors are working. For example, node07 is talking with node06. In this part the basic auth is not used.

Several HTTP POST requests are made repeatedly. All of them are refused with a 401 Unauthorized.

REQUEST
POST /my_index/my_type/_search?search_type=scan&scroll=5&size=50&preference=_shards:0;_only_node:1RKHMmjcQgGyl9n_6VsDuw HTTP/1.1
User-Agent: Jakarta Commons-HttpClient/3.1
Host: 192.168.200.30:9200
Content-Length: 44
Content-Type: application/json; charset=UTF-8

RESPONSE
HTTP/1.1 401 Unauthorized
WWW-Authenticate: Basic realm="Search Guard"
Content-Type: text/plain; charset=UTF-8
Content-Length: 0

I think this is the origin of the issue: the ES-Hadoop connector doesn't seem to transmit the credentials to the executors, which is why I get the 401.

costin (Member) commented Nov 15, 2015

@ArcTheMaster Thanks again for the detailed response and for your time. I'll check why the configuration is not passed to the workers. I'm guessing you are specifying the configuration initially on the SparkConf/Context and that's it, right?
This might be caused by another issue reported a while back about the driver configuration not being properly propagated to the workers.
A quick workaround (albeit a verbose one) would be to specify the config as a Map and pass that to the ES-Hadoop/Spark methods.

By the way, your assessment and diagnostic (again, really appreciated) is correct - during discovery (on the driver) the correct configuration is used and the cluster state is evaluated, but on the workers the config seems to be lost somehow. Need to investigate why.
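The Map-based workaround mentioned above can be sketched as follows. This is a hypothetical example, not code from the thread: the node address, index, and credentials are placeholders, and it assumes the connector's `esRDD(resource, query, cfg)` overload that accepts a per-call configuration Map:

```scala
import org.elasticsearch.spark._

// Placeholder coordinates and credentials -- substitute your own.
val cfg = Map(
  "es.nodes"              -> "loadbalancer.fr",
  "es.port"               -> "9200",
  "es.net.ssl"            -> "true",
  "es.net.http.auth.user" -> "username",
  "es.net.http.auth.pass" -> "password"
)

// Because the Map is an argument of the call, it is serialized with
// the RDD and reaches the executors even when SparkConf settings
// are not propagated to them.
val rdd = sc.esRDD("index/type", "?q=Ge*", cfg)
rdd.count
```

The same Map can be reused across multiple `esRDD`/`saveToEs` calls, which keeps the verbosity manageable.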


ArcTheMaster commented Nov 17, 2015

@costin Thanks for your feedback. You're right, I'm specifying the configuration on the SparkConf/Context. I'm not using spark-submit in this use case, only the spark-shell. I specify the configuration as follows:

spark-shell -v --jars /opt/application/Spark/current/elastic/jar/elasticsearch-hadoop-2.1.1.jar --name elasticsearch-hadoop --master yarn-client --conf spark.es.net.ssl=true --conf spark.es.net.http.auth.user=username --conf spark.es.net.http.auth.pass=password --conf spark.es.nodes=loadbalancer.fr --conf spark.es.port=9200 --conf spark.es.field.read.empty.as.null=true --conf spark.es.resource=index/type

I don't stop the SparkContext once spark-shell has started. I talked with the Spark JIRA support and they told me that stopping the SparkContext in the shell is not supported (it can work, but it can also fail). That's why I'm specifying the ES connector configuration at startup.

About the workaround, I won't be able to test it now. When I find some time, I will give it a try.

Please let me know when the issue is corrected. Don't hesitate to ask me if you need further input.


costin added a commit that referenced this issue Nov 18, 2015

[SPARK] Handle cmd-line passed properties when saving
When saving Spark properties, handle command line properties so they can be picked
up down the stream

relates #591
costin (Member) commented Nov 18, 2015

@ArcTheMaster I've committed a fix in master that should resolve the issue. I've triggered the nightly build, so a build with the fix should hopefully be available within 20 minutes. Can you please try it out? It's the dev build, in this case 2.2.0.BUILD-SNAPSHOT.


costin added a commit that referenced this issue Nov 19, 2015

ArcTheMaster commented Nov 20, 2015

@costin, wow, thank you for your quick turnaround on this subject; it's really appreciated.
I tested the dev build 2.2.0.BUILD-SNAPSHOT (elasticsearch-hadoop-2.2.0.BUILD-20151119.024605-154.zip, to be precise) this morning and I wasn't able to reproduce the error during my tests. I think we can say it's corrected.
Will you put this update in the next stable release of the ES connector?
Once again, I thank you for your support on this topic.


costin (Member) commented Nov 20, 2015

Thanks for confirming. Yes, the fix will be available in the next version (see the issue tags in the right column).
Thanks for reporting it (even more so in such great detail).

Cheers,


ArcTheMaster commented Nov 23, 2015

Ok, thanks for the last info. I'm going to use the BETA version of the connector for now and will switch to the stable version when it's released.

Can I close the issue or do I need to keep it open?

Cheers


@costin costin closed this Jan 8, 2016

costin added a commit that referenced this issue Jan 16, 2016

[SPARK] Handle cmd-line passed properties when saving
When saving Spark properties, handle command line properties so they can be picked
up down the stream

relates #591

(cherry picked from commit 9695742)

costin added a commit that referenced this issue Jan 16, 2016

Add test for properties handling
relates #591

(cherry picked from commit 8d82071)
yogeshdarji99 commented Nov 21, 2016

I am also trying to run the same code, but inside a container, so my Spark and Elasticsearch are running in different containers on the same box. I am getting the error below. It seems it is trying to connect to localhost instead of 172.17.0.3 (the other container, where ES is running).

I am running the following command to start the spark-shell:

./bin/spark-shell --master local[4] --packages "org.elasticsearch:elasticsearch-spark_2.10:2.1.0.Beta2" --name elasticsearch-hadoop --conf spark.es.net.ssl=true --conf spark.es.nodes=172.17.0.3 --conf spark.es.port=9200 --conf spark.es.field.read.empty.as.null=true --conf spark.es.resource=index/type

scala> val RDD = sc.esRDD("index/type/", "?q=Ge*")
RDD: org.elasticsearch.spark.rdd.ScalaEsRDD = ScalaEsRDD[0] at RDD at AbstractEsRDD.scala:17

scala>RDD.count

> 16/11/21 21:26:06 INFO httpclient.HttpMethodDirector: I/O exception (java.net.ConnectException) caught when processing request: Connection refused (Connection refused)
> 16/11/21 21:26:06 INFO httpclient.HttpMethodDirector: Retrying request
> 16/11/21 21:26:06 INFO httpclient.HttpMethodDirector: I/O exception (java.net.ConnectException) caught when processing request: Connection refused (Connection refused)
> 16/11/21 21:26:06 INFO httpclient.HttpMethodDirector: Retrying request
> 16/11/21 21:26:06 INFO httpclient.HttpMethodDirector: I/O exception (java.net.ConnectException) caught when processing request: Connection refused (Connection refused)
> 16/11/21 21:26:06 INFO httpclient.HttpMethodDirector: Retrying request
> 16/11/21 21:26:06 ERROR rest.NetworkClient: Node [Connection refused (Connection refused)] failed **(localhost:9200)**; no other nodes left - aborting...
> org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[localhost:9200]]
>   at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:123)
>   at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:303)
>   at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:287)
>   at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:291)
>   at org.elasticsearch.hadoop.rest.RestClient.get(RestClient.java:118)
>   at org.elasticsearch.hadoop.rest.RestClient.discoverNodes(RestClient.java:100)
>   at org.elasticsearch.hadoop.rest.InitializationUtils.discoverNodesIfNeeded(InitializationUtils.java:57)
>   at org.elasticsearch.hadoop.rest.RestService.findPartitions(RestService.java:220)
>   at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions$lzycompute(AbstractEsRDD.scala:49)
>   at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions(AbstractEsRDD.scala:48)
>   at org.elasticsearch.spark.rdd.AbstractEsRDD.getPartitions(AbstractEsRDD.scala:25)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
>   at scala.Option.getOrElse(Option.scala:121)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
>   at org.apache.spark.rdd.RDD.count(RDD.scala:1157)
> 

Can anyone help me figure out the issue here? Thanks in advance


jbaiera (Contributor) commented Nov 21, 2016

@yogeshdarji99 Please upgrade to a more recent version of ES-Hadoop. The ability to recognize spark command line options was added after the version you are using (2.1.0.Beta2). I would recommend using the most recent versions of ES-Hadoop (5.0.1 or 2.4.0 if you really need to). I'd steer clear of the beta and alpha versions unless you are doing early access testing for releases as they often contain quite a few bugs!
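Independently of the version upgrade, the per-call Map workaround discussed earlier in this thread also sidesteps the command-line option handling entirely. A sketch under the assumption that the Map-accepting `esRDD` overload is available (the address and index names mirror the report above):

```scala
import org.elasticsearch.spark._

// Point the connector at the ES container explicitly instead of
// relying on spark.es.* command-line options being picked up.
val cfg = Map(
  "es.nodes" -> "172.17.0.3",
  "es.port"  -> "9200"
)

// The Map travels with the RDD, so the read no longer falls back
// to the default localhost:9200.
val rdd = sc.esRDD("index/type", "?q=Ge*", cfg)
rdd.count
```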


yogeshdarji99 commented Nov 21, 2016

@jbaiera I also tried elasticsearch-spark 2.4.0 and 5.0.0-alpha2 (since 5.0.0 is not available). It is not working; it is still trying to connect to localhost instead of 172.17.0.3:9200. Is it still in the development phase?


vaibhavtupe commented Nov 21, 2016

Hi @jbaiera / @yogeshdarji99, I am also facing the same issue while using it in a container. I have a simple application which uses Spark SQL to process data and store it in Elasticsearch.

Elasticsearch version: 2.4.1
Maven dependency added to the project:

<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch-spark_2.10</artifactId>
  <version>2.4.0</version>
</dependency>

Project properties:
ENV elasticsearch.nodes 10.222.171.57
ENV elasticsearch.port 31921

Error:

`10.202.151.57:31921]
 | 2016-11-21 23:01:18.989  INFO 1 --- [launch worker-0] o.e.spark.sql.EsDataFrameWriter          : Writing to [testAnalyticsApp/proxy_data]
 | 2016-11-21 23:03:26.258  INFO 1 --- [launch worker-0] o.a.c.httpclient.HttpMethodDirector      : I/O exception (java.net.ConnectException) caught when processing request: Connection timed out
 | 2016-11-21 23:03:26.259  INFO 1 --- [launch worker-0] o.a.c.httpclient.HttpMethodDirector      : Retrying request
 | 2016-11-21 23:05:33.618  INFO 1 --- [launch worker-0] o.a.c.httpclient.HttpMethodDirector      : I/O exception (java.net.ConnectException) caught when processing request: Connection timed out
  | 2016-11-21 23:05:33.619  INFO 1 --- [launch worker-0] o.a.c.httpclient.HttpMethodDirector      : Retrying request
  | 2016-11-21 23:07:40.978  INFO 1 --- [launch worker-0] o.a.c.httpclient.HttpMethodDirector      : I/O exception (java.net.ConnectException) caught when processing request: Connection timed out
 | 2016-11-21 23:07:40.979  INFO 1 --- [launch worker-0] o.a.c.httpclient.HttpMethodDirector      : Retrying request
 | 2016-11-21 23:09:48.338 ERROR 1 --- [launch worker-0] o.e.hadoop.rest.NetworkClient            : Node [172.17.0.5:9200] failed (Connection timed out); no other nodes left - aborting...
  | 2016-11-21 23:09:48.345 ERROR 1 --- [launch worker-0] org.apache.spark.executor.Executor       : Exception in task 0.0 in stage 0.0 (TID 0)
  | 
  | org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[172.17.0.5:9200]] 
 | 	at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:142) ~[elasticsearch-spark_2.10-2.4.0.jar!/:2.4.0]
 | 	at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:434) ~[elasticsearch-spark_2.10-2.4.0.jar!/:2.4.0]
  | 	at org.elasticsearch.hadoop.rest.RestClient.executeNotFoundAllowed(RestClient.java:442) ~[elasticsearch-spark_2.10-2.4.0.jar!/:2.4.0]
  | 	at org.elasticsearch.hadoop.rest.RestClient.exists(RestClient.java:518) ~[elasticsearch-spark_2.10-2.4.0.jar!/:2.4.0]
  | 	at org.elasticsearch.hadoop.rest.RestClient.touch(RestClient.java:524) ~[elasticsearch-spark_2.10-2.4.0.jar!/:2.4.0]
 | 	at org.elasticsearch.hadoop.rest.RestRepository.touch(RestRepository.java:491) ~[elasticsearch-spark_2.10-2.4.0.jar!/:2.4.0]
 | 	at org.elasticsearch.hadoop.rest.RestService.initSingleIndex(RestService.java:412) ~[elasticsearch-spark_2.10-2.4.0.jar!/:2.4.0]
 | 	at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:400) ~[elasticsearch-spark_2.10-2.4.0.jar!/:2.4.0]
 | 	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:40) ~[elasticsearch-spark_2.10-2.4.0.jar!/:2.4.0]
  | 	at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:59) ~[elasticsearch-spark_2.10-2.4.0.jar!/:2.4.0]
 | 	at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:59) ~[elasticsearch-spark_2.10-2.4.0.jar!/:2.4.0]
 | 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) ~[spark-core_2.10-1.6.1.jar!/:1.6.1]
 | 	at org.apache.spark.scheduler.Task.run(Task.scala:89) ~[spark-core_2.10-1.6.1.jar!/:1.6.1]
  | 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) ~[spark-core_2.10-1.6.1.jar!/:1.6.1]
 | 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_102]
  | 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_102]
 | 	at java.lang.Thread.run(Thread.java:745) [na:1.8.0_102]
`
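One detail worth noting in this log: the connector ends up contacting 172.17.0.5, a container-internal address obtained through node discovery, rather than the configured 10.222.171.57:31921. For containerized deployments the connector provides the `es.nodes.wan.only` setting, which disables discovery and restricts connections to the declared nodes. The following is a hedged sketch (not advice from the thread itself) for the Spark SQL write path, assuming the Map-accepting `saveToEs` implicit and reusing the names from the report:

```scala
import org.elasticsearch.spark.sql._

val cfg = Map(
  "es.nodes"          -> "10.222.171.57",
  "es.port"           -> "31921",
  // Talk only to the declared node; skip discovery of (possibly
  // unroutable) container-internal addresses.
  "es.nodes.wan.only" -> "true"
)

// df is an existing DataFrame produced by the Spark SQL job.
df.saveToEs("testAnalyticsApp/proxy_data", cfg)
```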

vaibhavtupe commented Nov 21, 2016

Hi @jbaiera / @yogeshdarji99 , I am also facing same issue while using it in container. I have simple application which uses sparkSQL to process the data and store it in Elasticsearch.

Elasticsearch version: 2.4.1
Maven dependency added in the project:

    <dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch-spark_2.10</artifactId>
      <version>2.4.0</version>
    </dependency>

Project properties:

    ENV elasticsearch.nodes 10.222.171.57
    ENV elasticsearch.port 31921
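
For context, the kind of SparkSQL-to-Elasticsearch write involved here can be sketched as follows. This is a minimal, illustrative sketch only: the host, port, and index/type names are the ones appearing in this comment and its log, while the app name, DataFrame contents, and column names are invented for the example; it assumes elasticsearch-spark_2.10 2.4.0 and Spark 1.6 (`SQLContext`-era API) on the classpath.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.elasticsearch.spark.sql._ // adds saveToEs to DataFrames

// Wire the connector settings from the project properties above.
val conf = new SparkConf()
  .setAppName("es-write-example")          // illustrative name
  .set("es.nodes", "10.222.171.57")        // elasticsearch.nodes
  .set("es.port", "31921")                 // elasticsearch.port

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// A toy DataFrame standing in for the processed data.
val df = sqlContext
  .createDataFrame(Seq(("host1", 200), ("host2", 404)))
  .toDF("host", "status")

// Write to the index/type named in the log below.
df.saveToEs("testAnalyticsApp/proxy_data")
```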

Error:

`10.202.151.57:31921]
 2016-11-21 23:01:18.989  INFO 1 --- [launch worker-0] o.e.spark.sql.EsDataFrameWriter          : Writing to [testAnalyticsApp/proxy_data]
 2016-11-21 23:03:26.258  INFO 1 --- [launch worker-0] o.a.c.httpclient.HttpMethodDirector      : I/O exception (java.net.ConnectException) caught when processing request: Connection timed out
 2016-11-21 23:03:26.259  INFO 1 --- [launch worker-0] o.a.c.httpclient.HttpMethodDirector      : Retrying request
 2016-11-21 23:05:33.618  INFO 1 --- [launch worker-0] o.a.c.httpclient.HttpMethodDirector      : I/O exception (java.net.ConnectException) caught when processing request: Connection timed out
 2016-11-21 23:05:33.619  INFO 1 --- [launch worker-0] o.a.c.httpclient.HttpMethodDirector      : Retrying request
 2016-11-21 23:07:40.978  INFO 1 --- [launch worker-0] o.a.c.httpclient.HttpMethodDirector      : I/O exception (java.net.ConnectException) caught when processing request: Connection timed out
 2016-11-21 23:07:40.979  INFO 1 --- [launch worker-0] o.a.c.httpclient.HttpMethodDirector      : Retrying request
 2016-11-21 23:09:48.338 ERROR 1 --- [launch worker-0] o.e.hadoop.rest.NetworkClient            : Node [172.17.0.5:9200] failed (Connection timed out); no other nodes left - aborting...
 2016-11-21 23:09:48.345 ERROR 1 --- [launch worker-0] org.apache.spark.executor.Executor       : Exception in task 0.0 in stage 0.0 (TID 0)
 
 org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[172.17.0.5:9200]] 
 	at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:142) ~[elasticsearch-spark_2.10-2.4.0.jar!/:2.4.0]
 	at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:434) ~[elasticsearch-spark_2.10-2.4.0.jar!/:2.4.0]
 	at org.elasticsearch.hadoop.rest.RestClient.executeNotFoundAllowed(RestClient.java:442) ~[elasticsearch-spark_2.10-2.4.0.jar!/:2.4.0]
 	at org.elasticsearch.hadoop.rest.RestClient.exists(RestClient.java:518) ~[elasticsearch-spark_2.10-2.4.0.jar!/:2.4.0]
 	at org.elasticsearch.hadoop.rest.RestClient.touch(RestClient.java:524) ~[elasticsearch-spark_2.10-2.4.0.jar!/:2.4.0]
 	at org.elasticsearch.hadoop.rest.RestRepository.touch(RestRepository.java:491) ~[elasticsearch-spark_2.10-2.4.0.jar!/:2.4.0]
 	at org.elasticsearch.hadoop.rest.RestService.initSingleIndex(RestService.java:412) ~[elasticsearch-spark_2.10-2.4.0.jar!/:2.4.0]
 	at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:400) ~[elasticsearch-spark_2.10-2.4.0.jar!/:2.4.0]
 	at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:40) ~[elasticsearch-spark_2.10-2.4.0.jar!/:2.4.0]
 	at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:59) ~[elasticsearch-spark_2.10-2.4.0.jar!/:2.4.0]
 	at org.elasticsearch.spark.sql.EsSparkSQL$$anonfun$saveToEs$1.apply(EsSparkSQL.scala:59) ~[elasticsearch-spark_2.10-2.4.0.jar!/:2.4.0]
 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) ~[spark-core_2.10-1.6.1.jar!/:1.6.1]
 	at org.apache.spark.scheduler.Task.run(Task.scala:89) ~[spark-core_2.10-1.6.1.jar!/:1.6.1]
 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) ~[spark-core_2.10-1.6.1.jar!/:1.6.1]
 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_102]
 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_102]
 	at java.lang.Thread.run(Thread.java:745) [na:1.8.0_102]
`
vaibhavtupe commented Nov 22, 2016

@jbaiera @yogeshdarji99, I found the problem: I had not specified the property `es.nodes.wan.only = true`, my bad. Setting this property solved my problem.
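
For anyone hitting the same symptom (a configured node like 10.222.171.57:31921 but failures against 172.17.0.5:9200): by default the connector discovers the cluster's nodes and then connects to the addresses Elasticsearch publishes, which inside a container are internal Docker IPs unreachable from the Spark side. Setting `es.nodes.wan.only` to `true` restricts the connector to the declared `es.nodes`. A minimal sketch of the configuration, with host and port taken from the comment above and the app name invented for the example:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("es-write-example")     // illustrative name
  .set("es.nodes", "10.222.171.57")
  .set("es.port", "31921")
  // Disable node discovery: talk only to the address(es) declared above.
  .set("es.nodes.wan.only", "true")
```

In a spark-shell session the same setting can be passed on the command line as `--conf spark.es.nodes.wan.only=true` (Spark strips the `spark.` prefix before handing `es.*` properties to the connector).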

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment