Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Spark]Is there a way to make elasticsearch-hadoop stick to the load-balancer(client), instead of going trying to ping all the data nodes? #373

Closed
wingchen opened this issue Feb 6, 2015 · 16 comments

Comments

@wingchen
Copy link

wingchen commented Feb 6, 2015

This is spark related.

I got the following exception with the code:

scala> sc.getConf.get("es.nodes.discovery")
res2: String = false

scala> val rdd = sc.esRDD("news/searchable-news-article")
rdd: org.elasticsearch.spark.rdd.ScalaEsRDD = ScalaEsRDD[1] at RDD at AbstractEsRDD.scala:17

scala> rdd.count()

Exception:

org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot find node with id [gy_92kljRluUFT9N022KDw] (is HTTP enabled?) from shard
...
    at org.elasticsearch.hadoop.util.Assert.notNull(Assert.java:40)
    at org.elasticsearch.hadoop.rest.RestRepository.getReadTargetShards(RestRepository.java:277)
    at org.elasticsearch.hadoop.rest.RestService.findPartitions(RestService.java:241)
    at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions$lzycompute(AbstractEsRDD.scala:49)
    at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions(AbstractEsRDD.scala:48)
    at org.elasticsearch.spark.rdd.AbstractEsRDD.getPartitions(AbstractEsRDD.scala:25)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:206)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)

It seems to go and try other nodes even after es.nodes.discovery is explicitly turned off.

Here comes the /_nodes?pretty (I removed some sensitive info):

{
  "ok" : true,
  "cluster_name" : "nyuzu",
  "nodes" : {
    "JRpaF5f7T2uLfJ90-sLBLA" : {
      "name" : "es-client001.mycom.com",
      "transport_address" : "inet[/11.222.111.68:9300]",
      "hostname" : "es-client001.mycom.com",
      "version" : "0.90.5",
      "http_address" : "inet[/11.222.111.68:9500]",
      "attributes" : {
        "data" : "false",
        "max_local_storage_nodes" : "1",
        "master" : "false"
      }
    },
    "Q_5mAcbBS_KGm6dEVp3oqQ" : {
      "name" : "es-data002.mycom.com",
      "transport_address" : "inet[/11.333.222.69:9300]",
      "hostname" : "es-data002.mycom.com",
      "version" : "0.90.5",
      "attributes" : {
        "max_local_storage_nodes" : "1",
        "master" : "false"
      }
    },
    "4d63O0bTSxyoVLE7DNDuiA" : {
      "name" : "es-master01.mycom.com",
      "transport_address" : "inet[/11.221.163.11:9300]",
      "hostname" : "es-master01.mycom.com",
      "version" : "0.90.5",
      "attributes" : {
        "data" : "false",
        "max_local_storage_nodes" : "1",
        "master" : "true"
      }
    },
    "gy_92kljRluUFT9N022KDw" : {
      "name" : "es-data001.mycom.com",
      "transport_address" : "inet[/11.113.18.70:9300]",
      "hostname" : "es-data001.mycom.com",
      "version" : "0.90.5",
      "attributes" : {
        "max_local_storage_nodes" : "1",
        "master" : "false"
      }
    }
  }
}
@wingchen
Copy link
Author

wingchen commented Feb 6, 2015

It turns out that only es-data001 and es-data002 has data, es-client001 is the load-balancer, and es-master01 is in charge of the cluster.

Basically, this environment is set up based on this post:
http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/modules-node.html#modules-node

According to the code in RestRepository.getReadTargetShards, elasticsearch-hadoop tries to match shards with nodes, instead of sticking to the load-balancer (line 276 - line 280). That's why this error happens.

Is there a way to make elasticsearch-hadoop stick to the load-balancer?

@wingchen wingchen changed the title es.nodes.discovery is set to false, but sc.esRDD still goes out and ping other non-http-enabled nodes [Spark]Is there a way to make elasticsearch-hadoop stick to the load-balancer(client), instead of going trying to ping all the data nodes? Feb 6, 2015
@costin
Copy link
Member

costin commented Feb 7, 2015

@wingchen es-hadoop/spark relies on connecting to the data nodes directly to support a parallel, node-to-node architecture. In other words, for each read and write, for each shard of the target shard, es-hadoop/spark will create a task/split that works directly against the data node. This way, each task works locally with the data without impacting the rest of the cluster.

If we were to support client nodes, the benefits of such a parallelized query would go away since everything would be shoved through the client. There would be no locality and in fact, no parallelism. In fact, in your case, rather than having 2 parallel tasks reading data from es-data001 and es-data002, one would have a serialized task reading data from es-client001.

@costin
Copy link
Member

costin commented Feb 8, 2015

