Performance fixes for Solr integration [JIRA: RIAK-2955] #700
Conversation
Thanks @JeetKunDoug!

Settings:

```yaml
---
minimum_reviewers: 2
build_steps:
  - make clean
  - make test
  - make xref
  - make dialyzer
merge: false
org_mode: true
timeout: 1790
```
There seems to be an issue with build step **make_test,make_xref,make_dialyzer** ! ☁️
```diff
@@ -217,11 +217,13 @@ handle_cast({exchange_status, Pid, Index, {StartIdx, N}, Status}, S) ->
     {noreply, S2};
 
+handle_cast(clear_trees, S) ->
+    lager:info("Clearing YZ hashtrees and stopping all current exchanges..."),
```
If these logs are going to go in, we should remove the "...".
```erlang
send_solr_ops_for_entries(Index, solr_ops(LI, Entries),
                          Entries)
LogicalPartition = logical_partition(LI, Entries0),
%% Entries = get_latest_entries_for_keys(Entries0),
```
Can remove this line
```erlang
%% Flatten combined operators for a batch.
lists:flatten(Ops).
_FlattenedOps = lists:flatten(Ops).
%% Just for this experiment
```
Can be removed
```erlang
{1, true} ->
    Func(JSonResponse, Accum);
_ ->
    lager:info("FDUSHIN> recursive call. NumFound: ~p Iteration: ~p", [NumFound, Iteration]),
```
This is test logging; noting it here so it will be removed.
```erlang
    clear_all_exchanges(S#state.exchanges),
    clear_all_trees(S#state.trees),
    {noreply, S};

handle_cast(expire_trees, S) ->
    lager:info("Expiring YZ hashtrees..."),
```
Same comment on "..."
```diff
@@ -34,6 +34,8 @@
          built :: integer(),
          timeout :: pos_integer()}).
 
+-type repair_count() :: {non_neg_integer(), non_neg_integer()}.
```
Is it painfully obvious what the two elements of the `repair_count()` tuple are for? Consider naming the type variables:

```erlang
-type repair_count() :: {SomeCount :: non_neg_integer(), AnotherCount :: non_neg_integer()}.
```
```erlang
-spec repair_fold_func(p(), keydiff(), repair_count()) ->
          repair_count().
repair_fold_func(Index, {remote_missing, _KeyBin} = KeyDiff, {YZDeleteCount, YZRepairCount} = Accum) ->
```
I would split this into two functions:

```erlang
repair_fold_func(Index, KeyDiff, Accum) ->
    update_repair_func_accum(repair(Index, KeyDiff), KeyDiff, Accum).

update_repair_func_accum(full_repair, _KeyDiff={remote_missing, _KeyBin}, {YZDeleteCount, YZRepairCount}) ->
    {YZDeleteCount + 1, YZRepairCount};
update_repair_func_accum(full_repair, _KeyDiff, {YZDeleteCount, YZRepairCount}) ->
    {YZDeleteCount, YZRepairCount + 1};
update_repair_func_accum(_RepairType, _KeyDiff, Accum) ->
    Accum.
```
```diff
@@ -251,20 +293,21 @@ get_ops_for_entry_action(Action, _ObjValues, LI, P, Obj, BKey,
 %% allowing it to update to tree to prevent forever repair) or it reaches
 %% a Solr Internal error of sorts, for which we stop list-processing and
 %% use the success-length to segment the entries list for `AAE-updates'.
--spec send_solr_ops_for_entries(index_name(), solr_ops(), solr_entries()) ->
+-spec send_solr_ops_for_entries(index_name(), non_neg_integer(), solr_ops(), solr_entries()) ->
```
Should `non_neg_integer()` really be `p()`? If not, please introduce a suitably named type alias.
Ends up it's `lp()`. Will address.
Even better, the parameter isn't really needed - will clean up dead code.
```erlang
JSonResponse = search(Index, [
    {q, Query}, {wt, json}, {fl, "_yz_id"}, {rows, Rows},
    {sort, "_yz_id asc"}, {'cursorMark', CurrentCursor}]),
%% lager:info("FDUSHIN> JSonResponse: ~p", [JSonResponse]),
```
This one should go...
```erlang
add_value_to_list(_Key, Value, List) ->
    [Value | List].

entry_given_previous_entries(Entry, PreviousEntries) ->
```
This would be simpler if we leverage `dict:update/4`, along these lines:

```erlang
entry_given_previous_entries(Entry, PreviousEntries) ->
    NewReason = get_reason_from_entry(Entry),
    BKey = get_bkey_from_entry(Entry),
    dict:update(BKey,
                fun (PreviousEntry) ->
                        PreviousReason = get_reason_from_entry(PreviousEntry),
                        case choose_entry(NewReason, PreviousReason) of
                            new -> Entry;
                            _ -> PreviousEntry
                        end
                end,
                Entry,
                PreviousEntries).
```

Having written this, I would even say that we should let `choose_entry/2` take entries as arguments and rename the existing `choose_entry/2` to `compare_entry_reasons/2`:

```erlang
choose_entry(Entry, PreviousEntry) ->
    NewReason = get_reason_from_entry(Entry),
    PreviousReason = get_reason_from_entry(PreviousEntry),
    case compare_entry_reasons(NewReason, PreviousReason) of
        new -> Entry;
        _ -> PreviousEntry
    end.
```

And then we get:

```erlang
entry_given_previous_entries(Entry, PreviousEntries) ->
    BKey = get_bkey_from_entry(Entry),
    dict:update(BKey,
                fun (PreviousEntry) ->
                        choose_entry(Entry, PreviousEntry)
                end,
                Entry,
                PreviousEntries).
```
Updated per review comments.
Force-pushed from 7ef8000 to 21929bf.
+1
create jira issue
Force-pushed from f567a65 to 4345b67.
Force-pushed from 4345b67 to 6a9ce66.
Force-pushed from 6a9ce66 to c5df456.
Just a few minor nits - I can actually take care of these myself at this point if you're otherwise busy.
```diff
@@ -66,11 +56,10 @@ In the undesirable cases where Solr becomes unresponsive (e.g., data corruption,
 
 If a Solr core has become unresponsive and the specified error threshold has been traversed, and if, in addition, the high water mark has been exceeded, then the yokozuna batching system has a mechanism for automatically purging enqueued entries, so as to allow vnodes to continue servicing requests, as well as to allow update operations to occur for indices that are not in a pathological state.
 
-The yokozuna batching subsystem supports 4 different purge strategies:
+The yokozuna batching subsystem supports 3 different purge strategies:
 
 * `purge_one` (default behavior): Purge the oldest entry from a randomly selected indexq structure among the set of search indexes which have crossed the error threshold;
```
This is no longer "a randomly selected indexq structure", as A) there are no indexq structures, and B) it'll always purge from the {index, partition} to which the new entry is written.
```diff
@@ -66,11 +56,10 @@ In the undesirable cases where Solr becomes unresponsive (e.g., data corruption,
 
 If a Solr core has become unresponsive and the specified error threshold has been traversed, and if, in addition, the high water mark has been exceeded, then the yokozuna batching system has a mechanism for automatically purging enqueued entries, so as to allow vnodes to continue servicing requests, as well as to allow update operations to occur for indices that are not in a pathological state.
 
-The yokozuna batching subsystem supports 4 different purge strategies:
+The yokozuna batching subsystem supports 3 different purge strategies:
 
 * `purge_one` (default behavior): Purge the oldest entry from a randomly selected indexq structure among the set of search indexes which have crossed the error threshold;
 * `purge_index`: Purge all entries from a randomly selected indexq structure among the set of search indexes which have crossed the error threshold;
```
Same as above: it will purge all of the entries for the {index, partition} pair. The question is, should it actually purge the whole index, or is the current behavior of only purging the one solrq_worker "correct enough"? I believe it is, as this previously would purge all the data for a single indexq structure for a single worker, just as it does today.
Force-pushed from 26b8115 to edf1413.
+1 edf1413
- Remove Delete By Query calls whenever possible, as it's a performance killer on Solr.
- Split hashtree update out of the "Drain Timeout" window, as we were killing hashtree builds unnecessarily.
- Fix an issue where `yz_index_hashtree:fold_keys` was flooding the yz_index_hashtree mailbox, by implementing the "90 async/1 sync" call pattern that other `index` and `delete` callers used.

Prior to this fix, whenever we would add a document to Solr that might contain siblings, we would add a delete-by-query operation to the batch we send to Solr (in a single HTTP request). This had a significant impact on performance, especially when objects being updated had a lot of siblings. (We found that deleting by query would contend for locks with the segment scheduler, and in general adding deletes-by-query to high workloads seems like a bad idea.) The fix involves changing deleteByQuery to deleting documents in Solr by their document ids, which has much better performance than deleting by query. The challenge is that when adding an object, we didn't have the sibling vtags to generate a document id. We therefore made a change to riak_kv (basho/riak_kv#1520) which, when updating an object, also sends the old object being replaced. From that old object, we can extract any siblings and generate the associated document ids to delete in Solr.

The process of doing a KV->Yokozuna hashtree comparison takes several steps, including draining the relevant Solr queues before snapshotting the Yokozuna hashtree so that its state can be as close as possible to the KV hashtree. In order not to block the exchange indefinitely, a "Drain Timeout" was included in this process. However, as initially developed, the drain was not the only thing being measured in this timeout window. It also included the hashtree:update call, which, on a heavily loaded system, can take several minutes. This would cause the "drain" to time out and be forcibly cancelled, even though the drain itself had only taken a few hundred milliseconds. `yz_drain_fsm`, `yz_drain_mgr`, and `yz_exchange_fsm` were reworked to split the Solr drain from the hashtree update, resolving the issue of drain timeouts under normal operating conditions.

During performance testing, it was discovered that the `yz_index_hashtree:fold_keys` function could swamp the mailbox of the `yz_index_hashtree` so that other processes could not make progress. This commit makes `fold_keys` use the same "90 async + 1 sync" call pattern that `yz_kv:index` used. That logic (which lived in `yz_kv`) has been moved to `yz_index_hashtree` and is shared by the new `index` and `delete` calls that do not take an explicit "call mode" parameter. Future work is scheduled to figure out how to make this pattern more generic (as it's used in several places) and to eliminate the use of the caller's process dictionary for storing hashtree tokens.

* Added logging for clear and exchange trees, for audit of administrative operations.
* Modified the "Will Repair" log in the yz_exchange_fsm to track the "direction" of repair, viz., whether the repair resulted in a delete of Solr data or an add/update to Solr.
* Corrected the list of `required_queues` in `yz_solrq_sup` to include both _current_ and _future_ indexes the node owned. This prevented premature shutdown of a solrq worker/helper pair that was started by writes to a _future_ vnode (one in the process of being handed off to the node in question).
* Remove exometer statistics when an index is removed. Before, if an index was later re-added, the fuse creation would fail, eventually causing the node to crash. Test updated in 315b746.
* Monitor `yz_solrq_drain_fsm` from the queues being drained, in case the drain_fsm crashes. Before, it was possible for a queue to get stuck in `wait_for_drain_complete` state if the drain fsm crashed before the drain-complete messages were sent.
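The "90 async + 1 sync" pattern mentioned above can be reduced to a small pure core. The sketch below uses illustrative names only (it is not the actual yokozuna code): a counter of remaining async tokens decides whether the next hashtree message is a fire-and-forget cast or a blocking call that lets the server's mailbox drain.

```erlang
-module(async_sync_pattern).
-export([next_mode/1]).

%% Number of asynchronous sends allowed between synchronous calls;
%% 90 matches the "90 async + 1 sync" pattern described above.
-define(MAX_ASYNC, 90).

%% Pure decision function: given the remaining async-token count, return
%% whether the next message should be a cast (async) or a call (sync,
%% applying backpressure), together with the refreshed token count.
-spec next_mode(non_neg_integer()) -> {cast | call, non_neg_integer()}.
next_mode(0)      -> {call, ?MAX_ASYNC};  %% out of tokens: block on a call
next_mode(Tokens) -> {cast, Tokens - 1}.  %% spend a token: fire and forget
```

A caller would thread the token count through its loop (or, as the current code does, keep it in the process dictionary) and dispatch via `gen_server:cast/2` or `gen_server:call/2,3` accordingly, so that every 91st message waits for the server to catch up.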
Force-pushed from edf1413 to d6f9bfb.
Several major performance improvements for Solr integration:

- Fixed an issue where `yz_index_hashtree:fold_keys` was flooding the yz_index_hashtree mailbox, by implementing the "90 async/1 sync" call pattern that other `index` and `delete` callers used.

Removed usages of Delete By Query

Prior to this fix, whenever we would add a document to Solr that might contain siblings, we would add a delete-by-query operation to the batch we send to Solr (in a single HTTP request). This had a significant impact on performance, especially when objects being updated had a lot of siblings. (We found that deleting by query would contend for locks with the segment scheduler, and in general adding deletes-by-query to high workloads seems like a bad idea.)

The fix involves changing deleteByQuery to deleting documents in Solr by their document ids, which has much better performance than deleting by query. The challenge is that when adding an object, we didn't have the sibling vtags to generate a document id. We therefore made a change to riak_kv (basho/riak_kv#1520) which, when updating an object, also sends the old object being replaced. From that old object, we can extract any siblings and generate the associated document ids to delete in Solr.

Hashtree update/drain timeout split

The process of doing a KV->Yokozuna hashtree comparison takes several steps, including draining the relevant Solr queues before snapshotting the Yokozuna hashtree so that its state can be as close as possible to the KV hashtree. In order not to block the exchange indefinitely, a "Drain Timeout" was included in this process. However, as initially developed, the drain was not the only thing being measured in this timeout window. It also included the hashtree:update call, which, on a heavily loaded system, can take several minutes. This would cause the "drain" to time out and be forcibly cancelled, even though the drain itself had only taken a few hundred milliseconds. `yz_drain_fsm`, `yz_drain_mgr`, and `yz_exchange_fsm` were reworked to split the Solr drain from the hashtree update, resolving the issue of drain timeouts under normal operating conditions.

Hashtree async/sync change

During performance testing, it was discovered that the `yz_index_hashtree:fold_keys` function could swamp the mailbox of the `yz_index_hashtree` so that other processes could not make progress. This commit makes `fold_keys` use the same "90 async + 1 sync" call pattern that `yz_kv:index` used. That logic (which lived in `yz_kv`) has been moved to `yz_index_hashtree` and is shared by the new `index` and `delete` calls that do not take an explicit "call mode" parameter.

Future work is scheduled to figure out how to make this pattern more generic (as it's used in several places) and to eliminate the use of the caller's process dictionary for storing hashtree tokens.

Minor improvements included in this pull request

- Corrected the list of `required_queues` in `yz_solrq_sup` to include both current and future indexes the node owned. This prevented premature shutdown of a solrq worker/helper pair that was started by writes to a future vnode (one in the process of being handed off to the node in question).
- Monitor `yz_solrq_drain_fsm` from the queues being drained, in case the drain_fsm crashes. Before, it was possible for a queue to get stuck in `wait_for_drain_complete` state if the drain fsm crashed before the drain-complete messages were sent.

Review Checklist:

- Observability
- Performance
- Fault-tolerance: Monitor `yz_solrq_drain_fsm` to prevent queues from being stuck in `wait_for_drain_complete` state.
- Testing
- Code Clarity/Quality
- Code Documentation: Updated `batching.md` and associated docs. These can be used to drive External Documentation changes as well.
- External Documentation:
  - Removed the `purge_all` strategy from the `search.queue.high_watermark.purge_strategy` options, as it no longer makes sense to use (each `yz_solrq_worker` now manages a single index/partition pair).
  - Removed `search.queue.worker_count` and `search.queue.helper_count`, as they are no longer applicable.
  - Added a `search.dist_query` setting to riak.conf which will remove the node in question from distributed search results. This setting can also be changed via `riak-admin` by issuing the `riak-admin set search.dist_query=off` or `riak-admin set search.dist_query=on` commands at the command line. Setting this value in `riak.conf` is useful when you are restarting a node which was removed from search queries with the above-mentioned `riak-admin` feature, to prevent the node from being included in search queries until it is fully spun up. This may be necessary if a re-indexing operation is under way, or, in the case of very large Solr indexes, to cover the time it takes for Solr to read its indexes off disk and be prepared for queries.
- Release Notes: Added a config setting `{yokozuna, anti_entropy_max_async}` that mirrors a similar setting in riak_kv (and will take the KV value if the yokozuna value isn't provided), but the setting in riak_kv isn't documented and is essentially never changed, so this should not have to be called out in release notes, as we doubt anyone would ever change it.
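For illustration, the `search.dist_query` workflow described above might look like the following sketch. The commands are the ones quoted in the description; the surrounding comments and ordering are an assumption about typical operator usage, not documented procedure.

```shell
# Take this node out of distributed search results before restarting it:
riak-admin set search.dist_query=off

# In riak.conf, keep the node out of search coverage across the restart:
#   search.dist_query = off

# Once re-indexing is done and Solr has loaded its indexes, opt back in:
riak-admin set search.dist_query=on
```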