
Only one executor doing the join/shuffle #643

Closed
shockdm opened this issue Oct 1, 2019 · 4 comments

Comments

shockdm commented Oct 1, 2019

Hi all,

I am relatively new to Spark and am having a problem with my workload running on Spark on K8s. The workflow is rather simple:

  1. Read data from disk.
  2. Export data from Elasticsearch.
  3. Join the exported data with the data read from disk.
  4. Load the result into final storage.

What I am seeing is that stages 1 and 4 are performed by all executors concurrently, while stage 2/3 seems to be performed by only a single executor, with the others standing idle. The setup has CephFS as the shared filesystem across all executors and the driver, from which the "data on disk" is read. Am I doing something completely wrong? Has anyone else experienced this problem?
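For reference, a minimal sketch of roughly what this job does (the file format, paths, index name, and join key below are placeholders, and the Elasticsearch read assumes the elasticsearch-hadoop connector is on the classpath):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("join-example").getOrCreate()

// 1. Read the "data on disk" from the shared filesystem (placeholder path).
val diskDf = spark.read.parquet("file:///mnt/cephfs/input/")

// 2. Export data from Elasticsearch (placeholder index; assumes elasticsearch-hadoop).
val esDf = spark.read.format("org.elasticsearch.spark.sql").load("my-index")

// 3. Join the Elasticsearch data with the data read from disk; this is the
//    shuffle stage that ends up on a single executor.
val joined = diskDf.join(esDf, Seq("id"))

// 4. Load the result into final storage (placeholder path).
joined.write.parquet("file:///mnt/cephfs/output/")
```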

Thanks in advance.

@shockdm changed the title from "Only one executor doing the join" to "Only one executor doing the join/shuffle" on Oct 1, 2019

runzhliu (Contributor) commented Oct 2, 2019

@shockdm

while stage 2/3 seems to be performed by only a single executor, with the others standing idle

It could happen when you have only one task at stage 2 or 3, which means that single task would be scheduled on one executor.
This is a question about Spark itself rather than about the Operator; you may find tips in the Spark documentation.
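For instance (a hedged illustration with a placeholder input path), you can check how many partitions, and therefore tasks, feed the join, and repartition if there is only one:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
val df = spark.read.parquet("file:///mnt/cephfs/input/") // placeholder input

// The number of partitions is the number of tasks Spark schedules for this stage.
println(s"partitions: ${df.rdd.getNumPartitions}")

// A single partition means a single task, which necessarily runs on one executor.
// Repartitioning before the join spreads the work; 200 matches the default value
// of spark.sql.shuffle.partitions.
val wider = df.repartition(200)
```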

shockdm (Author) commented Oct 2, 2019

@runzhliu The problem, though, is that this behaviour was not seen on the Yarn cluster we were using previously. Before, all of the steps were spread evenly across all of the executors. Specifically, the stage that is being worked on by a single executor has 200 partitions, but for some reason they are processed sequentially by that one executor.

This only started showing up when we began trying Spark on K8s... Perhaps there is something wrong with the way the setup is configured? Like I said, we have a Spark on K8s cluster that is backed by CephFS, which is mounted as a RWX volume in every executor and the driver. Data is shared in that volume and made accessible to every participant that way.
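For what it's worth, here is how one could check (a hedged sketch, not something confirmed in this thread) whether more than one executor has actually registered with the driver; if only one came up on the K8s cluster, all 200 tasks would still run on it sequentially:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// List the executors currently registered with the driver
// (the driver itself may also show up in this list).
spark.sparkContext.statusTracker.getExecutorInfos.foreach { info =>
  println(s"${info.host()}:${info.port()} running tasks: ${info.numRunningTasks()}")
}

// The shuffle parallelism used for joins and aggregations (200 by default).
println(spark.conf.get("spark.sql.shuffle.partitions"))
```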

runzhliu (Contributor) commented Oct 7, 2019

Hi @shockdm, you could try reading the data from a local filesystem (or something else) to see if it happens again. Also, do you load the data from CephFS on the Yarn cluster as well? I don't think official Spark can do that directly, so do you have some private changes on top of Apache Spark?

shockdm (Author) commented Oct 7, 2019

@runzhliu, to answer your questions:

you could try reading the data from a local filesystem (or something else) to see if it happens again

We mount CephFS as a RWX volume in the Spark pods, so to the Spark pods it is treated as a local file system. Unless I am misunderstanding the question.

Also, do you load the data from CephFS on the Yarn cluster as well? I don't think official Spark can do that directly, so do you have some private changes on top of Apache Spark?

On the Yarn cluster we do not; we use HDFS. However, we have previously used HDFS with the K8s cluster as well and got the same results as we get with CephFS. The CephFS volume is shared across multiple pods (driver and executors) as a shared local volume, akin to NFS. No other special handling is used.
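To make the difference between the two setups concrete (a hedged sketch; the paths and namenode address are placeholders, only illustrating how the input is addressed in each case):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// On the Yarn cluster: input is read from HDFS (placeholder namenode and path).
val onYarn = spark.read.parquet("hdfs://namenode:8020/data/input/")

// On the K8s cluster: the same data sits on the CephFS RWX volume, which is
// mounted at a local path inside every driver and executor pod (placeholder mount).
val onK8s = spark.read.parquet("file:///mnt/cephfs/data/input/")
```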
