
Add maximum shard size to config #4986

Merged: 1 commit on Jun 29, 2021
9 changes: 9 additions & 0 deletions distributed/distributed-schema.yaml
@@ -629,6 +629,15 @@ properties:
       This is useful if you want to include serialization in profiling data,
       or if you have data types that are particularly sensitive to deserialization

+  shard:
+    type: string
+    description: |
+      The maximum size of a frame to send through a comm
+
+      Some network infrastructure doesn't like sending through very large messages.
+      Dask comms will cut up these large messages into many small ones.
+      This attribute determines the maximum size of such a shard.
+
   socket-backlog:
     type: integer
     description: |
1 change: 1 addition & 0 deletions distributed/distributed.yaml
@@ -165,6 +165,7 @@ distributed:
       min: 1s  # the first non-zero delay between re-tries
       max: 20s  # the maximum delay between re-tries
     compression: auto
+    shard: 64MiB
Member:
nitpick but the previous value was 64MB not 64MiB :)

Member Author:
Fair point. I think that we should default to powers of two in general. Any objection?

fjetter (Member), Jun 29, 2021:
> I think that we should default to powers of two in general

I guess it doesn't really matter but with powers of two we probably have the best chance to hit some kind of sweet mem alignment/cache size/whatever optimization so I'm all for it.

However, if you prefer powers of two, you should use MB, shouldn't you? (I understand if you didn't want to push changes any more, just wondering if I messed something up in my mind :) )

MB == 1024 ** 2 / 2 ** 20
MiB == 1000 ** 2 / 10**6

Member Author:
From the internet, the source of all truth.

> Megabytes (MB) or Mebibytes (MiB)?
>
> Though the article refers to Linux, the topic is applicable to all computers. ... According to these standards, technically a megabyte (MB) is a power of ten, while a mebibyte (MiB) is a power of two, appropriate for binary machines. A megabyte is then 1,000,000 bytes. (Dec 23, 2001)

Member:
wow, I was wrong all those years. I remember a first semester class where my prof introduced the unit and I would've sworn all these years it was the other way round 🤦

Member Author:
Maybe the prof was wrong too? Imagine how they feel :)

Member:
If he was wrong, he'd never admit to it!
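For the record, the definitions the thread converges on are easy to check with plain arithmetic, no Dask involved:

```python
# SI vs IEC units, as settled in the discussion above:
MB = 10 ** 6    # megabyte: power of ten
MiB = 2 ** 20   # mebibyte: power of two

# The new default of 64MiB matches the old hard-coded constant 2 ** 26
assert 64 * MiB == 2 ** 26

# 64 MB and 64 MiB are not the same size (they differ by roughly 4.9%)
assert 64 * MB != 64 * MiB
```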

offload: 10MiB # Size after which we choose to offload serialization to another thread
default-scheme: tcp
socket-backlog: 2048
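The configured value is a human-readable string that gets parsed into a byte count. A simplified, self-contained stand-in for `dask.utils.parse_bytes` (the real function accepts more unit spellings; this sketch is illustrative only) shows what `64MiB` resolves to:

```python
import re

def parse_bytes_sketch(s: str) -> int:
    """Simplified stand-in for dask.utils.parse_bytes:
    turn a size string like "64MiB" into a byte count."""
    units = {"B": 1, "kB": 10 ** 3, "MB": 10 ** 6, "GB": 10 ** 9,
             "KiB": 2 ** 10, "MiB": 2 ** 20, "GiB": 2 ** 30}
    m = re.fullmatch(r"(\d+(?:\.\d+)?)\s*(\w+)", s.strip())
    if m is None:
        raise ValueError(f"Could not parse byte string: {s!r}")
    return int(float(m.group(1)) * units[m.group(2)])

assert parse_bytes_sketch("64MiB") == 2 ** 26   # the new default
assert parse_bytes_sketch("100MB") == 100_000_000
```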
4 changes: 3 additions & 1 deletion distributed/protocol/utils.py
@@ -1,8 +1,10 @@
 import struct

+import dask
+
 from ..utils import nbytes

-BIG_BYTES_SHARD_SIZE = 2 ** 26
+BIG_BYTES_SHARD_SIZE = dask.utils.parse_bytes(dask.config.get("distributed.comm.shard"))


 msgpack_opts = {
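To make the schema description concrete: a comm frame larger than the shard limit is cut into pieces no bigger than that limit. A minimal sketch of that splitting (the real logic lives alongside `BIG_BYTES_SHARD_SIZE` in `distributed/protocol/utils.py`; the function name here is illustrative):

```python
def split_frame(frame: bytes, shard_size: int = 2 ** 26) -> list:
    """Cut one large frame into shards of at most shard_size bytes."""
    if len(frame) <= shard_size:
        return [frame]          # small frames pass through untouched
    return [frame[i:i + shard_size]
            for i in range(0, len(frame), shard_size)]

# A 10-byte frame with a 4-byte shard limit becomes shards of 4 + 4 + 2 bytes
shards = split_frame(b"x" * 10, shard_size=4)
assert [len(s) for s in shards] == [4, 4, 2]
assert b"".join(shards) == b"x" * 10
```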