Reduce amount of scanning needed to find notifications #500

Closed
keith-turner opened this Issue May 22, 2015 · 6 comments

Contributor

keith-turner commented May 22, 2015

In #282, the way Fluo finds notifications was switched to the following:

  • A single thread per worker scans all notifications, hashing each one and keeping only those with a specific remainder after hash+mod.
  • Workers coordinate via zookeeper to determine the modulus and each worker's remainder.

This change was a vast improvement, but having each worker scan all notifications will only scale to a certain point.
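The hash+mod filtering from #282 can be sketched like this (a minimal illustration in Python, not Fluo's actual API; the function names are made up):

```python
import hashlib

def hash_notification(row, col):
    """Stable hash of a notification's row/column (illustrative)."""
    h = hashlib.md5(f"{row}:{col}".encode()).digest()
    return int.from_bytes(h[:8], "big")

def my_notifications(notifications, modulus, remainder):
    """Keep only the notifications this worker is responsible for."""
    return [n for n in notifications
            if hash_notification(*n) % modulus == remainder]

# With 4 workers (modulus=4), each worker keeps roughly 1/4 of all
# notifications, and every notification belongs to exactly one worker.
notes = [(f"row{i}", "col") for i in range(100)]
parts = [my_notifications(notes, 4, r) for r in range(4)]
```

Every worker still scans everything here; only the filtering differs per worker, which is exactly the scaling limit described above.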

A possible solution is to have only one worker scan each tablet for notifications. Workers would hash notifications and send them to the other workers that are responsible for those notifications. A worker would only scan a tablet when other workers request it, and it would only send notifications to workers that recently requested a scan. This avoids scanning unnecessarily and avoids one worker sending notifications to another worker whose queue is full.

  1. Table is created with 3 tablets named T1, T2, and T3
  2. Worker W1 starts
  3. Worker W2 starts
  4. Worker W3 starts
  5. Worker W4 starts
  6. T2 hashes to W1
  7. W2 asks W1 to scan T2 for notifications.
  8. W3 asks W1 to scan T2 for notifications.
  9. W1 starts scan of T2 for W2 and W3. Any notifications it finds for W4 are ignored, because W4 did not request a scan. Some notifications are found for W2. Nothing is found for W3.
  10. W1 sends notifications found to W2
  11. W1 reports to W3 that it found nothing. W3 uses this info to decide when to request a scan of T2 again.
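The steps above can be sketched as a toy in-memory protocol (illustrative only; `owner` and `scan_for` are hypothetical helpers, not Fluo code):

```python
import zlib

def owner(notification, workers):
    """Hash a notification to the worker responsible for it."""
    return workers[zlib.crc32(notification.encode()) % len(workers)]

def scan_for(tablet, workers, requesters):
    """The tablet's scanning worker scans once on behalf of all current
    requesters; notifications owned by non-requesters are ignored."""
    found = {w: [] for w in requesters}
    for n in tablet:
        w = owner(n, workers)
        if w in found:          # skip workers that did not request a scan —
            found[w].append(n)  # their queues may be full
    return found                # an empty list tells that requester to back
                                # off before asking for another scan

workers = ["W1", "W2", "W3", "W4"]
t2 = [f"note{i}" for i in range(20)]
result = scan_for(t2, workers, ["W2", "W3"])
```

One scan serves every current requester, which is what makes batching the requests (as discussed below) pay off.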

This model is conceptually similar to the way map reduce distributes data in a cluster. I think this is a really scalable way to find and distribute notifications in a cluster. I also like sticking with hashing (vs the lock service described in the paper) because it evens out the work so well. When a worker receives notifications found by another worker, it should check the hash before queueing and before executing, like was done in #458.

Contributor

keith-turner commented May 26, 2015

Workers should send out requests for tablet scans asynchronously. This approach makes it likely that when a worker scans a tablet, it is doing it on behalf of many other workers.

If a worker issued a scan request to another worker and waited, then it is possible that other tablets it is interested in could be scanned while it waits.

@keith-turner keith-turner added this to the 1.0.0-beta-2 milestone Jun 11, 2015

@keith-turner keith-turner removed this from the 1.0.0-beta-2 milestone Jan 5, 2016

@keith-turner keith-turner added this to the 1.0.0 milestone Jan 19, 2016

Contributor

keith-turner commented Apr 3, 2017

I have come up with another solution to this problem. The solution is to group workers into fixed sized groups and have each group scan a disjoint set of tablets.

For example, if there are 23 workers, a minimum group size of 7, and 100 tablets, then we would create the following groups:

  • Group 1 with 8 workers and 34 tablets
  • Group 2 with 8 workers and 33 tablets
  • Group 3 with 7 workers and 33 tablets

Each worker in group 1 would have a unique id within the group, ranging from 0 to 7. A worker with id 5 in the group would scan all 34 of the group's tablets looking for notifications where hash(notification) % 8 == 5.

This solution allows the cost of scanning for notifications to stay fixed as the number of workers grows.
The reason to have groups of workers is that it evens out notification processing in the case where notifications are not evenly distributed among tablets.

The current notification finding implementation in Fluo has a single group, so notification processing is very evenly spread among workers without having to worry about collisions. However, the cost of every worker scanning every tablet does not scale well as the number of workers grows.
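The arithmetic in the example above can be sketched as follows (a minimal illustration assuming workers and tablets are split as evenly as possible; `partition` is a hypothetical helper, not Fluo code):

```python
def partition(num_workers, min_group_size, num_tablets):
    """Split workers into fixed-size groups, each scanning a disjoint set
    of tablets. Returns a list of (workers, tablets) per group."""
    num_groups = max(1, num_workers // min_group_size)
    groups = []
    for g in range(num_groups):
        # spread remainders so counts differ by at most one across groups
        workers = num_workers // num_groups + (1 if g < num_workers % num_groups else 0)
        tablets = num_tablets // num_groups + (1 if g < num_tablets % num_groups else 0)
        groups.append((workers, tablets))
    return groups

# The example from the comment: 23 workers, min group size 7, 100 tablets
# → groups of (8, 34), (8, 33), (7, 33)
groups = partition(23, 7, 100)
```

Note that each worker then only scans its own group's tablets, so the per-worker scanning cost stays fixed as more groups are added.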

Member

ctubbsii commented Apr 3, 2017

How would tablets be assigned to groups?

Contributor

keith-turner commented Apr 3, 2017

How would tablets be assigned to groups?

Good question; I have spent a good bit of time thinking about this. It is easy to do IF all of the workers can agree on what the current set of tablets for the table is. However, workers can read different splits at different times.

I was trying to figure out a fancy distributed way for all workers to agree on the same set of split points for a table (for a time period), but could not think of anything. So I think putting the splits in zookeeper is a good option. I am currently thinking of taking a subset of the table's split points whose total size is less than 128K or 256K and putting that in zookeeper. The idea is to read the splits and remove every other split while the total size is greater than 256K, before storing them in ZK. The worker with the lowest ID could manage the splits stored in ZK; all other workers can watch the splits node.
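The downsampling idea could look like this (a sketch under the assumption that splits are plain strings and the limit is 256K; `downsample_splits` is a hypothetical helper):

```python
def downsample_splits(splits, max_bytes=256 * 1024):
    """Drop every other split until the serialized size fits in a single
    zookeeper node. The survivors are still real split points, so they
    still cover the table with non-overlapping ranges."""
    def total_size(s):
        return sum(len(x.encode()) for x in s)
    while total_size(splits) > max_bytes and len(splits) > 1:
        splits = splits[::2]  # keep even-indexed splits, drop the odd ones
    return splits
```

Halving each pass converges quickly even for tables with very many tablets.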

Once the workers all agree on a set of split points, it's smooth sailing. Using the info about workers in zookeeper, we can decide how many groups there are, then shuffle the splits in a deterministic way and round-robin assign them to groups. This should lead to all workers making the same decisions about which splits are in a group. The shuffle + round robin will result in a very even and random assignment of tablets to groups; I was thinking it would be nice to avoid assigning contiguous tablets to a group.
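The deterministic shuffle + round-robin could be sketched as follows (illustrative; the fixed seed is an assumption made so that every worker computes the same shuffle from the same inputs):

```python
import random

def assign_splits(splits, num_groups, seed=42):
    """Deterministically shuffle the agreed-on splits, then round-robin
    them to groups. Any worker running this with the same inputs gets
    the same assignment."""
    splits = sorted(splits)    # canonical order before shuffling
    rng = random.Random(seed)  # fixed seed => identical shuffle everywhere
    rng.shuffle(splits)
    groups = [[] for _ in range(num_groups)]
    for i, s in enumerate(splits):
        groups[i % num_groups].append(s)
    return groups

# Two workers computing independently agree on the assignment.
a = assign_splits([f"split{i:03d}" for i in range(100)], 3)
b = assign_splits([f"split{i:03d}" for i in range(100)], 3)
```

The shuffle is also what keeps contiguous tablets from all landing in the same group.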

I have used the terms tablets and splits. In reality, all of the workers just need to agree on some set of row ranges for the table that don't overlap and cover the table. It does not need to be tablet split points; that's just convenient.

Member

mjwall commented Apr 6, 2017

How does recovery happen if any given process is killed?

Contributor

keith-turner commented Apr 6, 2017

How does recovery happen if any given process is killed?

All of the information used to partition workers, tablets, and notifications comes from zookeeper, and all of the workers watch that information. What I am currently doing in my branch is: when any of the information changes, workers stop processing notifications until the information in zookeeper has been stable for 60 seconds.
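The stability wait described above could be sketched like this (an illustrative stand-in; the class and method names are made up, and the zookeeper watch is simulated by calling `info_changed`):

```python
import time

class StabilityWatcher:
    """Pause notification processing until the partitioning info in
    zookeeper has been unchanged for `stable_secs` seconds."""

    def __init__(self, stable_secs=60.0):
        self.stable_secs = stable_secs
        self.last_change = time.monotonic()

    def info_changed(self):
        # called from a zookeeper watch whenever workers or splits change
        self.last_change = time.monotonic()

    def safe_to_process(self):
        # only process notifications once the cluster view has settled
        return time.monotonic() - self.last_change >= self.stable_secs
```

Since every worker watches the same znodes, they all converge on the same partitioning once the quiet period elapses.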

keith-turner added a commit to keith-turner/fluo that referenced this issue Apr 7, 2017


@asfgit asfgit closed this in 3da76dc Apr 21, 2017

@mikewalch mikewalch modified the milestones: 1.1.0, 1.0.0 May 19, 2017
