Parallel processing on replicas, reworked. #26748

Closed
alexey-milovidov opened this issue Jul 23, 2021 · 6 comments · Fixed by #29279

alexey-milovidov (Member) commented Jul 23, 2021:

We want to parallelize data processing using multiple replicas of a single shard.
Every replica should process some split of the data.

The following considerations make the task non-trivial:

  1. Replicas may contain different sets of data parts, because some replicas may lag behind and miss some new parts.
  2. Replicas may contain different sets of data parts, because some parts may be merged on one replica and not yet merged on another.
  3. We want to distribute the work uniformly across replicas, so if one replica is slower than another, we should not be bound by the performance of the slowest replica.

We cannot use a hashing-based split of data parts, because replicas have different sets of data parts.
We cannot split the set of data parts statically and assign them to replicas before query processing.

This task will also help to implement distributed processing over shared storage.
In the simplest case, multiple computation nodes over shared storage can look like replicas, and we split data processing across them.

Proposal

The initiator node sends the query to all participating replicas (all available replicas or a limited subset of them, depending on settings), along with a setting telling these replicas to coordinate parallel processing, and another setting: a hint on how many replicas are participating.
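For illustration only, the coordination-related information attached to the fanned-out query might look like the sketch below. The struct and field names are invented for this sketch and are not the actual ClickHouse settings.

```cpp
#include <cstddef>

// Illustrative sketch only: field names are invented and do not correspond
// to the actual ClickHouse settings. This is the extra information the
// initiator would attach to the query it fans out to each replica.
struct ParallelReplicasQueryInfo
{
    bool coordinate_with_initiator = false;  // "participate in coordinated parallel reading"
    size_t participating_replicas = 1;       // hint: how many replicas take part
    size_t replica_number = 0;               // which of them this replica is
};
```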

Every replica collects a snapshot of data parts to process the query as usual. From these data parts, "read tasks" are formed, such as: read this data part; read this range of marks from this data part (for large data parts). Read tasks are identical to what the replica would want to read during normal query processing.
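A read task and the request a replica sends to the initiator could be modelled roughly as follows. This is an illustrative C++ sketch; the type names, fields, and the example part name are assumptions, not the actual ClickHouse classes.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Illustrative sketch only: names do not correspond to real ClickHouse classes.
// A mark range identifies a contiguous slice of a data part by mark numbers.
struct MarkRange
{
    size_t begin = 0;
    size_t end = 0;   // exclusive
};

// A read task: either a whole small part, or a range of marks of a large part.
struct ReadTask
{
    std::string part_name;          // e.g. "all_1_10_2" (hypothetical example)
    std::vector<MarkRange> ranges;  // empty means "the whole part"
};

// A batch of tasks that one replica proposes to the initiator in a single request.
struct ReadTaskRequest
{
    size_t replica_number = 0;
    std::vector<ReadTask> tasks;
};
```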

A replica collects some number of read tasks and sends a request with the list of read tasks back to the initiator node. The initiator node acts like a "semaphore" for replicas to coordinate their data processing. The request is essentially "tell me if I can take these tasks and assign them to me". The initiator node builds the set of data parts (in-memory state) with the ranges inside them that are being processed.

Note: this is similar to the already implemented s3Cluster table function, which coordinates processing of files on remote storage across multiple computation nodes.

For every read task it answers:

  • if no replica has already taken this data part, or a covering or intersecting data part, then allow it to be processed;
  • if some other replica has already taken this data part as a whole, or anything from a covering or intersecting data part, then answer that the replica should skip it;
  • if some other replica has already taken a range of this data part, then answer that the replica should take another range of this data part, starting from the already taken range, with a size comparable to the already taken range.

In addition, it can send info about already taken tasks, so the replica will not ask about them later.
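A minimal sketch of this initiator-side decision logic follows. It assumes parts are tracked by name and stubs out the covering/intersection check; it is illustrative C++, not the actual implementation.

```cpp
#include <algorithm>
#include <cstddef>
#include <map>
#include <optional>
#include <string>
#include <vector>

// Illustrative sketch of the initiator-side "semaphore" described above.
// The real implementation works on ClickHouse part info; here parts are plain strings
// and the covering/intersection check is stubbed out.

enum class Decision { Take, Skip, TakeOtherRange };

struct TakenRange { size_t begin = 0; size_t end = 0; };   // in marks, end exclusive

struct CoordinatorState
{
    // For each part that is already (partially) assigned: the ranges taken so far.
    std::map<std::string, std::vector<TakenRange>> taken;

    // Stub: in ClickHouse, part names encode min/max block numbers, and the real check
    // compares those. The sketch conservatively treats distinct names as disjoint.
    static bool coversOrIntersects(const std::string &, const std::string &) { return false; }

    // Decide what the asking replica should do with one proposed task over a whole part.
    // If the answer is TakeOtherRange, `suggested` holds the mark range to read instead.
    Decision decide(const std::string & part_name, size_t part_marks, std::optional<TakenRange> & suggested)
    {
        // Anything already taken from a covering or intersecting part => skip entirely.
        for (const auto & entry : taken)
            if (entry.first != part_name && coversOrIntersects(entry.first, part_name))
                return Decision::Skip;

        auto it = taken.find(part_name);
        if (it == taken.end())
        {
            // Nobody touched this part yet: allow the replica to take it as proposed.
            // (For simplicity the sketch records the whole part as taken.)
            taken[part_name].push_back({0, part_marks});
            return Decision::Take;
        }

        // Some ranges of this very part are taken: suggest the next range of comparable size.
        const TakenRange & last = it->second.back();
        if (last.end >= part_marks)
            return Decision::Skip;   // the whole part is already assigned

        size_t size = std::max<size_t>(last.end - last.begin, 1);
        TakenRange next{last.end, std::min(part_marks, last.end + size)};
        it->second.push_back(next);
        suggested = next;
        return Decision::TakeOtherRange;
    }
};
```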

The number of read tasks sent to the initiator in one network request is chosen to balance uniform workload distribution against a low number of RTTs for short queries.
E.g. it can be one request per 1 GB of data (controlled by a setting).
This automatically lowers the number of replicas involved and the amount of coordination needed to process short queries.
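As a rough illustration of this batching heuristic: the byte threshold and the `approximate_bytes` field below are assumptions of the sketch, not real settings or fields.

```cpp
#include <cstddef>
#include <vector>

// Illustrative: a replica accumulates read tasks until roughly `bytes_per_request`
// worth of data is collected, then ships the batch to the initiator in one round trip.
// `Task::approximate_bytes` is an assumed field, not a real ClickHouse member.
template <typename Task, typename SendFn>
void sendInBatches(const std::vector<Task> & tasks, size_t bytes_per_request, SendFn send)
{
    std::vector<Task> batch;
    size_t batch_bytes = 0;
    for (const auto & task : tasks)
    {
        batch.push_back(task);
        batch_bytes += task.approximate_bytes;
        if (batch_bytes >= bytes_per_request)    // e.g. ~1 GB, controlled by a setting
        {
            send(batch);
            batch.clear();
            batch_bytes = 0;
        }
    }
    if (!batch.empty())
        send(batch);
}
```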

If some replicas lag behind and miss some parts, the result will include the data available on at least one replica (the most complete result).

If some replicas have processed a mutation and others have not, the result will be calculated over the data parts either before or after the mutation (non-deterministically).
E.g. if some records were deleted on one replica and not yet deleted on another, the result may include half of these records.

If some replicas have lost some of the data parts that should be merged while other replicas have completed the merge (a rare case), the result may not include the data from the lost parts.

Every replica may select read tasks in an order determined by a consistent hash of the replica number and the data part. This allows maintaining better cache affinity in the case of shared storage.
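One simple way to get such an ordering is sketched below (closer to rendezvous-style hashing than to a classic hash ring). `std::hash` is used for brevity where a stable hash function would be needed in practice, and the function names are made up.

```cpp
#include <algorithm>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Illustrative sketch: order a replica's candidate parts by a hash that combines the
// part name and the replica number, so each replica tends to start from "its own"
// parts and cache affinity on shared storage is preserved across queries.
inline size_t affinity(const std::string & part_name, size_t replica_number)
{
    return std::hash<std::string>{}(part_name + '#' + std::to_string(replica_number));
}

inline void orderByAffinity(std::vector<std::string> & part_names, size_t replica_number)
{
    std::sort(part_names.begin(), part_names.end(),
              [replica_number](const std::string & a, const std::string & b)
              {
                  return affinity(a, replica_number) < affinity(b, replica_number);
              });
}
```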

Note: as a simple extension, we can implement failover during query processing. If some replica dies in the middle of query processing and has not returned any block of data yet, we can drop processing on that replica and reassign its tasks.
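A sketch of what the initiator-side bookkeeping for this failover could look like (illustrative names; the rule that only replicas which have not yet sent any data may be failed over follows from the note above):

```cpp
#include <cstddef>
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>

// Illustrative sketch of the failover extension: if a replica dies before returning
// any data block, the initiator releases the tasks assigned to it so that the
// remaining replicas can pick them up on their next coordination request.
struct Assignment { std::string part_name; size_t begin_mark = 0; size_t end_mark = 0; };

struct FailoverState
{
    std::map<size_t /*replica*/, std::vector<Assignment>> assigned;
    std::set<size_t> replicas_that_sent_data;
    std::vector<Assignment> unassigned_pool;

    void onReplicaFailure(size_t replica)
    {
        // Reassignment is only safe if the replica has not produced any result block yet,
        // otherwise rows could be counted twice.
        if (replicas_that_sent_data.count(replica))
            return;
        auto it = assigned.find(replica);
        if (it == assigned.end())
            return;
        for (auto & task : it->second)
            unassigned_pool.push_back(std::move(task));
        assigned.erase(it);
    }
};
```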

yiguolei (Contributor) commented:

Any progress on this task?

nikitamikhaylov (Member) commented:

> Any progress on this task?

The feature is being developed; there is no PR yet. I would write a bit about the chosen algorithm, but I'd better write some code.

alexey-milovidov (Member, Author) commented:

FYI we found that the current implementation is inefficient, and @nikitamikhaylov is reworking the details of it.

maskshell commented:

How about extending Zero Copy from S3/HDFS to general-purpose disk storage? Similar to RAC's shared disk.

LuPan92 commented Aug 16, 2023:

@nikitamikhaylov This feature is awesome, I'm going to try it out. Do you have more detailed design documents?

alexey-milovidov (Member, Author) commented:

@LuPan92 the description in this issue is the complete design document.