@wingchen I've pushed a fix for this in master; you can find the dev builds already in Maven (more information here). To restrict es-spark/hadoop set the newly introduced es.nodes.client.only property to true.
Feedback welcome!

@wingchen
Copy link
Author

wingchen commented Feb 9, 2015

@costin

Thanks for the quick response (and for replying during the weekend).

I agree with you. It's faster to have driver querying each individual nodes, instead of sticking to the client node. We are evaluating the impact of changing the config this way.

I got passed the previous error with this new code update. However, I still run into an Exception:

Exception in thread "main" org.elasticsearch.hadoop.EsHadoopIllegalStateException: Cluster state volatile; cannot find node backing shards - please check whether your cluster is stable
    at org.elasticsearch.hadoop.rest.RestRepository.getReadTargetShards(RestRepository.java:281)
    at org.elasticsearch.hadoop.rest.RestService.findPartitions(RestService.java:248)
    at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions$lzycompute(AbstractEsRDD.scala:49)
    at org.elasticsearch.spark.rdd.AbstractEsRDD.esPartitions(AbstractEsRDD.scala:48)
    at org.elasticsearch.spark.rdd.AbstractEsRDD.getPartitions(AbstractEsRDD.scala:25)

It turns out RestRepository.getReadTargetShards calls RestRepository.doGetReadTargetShards, which in turn calls RestClient.targetShards. And since RestRepository.doGetReadTargetShards line no 332 again uses all the shard info to match their nodes, null is then returned at line no 335.

@costin
Copy link
Member

costin commented Feb 9, 2015

@wingchen What's your configuration? Can you please turn on logging all the way to TRACE level on org.elasticsearch.spark and org.elasticsearch.hadoop packages and save the output in a gist?
Indeed no nodes are found but it's unclear why that is - the logs should help clarify this.

@wingchen
Copy link
Author

wingchen commented Feb 9, 2015

This is too little for a gist, so I am posting it here:

15/02/09 13:43:21 DEBUG JavaEsRDD: Found client nodes [10.10.10.10:9500]
15/02/09 13:43:21 DEBUG JavaEsRDD: Filtered discovered only nodes [develop.mycom.com:9200] to client-only []
15/02/09 13:43:21 DEBUG JavaEsRDD: Discovered Elasticsearch version [0.90.5]

It turns out that all the other logs are the same, but JavaEsRDD did not find any client-only nodes.

I am looking into why too.

@costin
Copy link
Member

costin commented Feb 9, 2015

There should be a lot more than that such as what data is sent and received from Elastic; make sure to enable TRACE on the two packages I mentioned above.
At a quick glance it might be that you are using a list nodes which are not client, disabled discovery and restrict es-hadoop to use only client nodes. At runtime, the configured nodes are eliminated (since they are not client) and an empty list is returned.
This should be handled in es proper but until then, can you verify this hypothesis?
Thanks,

@wingchen
Copy link
Author

wingchen commented Feb 9, 2015

@costin

Correction. The only one still exists. I have updated all the logs to: https://gist.github.com/wingchen/7eb26cffbe59f6626e5a

The client node was activated successfully though:

15/02/09 15:34:31 DEBUG JavaEsRDD: Found client nodes [10.223.128.68:9500]
15/02/09 15:34:31 DEBUG JavaEsRDD: Filtered discovered only nodes [develop.es.puppy.int.mycom.com:9200, 10.223.128.68:9500] to client-only [10.223.128.68:9500] 

@costin
Copy link
Member

costin commented Feb 10, 2015

@wingchen I've added a fix in master and pushed another build. There are better messages in case no client nodes are found (including the case of discovery being disabled) and also better node filtering. Can you please try it out and report back.
Depending on your time zone, I'll try to be around and maybe if there are issues, we can continue the discussion on the IRC.

@wingchen
Copy link
Author

@costin

Thanks a lot for the update. I am in PST.

Unfortunately we got an NPE on the new commit: https://gist.github.com/wingchen/f5b2d86ca128c7aea2b4

ShardSorter.find(info, httpNodes, log) on line 304 in RestRepository seems to return null.

@wingchen
Copy link
Author

@costin Do you have any update? thanks

@costin
Copy link
Member

costin commented Feb 12, 2015

I've pushed an update a couple of days ago but forgot to comment :( - can you please try it out and report back? If you're online, it would be great if we could connect over the IRC to get to the bottom of this. My user is costin

@wingchen
Copy link
Author

I am in IRC now.

@costin
Copy link
Member

costin commented Feb 13, 2015

@wingchen The final fix has been committed and its relevant build pushed to maven. Try it out and let me know if we can close this issue.

Cheers,

@wingchen
Copy link
Author

confirmed fixed. thanks

@costin
Copy link
Member

costin commented Apr 28, 2015

Closing the issue.

@costin costin closed this as completed Apr 28, 2015
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants