
Conversation

@AngersZhuuuu (Contributor) commented Oct 23, 2020

What changes were proposed in this pull request?

Follow-up of #27831; original author: @chrysan.

Each request checks chunksBeingTransferred:

public long chunksBeingTransferred() {
  long sum = 0L;
  for (StreamState streamState : streams.values()) {
    sum += streamState.chunksBeingTransferred.get();
  }
  return sum;
}

which is called from checks like:

long chunksBeingTransferred = streamManager.chunksBeingTransferred();
if (chunksBeingTransferred >= maxChunksBeingTransferred) {
  logger.warn("The number of chunks being transferred {} is above {}, close the connection.",
    chunksBeingTransferred, maxChunksBeingTransferred);
  channel.close();
  return;
}

This traverses streams on every request, and fetching a data chunk accesses streams as well, which causes two problems:

  1. Repeated traversal of streams: the more streams there are, the longer each check takes.
  2. Lock contention on the ConcurrentHashMap streams.

In this PR, when maxChunksBeingTransferred is left at its default value, we skip computing chunksBeingTransferred entirely, since the limit is effectively disabled in that case. Users who set this configuration and hit the performance problem can also backport PR #27831.
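A minimal sketch of the resulting check, assuming the default of spark.shuffle.maxChunksBeingTransferred is Long.MAX_VALUE (illustrative only, not the exact diff):

// Illustrative sketch: only traverse the streams map when the limit is
// actually configured; with the default value the outer condition is false,
// so the expensive chunksBeingTransferred() call is skipped entirely.
if (maxChunksBeingTransferred < Long.MAX_VALUE) {
  long chunksBeingTransferred = streamManager.chunksBeingTransferred();
  if (chunksBeingTransferred >= maxChunksBeingTransferred) {
    logger.warn("The number of chunks being transferred {} is above {}, close the connection.",
      chunksBeingTransferred, maxChunksBeingTransferred);
    channel.close();
    return;
  }
}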

Why are the changes needed?

Speeds up the chunksBeingTransferred check and avoids lock contention on the streams map.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Existing UTs.

@AngersZhuuuu (Contributor Author) commented Oct 23, 2020

ping @wangyum @dongjoon-hyun @HeartSaVioR @jiangxb1987 @Ngone51
With the patch, duration grows linearly with the number of streams. The results are in line with our expectations.

@wangyum (Member) commented Oct 23, 2020

Thank you for the numbers, @AngersZhuuuu. We have been running this change for more than half a year.

@SparkQA commented Oct 23, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34799/

@SparkQA commented Oct 23, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34799/

@SparkQA commented Oct 23, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34800/

@SparkQA commented Oct 23, 2020

Test build #130198 has finished for PR 30139 at commit 6857e4e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 23, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34800/

@SparkQA commented Oct 23, 2020

Test build #130199 has finished for PR 30139 at commit 72e7b87.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mridulm (Contributor) commented Oct 24, 2020

+CC @otterc

@dongjoon-hyun (Member) commented:
Could you add an empty commit whose authorship is the original author, @AngersZhuuuu? Then the Apache Spark merge script can give both of you authorship properly.

@AngersZhuuuu (Contributor Author) commented:
> Could you add an empty commit whose authorship is the original author, @AngersZhuuuu? Then the Apache Spark merge script can give both of you authorship properly.

Done. Thanks for mentioning this point.

@SparkQA commented Oct 25, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34841/

@SparkQA commented Oct 25, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34841/

@SparkQA commented Oct 25, 2020

Test build #130241 has finished for PR 30139 at commit 0654684.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mridulm (Contributor) left a comment:

I am concerned that the totalChunksBeingTransferred can diverge over time from the state of streams when there are concurrent updates. Either both should be updated within the same critical section, or we should carefully ensure there is no potential for divergence. Can you take another look and confirm this is not the case?

For example, can there be concurrent execution between chunkBeingSent, chunkSent and connectionTerminated? If yes, can we ensure the state remains consistent?

@AngersZhuuuu (Contributor Author) commented:
> I am concerned that the totalChunksBeingTransferred can diverge over time from the state of streams when there are concurrent updates. Either both should be updated within the same critical section, or we should carefully ensure there is no potential for divergence. Can you take another look and confirm this is not the case?
>
> For example, can there be concurrent execution between chunkBeingSent, chunkSent and connectionTerminated? If yes, can we ensure the state remains consistent?

In the original code we just use a ConcurrentHashMap (streams), and there is no strong locking mechanism there either. chunksBeingTransferred is only used for the check against the config:

long chunksBeingTransferred = streamManager.chunksBeingTransferred();
if (chunksBeingTransferred >= maxChunksBeingTransferred) {
  logger.warn("The number of chunks being transferred {} is above {}, close the connection.",
    chunksBeingTransferred, maxChunksBeingTransferred);
  channel.close();
  return;
}

IMO, we can add a lock to keep the value of totalChunksBeingTransferred strongly consistent, such as:
[screenshot of the proposed change]
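(The screenshot above is not reproduced here; a rough sketch of the kind of synchronized update being proposed, with an assumed method shape, might look like this:)

private void chunkBeingSent(long streamId) {
  StreamState streamState = streams.get(streamId);
  if (streamState != null) {
    // Update the per-stream counter and the global counter under one lock
    // so the two values cannot diverge.
    synchronized (totalChunksBeingTransferred) {
      streamState.chunksBeingTransferred.incrementAndGet();
      totalChunksBeingTransferred.incrementAndGet();
    }
  }
}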

And I have run the test in the description; this change has little effect on performance. WDYT?

@jiangxb1987 (Contributor) commented:
Yes, we should ensure the streamState and the totalChunksBeingTransferred are updated in sync. Other than that the PR looks good!


private final AtomicLong nextStreamId;
private final ConcurrentHashMap<Long, StreamState> streams;
private final AtomicLong totalChunksBeingTransferred = new AtomicLong(0);
A reviewer (Contributor) commented on the lines above:

nit: maybe rename to numChunksBeingTransferred? Because it's not accumulating all the chunks that are transferred in history.

@AngersZhuuuu (Contributor Author) replied:

> nit: maybe rename to numChunksBeingTransferred? Because it's not accumulating all the chunks that are transferred in history.

Updated

@otterc (Contributor) commented Oct 27, 2020

> IMO, we can add a lock to keep the value of totalChunksBeingTransferred strongly consistent, such as:
> [screenshot of the proposed change]
>
> And I have run the test in the description; this change has little effect on performance. WDYT?

This should have a considerable impact on performance when there are multiple open streams, because updates of different streams would lock on the single object totalChunksBeingTransferred. Isn't that the case?

@AngersZhuuuu (Contributor Author) commented Oct 27, 2020

> This should have a considerable impact on performance when there are multiple open streams, because updates of different streams would lock on the single object totalChunksBeingTransferred. Isn't that the case?

Since totalChunksBeingTransferred is atomic, its updates already contend with each other, so adding a lock around totalChunksBeingTransferred won't have much extra impact. And we can keep strong consistency by updating the streamState and the totalChunksBeingTransferred in sync. I also ran the test above and can't see any noticeable effect.

@otterc (Contributor) commented Oct 27, 2020

> > This should have a considerable impact on performance when there are multiple open streams, because updates of different streams would lock on the single object totalChunksBeingTransferred. Isn't that the case?
>
> Since totalChunksBeingTransferred is atomic, its updates already contend with each other, so adding a lock around totalChunksBeingTransferred won't have much extra impact. And we can keep strong consistency by updating the streamState and the totalChunksBeingTransferred in sync.

Hmmm. Every update to chunkSent and chunkBeingSent will compete for the lock on the totalChunksBeingTransferred object if we add synchronized (totalChunksBeingTransferred). This would increase the time for these operations. This would mean that to speed up chunksBeingTransferred, we are increasing the time of updates to streamState.

@AngersZhuuuu (Contributor Author) commented:
> Yes, we should ensure the streamState and the totalChunksBeingTransferred are updated in sync. Other than that the PR looks good!

How about the current change? It doesn't use AtomicLong but uses synchronized to keep strong consistency. The test result is:

OneForOneStreamManager fetch data duration test:
Stream Size     Max        Min       Avg
10000           1796       187       497.5
50000           4214       1295      2267.9
100000          10635      3643      5800.3

Process finished with exit code 0

@AngersZhuuuu (Contributor Author) commented Oct 27, 2020

> Hmmm. Every update to chunkSent and chunkBeingSent will compete for the lock on the totalChunksBeingTransferred object if we add synchronized (totalChunksBeingTransferred). This would increase the time for these operations. This would mean that to speed up chunksBeingTransferred, we are increasing the time of updates to streamState.

I know that, but as @jiangxb1987 and @mridulm mentioned, we need to ensure the streamState and the totalChunksBeingTransferred are updated in sync, and adding this lock is a strong guarantee. The work inside the critical section is very fast, so the impact is not significant. Also, it's a huge leap in performance compared to what it was before.

@AngersZhuuuu (Contributor Author) commented:
> Hmmm. Every update to chunkSent and chunkBeingSent will compete for the lock on the totalChunksBeingTransferred object

We remove a lot of contention on streams and only add a very quick lock on numChunksBeingTransferred.

@HeartSaVioR (Contributor) commented Oct 27, 2020

Sorry, but your latest change doesn't actually lock properly. Long is immutable, and you always replace the object when you do the calculation and assign the result to the field, which is also used as the lock. Your best bet would be to change it to a primitive long (to avoid boxing/unboxing) and have a separate Object field for locking.
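A minimal sketch of the suggested pattern, a primitive long guarded by a dedicated lock object (names and method shapes are illustrative, not the PR's final code):

private final Object lock = new Object();
private long numChunksBeingTransferred = 0L;

public void chunkBeingSent(long streamId) {
  synchronized (lock) {  // the lock object itself is never reassigned
    numChunksBeingTransferred++;
    // ... update the per-stream state here as well ...
  }
}

public void chunkSent(long streamId) {
  synchronized (lock) {
    numChunksBeingTransferred--;
    // ... update the per-stream state here as well ...
  }
}

public long chunksBeingTransferred() {
  synchronized (lock) {
    return numChunksBeingTransferred;
  }
}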

@AngersZhuuuu (Contributor Author) commented:
> Sorry, but your latest change doesn't actually lock properly. Long is immutable, and you always replace the object when you do the calculation and assign the result to the field, which is also used as the lock. Your best bet would be to change it to a primitive long and have a separate Object field for locking.

A big mistake on my side, thanks for your suggestion.

@SparkQA commented Nov 9, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35388/

@SparkQA commented Nov 9, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35388/

@SparkQA commented Nov 9, 2020

Test build #130779 has finished for PR 30139 at commit b07b999.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR (Contributor) commented:
I see this now only skips calculating chunksBeingTransferred when the config is at its default value. This is far from the original approach, so could you please update the PR title and description based on the new change?

IMHO this PR no longer fixes SPARK-31069, but if there's no good idea for addressing the root issue, it'd be OK to keep it associated with SPARK-31069. If someone hits the issue with a non-default config, it can be filed again and we can work on the new JIRA issue.

@AngersZhuuuu changed the title [SPARK-31069][CORE] high cpu caused by chunksBeingTransferred in external shuffle service [SPARK-31069][CORE] Avoid repeat compute chunksBeingTransferred cause high cpu cost in external shuffle service when maxChunksBeingTransferred use default value. Nov 13, 2020
@AngersZhuuuu (Contributor Author) commented:
> I see this now only skips calculating chunksBeingTransferred when the config is at its default value. This is far from the original approach, so could you please update the PR title and description based on the new change?
>
> IMHO this PR no longer fixes SPARK-31069, but if there's no good idea for addressing the root issue, it'd be OK to keep it associated with SPARK-31069. If someone hits the issue with a non-default config, it can be filed again and we can work on the new JIRA issue.

Yea, updated, and I added guidance for users who need the original PR #27831.

@mridulm (Contributor) commented Nov 13, 2020

@AngersZhuuuu To clarify, the updated behavior of the PR is that by default we don't incur the cost of computing chunksBeingTransferred, since the default value effectively disables maxChunksBeingTransferred. But if maxChunksBeingTransferred is configured, the performance characteristics are the same as before.

I am fine with the change; ideally we should fix the behavior even when maxChunksBeingTransferred is configured, but that can be follow-up work.

@AngersZhuuuu (Contributor Author) commented:
> I am fine with the change; ideally we should fix the behavior even when maxChunksBeingTransferred is configured, but that can be follow-up work.

Yea, we need to think about how to guarantee both performance and consistent execution with minimal overhead as follow-up work. The current change will solve most users' problems.

@mridulm (Contributor) commented Nov 14, 2020

+CC @srowen, @jiangxb1987, @cloud-fan
I will leave it open till Monday in case others want to comment, and then merge.

@mridulm (Contributor) commented Nov 14, 2020

+CC @otterc, @wangyum, @HeartSaVioR, @Ngone51 who also reviewed this change.

@HeartSaVioR (Contributor) left a comment:

+1, looks OK to me.

It'd be ideal if we could find a good way we all agree on to resolve the original problem, but it doesn't look that easy.

@otterc (Contributor) left a comment:

Other than the nits, looks good to me.

long chunksBeingTransferred = streamManager.chunksBeingTransferred();
if (chunksBeingTransferred >= maxChunksBeingTransferred) {
logger.warn("The number of chunks being transferred {} is above {}, close the connection.",
chunksBeingTransferred, maxChunksBeingTransferred);
A reviewer (Contributor) commented on the lines above:

Nit: indentation should be 2 spaces

@AngersZhuuuu (Contributor Author) replied:

> Nit: indentation should be 2 spaces

Done

long chunksBeingTransferred = streamManager.chunksBeingTransferred();
if (chunksBeingTransferred >= maxChunksBeingTransferred) {
logger.warn("The number of chunks being transferred {} is above {}, close the connection.",
chunksBeingTransferred, maxChunksBeingTransferred);
A reviewer (Contributor) commented on the lines above:

Nit: indentation should be 2 spaces

@AngersZhuuuu (Contributor Author) replied:

> Nit: indentation should be 2 spaces

Done

@SparkQA commented Nov 14, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35690/

@SparkQA commented Nov 14, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35690/

@SparkQA commented Nov 14, 2020

Test build #131087 has finished for PR 30139 at commit fab5557.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AngersZhuuuu (Contributor Author) commented:
Any update?

@mridulm (Contributor) commented Nov 18, 2020

+CC @srowen, @HyukjinKwon. I merged the PR via ./dev/merge_spark_pr.py, but it failed with the output below [1].
When I look at the apache repo, the commit has made it through: https://git-wip-us.apache.org/repos/asf?p=spark.git;a=commit;h=f8b95dddc1571194fd728d7e0c6de495895da99e

I had run a git fetch --all and git pull before merging. A subsequent pull showed no conflicts with local [2].

Any idea why there is a disconnect between the GitHub and apache repos? Thanks.

[1]

Merge complete (local ref PR_TOOL_MERGE_PR_30139_MASTER). Push to apache? (y/n): y
git push apache PR_TOOL_MERGE_PR_30139_MASTER:master
Username for 'https://git-wip-us.apache.org': mridulm80
Password for 'https://mridulm80@git-wip-us.apache.org': 
Enumerating objects: 26, done.
Counting objects: 100% (26/26), done.
Delta compression using up to 8 threads
Compressing objects: 100% (9/9), done.
Writing objects: 100% (14/14), 4.72 KiB | 4.72 MiB/s, done.
Total 14 (delta 5), reused 0 (delta 0)
remote: To git@github:apache/spark.git
remote:  ! [rejected]        f8b95dddc1571194fd728d7e0c6de495895da99e -> master (fetch first)
remote: error: failed to push some refs to 'git@github:apache/spark.git'
remote: hint: Updates were rejected because the remote contains work that you do
remote: hint: not have locally. This is usually caused by another repository pushing
remote: hint: to the same ref. You may want to first integrate the remote changes
remote: hint: (e.g., 'git pull ...') before pushing again.
remote: hint: See the 'Note about fast-forwards' in 'git push --help' for details.
remote: Syncing refs/heads/master...
remote: Could not sync with GitHub: 
remote: Sending notification emails to: [u'"commits@spark.apache.org" <commits@spark.apache.org>']
remote: Error running hook: /x1/gitbox/hooks/post-receive.d/01-sync-repo.py
To https://git-wip-us.apache.org/repos/asf/spark.git
   5e8549973dc..f8b95dddc15  PR_TOOL_MERGE_PR_30139_MASTER -> master
git rev-parse PR_TOOL_MERGE_PR_30139_MASTER
Restoring head pointer to master
git checkout master
Switched to branch 'master'
git branch
Deleting local branch PR_TOOL_MERGE_PR_30139
git branch -D PR_TOOL_MERGE_PR_30139
Deleting local branch PR_TOOL_MERGE_PR_30139_MASTER
git branch -D PR_TOOL_MERGE_PR_30139_MASTER
Pull request #30139 merged!
Merge hash: f8b95ddd

[2]

$ git pull apache-github master
remote: Enumerating objects: 8, done.
remote: Counting objects: 100% (8/8), done.
remote: Compressing objects: 100% (3/3), done.
remote: Total 13 (delta 5), reused 6 (delta 5), pack-reused 5
Unpacking objects: 100% (13/13), 1.63 KiB | 104.00 KiB/s, done.
From github.com:apache/spark
 * branch                    master     -> FETCH_HEAD
 + f8b95dddc15...7f3d99a8a5b master     -> origin/master  (forced update)
Updating 5e8549973dc..7f3d99a8a5b
Fast-forward
 python/pyspark/sql/functions.py                              | 4 ++--
 sql/core/src/main/scala/org/apache/spark/sql/functions.scala | 8 ++++----
 2 files changed, 6 insertions(+), 6 deletions(-)

@asfgit closed this in dd32f45 on Nov 18, 2020
@mridulm (Contributor) commented Nov 18, 2020

Scratch that; it was a concurrent merge issue at git, I have not seen this before :-)

@mridulm (Contributor) commented Nov 18, 2020

Thanks for fixing this, @AngersZhuuuu!

Thanks for the reviews @jiangxb1987, @HeartSaVioR, @otterc, @srowen, @cloud-fan

@HeartSaVioR (Contributor) commented:
I became a committer after the gitbox integration, and according to the guide, I guess we no longer need to set up the ASF git as a remote repo.

@mridulm (Contributor) commented Nov 18, 2020

@AngersZhuuuu Can you update the JIRA please? I am not sure if I have added your id as the assignee.

@mridulm (Contributor) commented Nov 18, 2020

My repo configs are from ages back, @HeartSaVioR :-)
I should perhaps update them ... good point.

@AngersZhuuuu (Contributor Author) commented:
> @AngersZhuuuu Can you update the JIRA please? I am not sure if I have added your id as the assignee.

The assignee is me now; does any other information need updating?
[screenshot of the JIRA issue showing the assignee]

@mridulm (Contributor) commented Nov 18, 2020

Thanks!

@HyukjinKwon (Member) commented:
Yeah, I migrated to gitbox completely IIRC. I remember there was a similar syncing issue before I had migrated. Now committers have to set up according to https://infra.apache.org/apache-github.html if I am not wrong. FYI, I believe we can now use gitbox and github interchangeably. This is my remote config, FYI:

git remote -v
...
apache	https://github.com/apache/spark.git (fetch)
apache	https://github.com/apache/spark.git (push)
apache-github	https://github.com/apache/spark.git (fetch)
apache-github	https://github.com/apache/spark.git (push)
...
origin	https://github.com/HyukjinKwon/spark.git (fetch)
origin	https://github.com/HyukjinKwon/spark.git (push)
...
upstream	https://github.com/apache/spark.git (fetch)
upstream	https://github.com/apache/spark.git (push)
...
