New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Sticky query routing via query options #12276
Conversation
Codecov ReportAttention:
Additional details and impacted files@@ Coverage Diff @@
## master #12276 +/- ##
============================================
- Coverage 61.58% 61.58% -0.01%
Complexity 1152 1152
============================================
Files 2417 2415 -2
Lines 131367 131502 +135
Branches 20262 20310 +48
============================================
+ Hits 80907 80982 +75
- Misses 44559 44587 +28
- Partials 5901 5933 +32
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
int selectedIdx; | ||
if (_useStickyRouting || QueryOptionsUtils.isUseStickyRouting(queryOptions)) { | ||
// candidates array is always sorted | ||
selectedIdx = Math.abs(_tableNameWithType.hashCode() % candidates.size()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this sticks the request to a single broker instead of distributed a sticky routing across broker but maintain the same one for a single client. is this what we wanted?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you meant single server?
And yes, the requirement is to ensure same set of servers are picked for queries that are run with this query option set to true.
What we're trying to achieve is essentially ensure a consistent view of the data across queries which may be a requirement for a certain usecases.
Since the current set of instance selectors essentially round robin / randomly pick any of the avaliable servers, sometimes it's possible to see query result inconsistencies across queries due to for example the consuming segment on one of the replicas lagging behind another replica.
This option ensures we always route to same set of servers therefore for the particular subset of queries with this option set, always see a consistent view of the data (unless ofcourse the assignment changes / server restarts and is catching up / segments are deleted etc, which has been called out explicitly in the PR)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes I meant server and the concern is with traffic skew b/c all sticky routing query are going to hit the same replica of the table (request is based on tableName hash)
if we dont have a clientID for sticky routing i dont think there's any other way to achieve this.
b3a9f25
to
15c5a62
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's also add a config in TableConfig.RoutingConfig, and allow 3 levels of config: instance -> table -> query
BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector adaptiveServerSelector, Clock clock) { | ||
super(tableNameWithType, propertyStore, brokerMetrics, adaptiveServerSelector, clock); | ||
BrokerMetrics brokerMetrics, @Nullable AdaptiveServerSelector adaptiveServerSelector, Clock clock, | ||
boolean useStickyRouting) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO sticky routing is not very clear, useSingleReplica
or useFixedReplica
might be more explicit. This is personal preference, so open to other names
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to consistent routing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are still some places with useStickyRouting
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to useFixedReplica
@@ -89,7 +91,13 @@ Pair<Map<String, String>, Map<String, String>> select(List<String> segments, int | |||
if (candidates == null) { | |||
continue; | |||
} | |||
int selectedIdx = requestId++ % candidates.size(); | |||
int selectedIdx; | |||
if (_useStickyRouting || QueryOptionsUtils.isUseStickyRouting(queryOptions)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should allow query option to override this. Basically we can have broker config -> table config -> query options. Each level can override previous level if explicitly configured.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added the support for overridng brokerConf with query options.
The problem I see with table level config is, we either have to fetch the table config on each query, or we need to somehow recreate the routing config each time table config changes or we need to call out that updating the table config setting for this feature needs a broker restart. Thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We use the table config when creating the routing. When table config is changed, it will trigger a routing rebuild to pick up the changes. We rely on the same logic for other routing configs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense. I did not notice the RefreshTableConfigMessageHandler
. Changed
int selectedIdx; | ||
if (_useStickyRouting || QueryOptionsUtils.isUseStickyRouting(queryOptions)) { | ||
// candidates array is always sorted | ||
selectedIdx = Math.abs(_tableNameWithType.hashCode() % candidates.size()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider using raw table name so that we can pick the same replica for hybrid use cases for future proof. Also we can cache the hash (positive value) to avoid calculating hash per query
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack
public static InstanceSelector getInstanceSelector(TableConfig tableConfig, | ||
ZkHelixPropertyStore<ZNRecord> propertyStore, BrokerMetrics brokerMetrics) { | ||
return getInstanceSelector(tableConfig, propertyStore, brokerMetrics, null, Clock.systemUTC()); | ||
ZkHelixPropertyStore<ZNRecord> propertyStore, BrokerMetrics brokerMetrics, PinotConfiguration config) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(minor) Suggest renaming to brokerConf
to make it clear that it is top level broker config
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack
// selection of replica group across queries i.e. same instance replica group id is picked each time. | ||
// Since the instances within a selected replica group are iterated in order, the assignment within a selected | ||
// replica group is guaranteed to be deterministic. | ||
replicaGroupSelected = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should still start with the table hash to avoid always selecting the servers from the same replica. If we always pick replica 0, it will likely cause hotspot servers because servers with smaller id will always be picked
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So the idea with the MultiStageReplicaGroupSelector
is to ensure colocated tables get routed to same set of servers IIUC. If we use table hash here, that goal will not be achieved (the non sticky routing path uses requestId % and not tableName hash).
It's possible to
- Use table hash here -> preferring even distribution across replica group for different tables over colocation.
- Start from 0 -> preferring colocation at the cost of potential hotspots.
- Not support stick routing when using the
MultiStageReplicaGroupSelector
I went with 2, but open to suggestions here. cc: @walterddr
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. IIRC we are not using this selector, so both 2 and 3 (1 breaks the contract which is not okay) are fine. If we go with 2, let's add a note describing the potential hotspot problem, or we go with 3 and add a todo
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a note comment
@@ -41,4 +41,9 @@ public String getInstance() { | |||
public boolean isOnline() { | |||
return _online; | |||
} | |||
|
|||
@Override | |||
public int compareTo(SegmentInstanceCandidate o) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this a bugfix?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bad commit. reverted.
@@ -145,6 +145,11 @@ public static boolean isSkipScanFilterReorder(Map<String, String> queryOptions) | |||
return "false".equalsIgnoreCase(queryOptions.get(QueryOptionKey.USE_SCAN_REORDER_OPTIMIZATION)); | |||
} | |||
|
|||
@Nullable | |||
public static boolean isUseStickyRouting(Map<String, String> queryOptions) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should return Boolean
here, and differentiate not set and explicit false
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I updated to use queryOption -> broker config fallback
Please take a look at the failed test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
@@ -87,6 +89,8 @@ abstract class BaseInstanceSelector implements InstanceSelector { | |||
final BrokerMetrics _brokerMetrics; | |||
final AdaptiveServerSelector _adaptiveServerSelector; | |||
final Clock _clock; | |||
final Boolean _useFixedReplica; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final Boolean _useFixedReplica; | |
final boolean _useFixedReplica; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack
_useFixedReplica = useFixedReplica; | ||
// Using raw table name to ensure queries spanning across REALTIME and OFFLINE tables are routed to the same | ||
// instance | ||
_tableNameHashForFixedReplicaRouting = TableNameBuilder.extractRawTableName(tableNameWithType).hashCode(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(minor) We can keep a positive hash here, which avoids the per query abs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack
@@ -429,6 +443,12 @@ public SelectionResult select(BrokerRequest brokerRequest, List<String> segments | |||
} | |||
} | |||
|
|||
protected boolean isUseFixedReplica(Map<String, String> queryOptions) { | |||
return Boolean.parseBoolean( | |||
queryOptions.getOrDefault(CommonConstants.Broker.Request.QueryOptionKey.USE_FIXED_REPLICA, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(minor) Suggest moving query option handling into QueryOptionUtils
, and avoid per query string conversion:
Boolean useFixedReplicaFromQueryOption = QueryOptionUtils.getUseFixedReplica(queryOptions);
return useFixedReplicaFromQueryOption != null ? useFixedReplicaFromQueryOption : _useFixedReplica;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense. Ack
* Support sticky routing via query options --------- Co-authored-by: Saurabh Dubey <saurabh.dubey@Saurabhs-MacBook-Pro.local>
Allow deterministic and sticky query routing selection based on a query option. This supports cases where users may want to ensure consistent view of the data segments across multiple queries (in case of realtime tables, essentially route to the same server for a consuming segments, avoiding data inconsistency issues). This is still a best effort implementation, and fallback to other servers is possible in case of a server being down.
Release notes
Adds support for deterministic and sticky routing for a query / table / broker. This setting would lead to same server / set of servers (for
MultiStageReplicaGroupSelector
) being used for all queries of a given table.Query option (takes precedence over fixed routing setting at table / broker config level)
SET "useFixedReplica"=true;
Table config (takes precedence over fixed routing setting at broker config level)
Broker conf