Read from followers: APM-296 (#16335)
Add read-from-followers for clusters. This currently covers the following APIs:
  - single document reads
  - batch document reads
  - standalone AQL queries
  - edge reads
  - exists reads (HTTP HEAD)
  - streaming transactions and operations in their context
So far, it does not cover:
  - JavaScript transactions
  - The graph API

Co-authored-by: Jan <jsteemann@users.noreply.github.com>
neunhoef and jsteemann committed Jun 28, 2022
1 parent 8651f07 commit f96eb80
Showing 26 changed files with 628 additions and 57 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG
@@ -1,6 +1,18 @@
devel
-----

* Introduce reading from followers in clusters. This works by offering
an additional HTTP header "x-arango-allow-dirty-read" for certain
read-only APIs. This header has already been used in active failover
deployments to allow reading from followers. When the header is set,
coordinators are allowed to read from follower shards instead of only
from leader shards, which can help to spread the read load more evenly
across the cluster. Note that using this header can result in "dirty
reads", i.e. read results that return stale data or even data that has
not yet been officially committed. Use it at your own risk, when
performance is more important than correctness or when you know that
the data does not change.
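
As an illustration of the new header, here is a minimal client-side
sketch using libcurl; the coordinator endpoint, database, collection,
and document key are hypothetical placeholders:

```cpp
// Sketch: a single-document read with dirty reads allowed.
#include <curl/curl.h>
#include <cstdio>

int main() {
  curl_global_init(CURL_GLOBAL_DEFAULT);
  CURL* curl = curl_easy_init();
  if (curl == nullptr) {
    return 1;
  }
  // Allow the coordinator to serve this read from a follower shard:
  struct curl_slist* headers = nullptr;
  headers = curl_slist_append(headers, "x-arango-allow-dirty-read: true");
  curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
  curl_easy_setopt(curl, CURLOPT_URL,
                   "http://localhost:8529/_db/_system/_api/document/coll/key");
  CURLcode res = curl_easy_perform(curl);  // plain GET request
  if (res != CURLE_OK) {
    std::fprintf(stderr, "request failed: %s\n", curl_easy_strerror(res));
  }
  curl_slist_free_all(headers);
  curl_easy_cleanup(curl);
  curl_global_cleanup();
  return res == CURLE_OK ? 0 : 1;
}
```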

* Changed HTTP response code for error number 1521 from 500 to 400.

Error 1521 (query collection lock failed) is nowadays only emitted by
17 changes: 13 additions & 4 deletions arangod/Aql/ShardLocking.cpp
@@ -35,6 +35,7 @@
#include "Aql/Query.h"
#include "Cluster/ClusterFeature.h"
#include "Logger/LogMacros.h"
#include "StorageEngine/TransactionState.h"
#include "Utilities/NameValidator.h"

using namespace arangodb;
@@ -378,16 +379,24 @@ ShardLocking::getShardMapping() {
}
}
}
TRI_ASSERT(!shardIds.empty());
auto& server = _query.vocbase().server();
if (!server.hasFeature<ClusterFeature>()) {
THROW_ARANGO_EXCEPTION(TRI_ERROR_SHUTTING_DOWN);
}
auto& ci = server.getFeature<ClusterFeature>().clusterInfo();
// We have at least one shard, otherwise we would not have snippets!
TRI_ASSERT(!shardIds.empty());
_shardMapping = ci.getResponsibleServers(shardIds);

#ifdef USE_ENTERPRISE
auto& trx = _query.trxForOptimization();
if (trx.state()->options().allowDirtyReads) {
_shardMapping = trx.state()->whichReplicas(shardIds);
} else
#endif
{
// We have at least one shard, otherwise we would not have snippets!
_shardMapping = ci.getResponsibleServers(shardIds);
}
TRI_ASSERT(_shardMapping.size() == shardIds.size());

for (auto const& lockInfo : _collectionLocking) {
for (auto const& sid : lockInfo.second.allShards) {
auto mapped = _shardMapping.find(sid);
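The enterprise-only `whichReplicas` call used above is not part of this
diff, but the idea can be sketched as follows. This is a simplified,
hypothetical stand-in (name and signature invented for illustration);
the real implementation additionally keeps the choice consistent across
shard groups, see the ClusterInfo changes below:

```cpp
#include <cstdlib>
#include <string>
#include <unordered_map>
#include <vector>

using ShardID = std::string;
using ServerID = std::string;

// Pick one server per shard. For normal reads take the leader (index 0);
// for dirty reads pick a random replica, which spreads the read load.
std::unordered_map<ShardID, ServerID> pickServers(
    std::unordered_map<ShardID, std::vector<ServerID>> const& shardServers,
    bool allowDirtyReads) {
  std::unordered_map<ShardID, ServerID> mapping;
  for (auto const& [shard, servers] : shardServers) {
    std::size_t index = 0;  // the leader is listed first
    if (allowDirtyReads && servers.size() > 1) {
      index = static_cast<std::size_t>(std::rand()) % servers.size();
    }
    mapping.emplace(shard, servers[index]);
  }
  return mapping;
}
```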
3 changes: 3 additions & 0 deletions arangod/Aql/ShardLocking.h
@@ -130,6 +130,9 @@ class ShardLocking {
// Get a full mapping of ShardID => LeaderID.
// This will stay constant during this query, and a query could be aborted in
// case of failovers.
// For read-from-follower situations in read-only queries, this map maps
// each ShardID to the actual leader or follower that has been chosen
// for the query.
containers::FlatHashMap<ShardID, ServerID> const& getShardMapping();

// Get the shards of the given collection within the given snippet.
111 changes: 108 additions & 3 deletions arangod/Cluster/ClusterInfo.cpp
@@ -1025,6 +1025,8 @@ void ClusterInfo::loadPlan() {
decltype(_plannedCollections) newCollections;
decltype(_shards) newShards;
decltype(_shardServers) newShardServers;
decltype(_shardToShardGroupLeader) newShardToShardGroupLeader;
decltype(_shardGroups) newShardGroups;
decltype(_shardToName) newShardToName;
decltype(_dbAnalyzersRevision) newDbAnalyzersRevision;
decltype(_newStuffByDatabase) newStuffByDatabase;
@@ -1041,6 +1043,8 @@
newCollections = _plannedCollections;
newShards = _shards;
newShardServers = _shardServers;
newShardToShardGroupLeader = _shardToShardGroupLeader;
newShardGroups = _shardGroups;
newShardToName = _shardToName;
newDbAnalyzersRevision = _dbAnalyzersRevision;
newStuffByDatabase = _newStuffByDatabase;
@@ -1097,6 +1101,8 @@
newShards.erase(shardName);
newShardServers.erase(shardName);
newShardToName.erase(shardName);
newShardToShardGroupLeader.erase(shardName);
newShardGroups.erase(shardName);
}
}
}
@@ -1449,6 +1455,10 @@
newShards.erase(shardId);
newShardServers.erase(shardId);
newShardToName.erase(shardId);
// Try to erase the shard ID in any case; it is no problem if it is
// not in there, which is the case if it is a shard group leader!
newShardToShardGroupLeader.erase(shardId);
newShardGroups.erase(shardId);
}
collectionsPath.pop_back();
}
@@ -1549,6 +1559,59 @@
continue;
}
}
// Now that the loop is completed, we have to run through the
// collections one more time to set up the shard groups:
for (auto const& colPair : *databaseCollections) {
if (colPair.first == colPair.second.collection->name()) {
// Every collection shows up once with its ID and once with its name.
// We only want it once, so we only take the entry that is keyed by
// the ID, not the one keyed by the name:
continue;
}
auto const& groupLeader =
colPair.second.collection->distributeShardsLike();
if (!groupLeader.empty()) {
auto groupLeaderCol = newShards.find(groupLeader);
if (groupLeaderCol != newShards.end()) {
auto col = newShards.find(
std::to_string(colPair.second.collection->id().id()));
if (col != newShards.end()) {
if (col->second->size() == 0) {
// Can happen for smart edge collections. But in this case we
// can ignore the collection.
continue;
}
TRI_ASSERT(groupLeaderCol->second->size() == col->second->size());
for (size_t i = 0; i < col->second->size(); ++i) {
newShardToShardGroupLeader.try_emplace(
col->second->at(i), groupLeaderCol->second->at(i));
auto it = newShardGroups.find(groupLeaderCol->second->at(i));
if (it == newShardGroups.end()) {
// Need to create a new list:
auto list = std::make_shared<std::vector<ShardID>>();
list->reserve(2);
// group leader as well as member:
list->emplace_back(groupLeaderCol->second->at(i));
list->emplace_back(col->second->at(i));
newShardGroups.try_emplace(groupLeaderCol->second->at(i),
std::move(list));
} else {
// Need to add us to the list:
it->second->push_back(col->second->at(i));
}
}
} else {
LOG_TOPIC("12f32", WARN, Logger::CLUSTER)
<< "loadPlan: Strange, could not find collection: "
<< colPair.second.collection->name();
}
} else {
LOG_TOPIC("22312", WARN, Logger::CLUSTER)
<< "loadPlan: Strange, could not find proto collection: "
<< groupLeader;
}
}
}
newCollections.insert_or_assign(databaseName,
std::move(databaseCollections));
}
@@ -1649,6 +1712,8 @@ void ClusterInfo::loadPlan() {
_plannedCollections.swap(newCollections);
_shards.swap(newShards);
_shardServers.swap(newShardServers);
_shardToShardGroupLeader.swap(newShardToShardGroupLeader);
_shardGroups.swap(newShardGroups);
_shardToName.swap(newShardToName);
}

@@ -6064,21 +6129,41 @@ void ClusterInfo::setFailedServers(
#ifdef ARANGODB_USE_GOOGLE_TESTS
void ClusterInfo::setServers(
containers::FlatHashMap<ServerID, std::string> servers) {
WRITE_LOCKER(readLocker, _serversProt.lock);
WRITE_LOCKER(writeLocker, _serversProt.lock);
_servers = std::move(servers);
}

void ClusterInfo::setServerAliases(
containers::FlatHashMap<ServerID, std::string> aliases) {
WRITE_LOCKER(readLocker, _serversProt.lock);
WRITE_LOCKER(writeLocker, _serversProt.lock);
_serverAliases = std::move(aliases);
}

void ClusterInfo::setServerAdvertisedEndpoints(
containers::FlatHashMap<ServerID, std::string> advertisedEndpoints) {
WRITE_LOCKER(readLocker, _serversProt.lock);
WRITE_LOCKER(writeLocker, _serversProt.lock);
_serverAdvertisedEndpoints = std::move(advertisedEndpoints);
}

void ClusterInfo::setShardToShardGroupLeader(
containers::FlatHashMap<ShardID, ShardID> shardToShardGroupLeader) {
WRITE_LOCKER(writeLocker, _planProt.lock);
_shardToShardGroupLeader = std::move(shardToShardGroupLeader);
}

void ClusterInfo::setShardGroups(
containers::FlatHashMap<ShardID, std::shared_ptr<std::vector<ShardID>>>
shardGroups) {
WRITE_LOCKER(writeLocker, _planProt.lock);
_shardGroups = std::move(shardGroups);
}

void ClusterInfo::setShardIds(
containers::FlatHashMap<ShardID, std::shared_ptr<std::vector<ServerID>>>
shardIds) {
WRITE_LOCKER(writeLocker, _currentProt.lock);
_shardIds = std::move(shardIds);
}
#endif

bool ClusterInfo::serverExists(std::string_view serverId) const noexcept {
@@ -6945,6 +7030,26 @@ VPackBuilder ClusterInfo::toVelocyPack() {
}
}
}
dump.add(VPackValue("shardToShardGroupLeader"));
{
VPackObjectBuilder d(&dump);
for (auto const& s : _shardToShardGroupLeader) {
dump.add(s.first, VPackValue(s.second));
}
}
dump.add(VPackValue("shardGroups"));
{
VPackObjectBuilder d(&dump);
for (auto const& s : _shardGroups) {
dump.add(VPackValue(s.first));
{
VPackArrayBuilder d2(&dump);
for (auto const& ss : *s.second) {
dump.add(VPackValue(ss));
}
}
}
}
dump.add(VPackValue("shards"));
{
VPackObjectBuilder d(&dump);
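The nested `VPackObjectBuilder`/`VPackArrayBuilder` pattern used in
`toVelocyPack` above produces JSON of the following shape. A minimal
standalone sketch, assuming the velocypack library from the ArangoDB
source tree and hypothetical shard IDs:

```cpp
#include <velocypack/Builder.h>
#include <velocypack/Slice.h>
#include <velocypack/velocypack-aliases.h>

#include <iostream>

int main() {
  VPackBuilder dump;
  {
    VPackObjectBuilder d(&dump);  // closed when the guard leaves scope
    dump.add(VPackValue("shardGroups"));
    {
      VPackObjectBuilder groups(&dump);
      dump.add(VPackValue("s1"));  // group leader as key
      {
        VPackArrayBuilder members(&dump);  // members, including the leader
        dump.add(VPackValue("s1"));
        dump.add(VPackValue("s5"));
        dump.add(VPackValue("s9"));
      }
    }
  }
  // Prints: {"shardGroups":{"s1":["s1","s5","s9"]}}
  std::cout << dump.slice().toJson() << std::endl;
  return 0;
}
```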
70 changes: 70 additions & 0 deletions arangod/Cluster/ClusterInfo.h
@@ -897,6 +897,28 @@ class ClusterInfo final {
containers::FlatHashMap<ShardID, ServerID> getResponsibleServers(
containers::FlatHashSet<ShardID> const&);

//////////////////////////////////////////////////////////////////////////////
/// @brief atomically find all servers that are responsible for the given
/// shards (choosing either the leader or some follower for each, but
/// making the choice consistent with `distributeShardsLike` dependencies).
/// Throws an exception if no leader can be found for any of the shards.
/// Returns an empty result if the servers could not be determined after
/// a while - it is the responsibility of the caller to check for an
/// empty result!
/// The map `result` can already contain a partial choice; this method
/// ensures that, in the end, all the shards in `list` are set in the
/// `result` map. Additional shards can be added to `result` as needed,
/// in particular the shard prototypes of the shards in `list` will be
/// added. It is not allowed that `result` contains a setting for a shard
/// but no setting (or a different one) for its shard prototype!
//////////////////////////////////////////////////////////////////////////////

#ifdef USE_ENTERPRISE
void getResponsibleServersReadFromFollower(
containers::FlatHashSet<ShardID> const& list,
containers::FlatHashMap<ShardID, ServerID>& result);
#endif

//////////////////////////////////////////////////////////////////////////////
/// @brief find the shard list of a collection, sorted numerically
//////////////////////////////////////////////////////////////////////////////
@@ -946,6 +968,16 @@

void setServerAdvertisedEndpoints(
containers::FlatHashMap<ServerID, std::string> advertisedEndpoints);

void setShardToShardGroupLeader(
containers::FlatHashMap<ShardID, ShardID> shardToShardGroupLeader);

void setShardGroups(
containers::FlatHashMap<ShardID, std::shared_ptr<std::vector<ShardID>>>);

void setShardIds(
containers::FlatHashMap<ShardID, std::shared_ptr<std::vector<ServerID>>>
shardIds);
#endif

bool serverExists(std::string_view serverID) const noexcept;
@@ -1232,6 +1264,44 @@ class ClusterInfo final {
// planned shard ID => collection name
containers::FlatHashMap<ShardID, CollectionID> _shardToName;

// planned shard ID => shard ID of shard group leader
// This deserves an explanation. If collection B has `distributeShardsLike`
// collection A, then A and B have the same number of shards. We say that
// the k-th shard of A and the k-th shard of B are in the same "shard group".
// This can be true for multiple collections, but they must then always
// have the same collection A under `distributeShardsLike`. The shard of
// collection A is then called the "shard group leader". It is guaranteed that
// the shards of a shard group are always planned to be on the same
// dbserver, and the leader is always the same for all shards in the group.
// If a shard is a shard group leader, it does not appear in this map.
// Example:
// Collection: A B C
// Shard index 0: s1 s5 s9
// Shard index 1: s2 s6 s10
// Shard index 2: s3 s7 s11
// Shard index 3: s4 s8 s12
// Here, collection B has "distributeShardsLike" set to "A",
// collection C has "distributeShardsLike" set to "B",
// the `numberOfShards` is 4 for all three collections.
// Shard groups are: s1, s5, s9
// and: s2, s6, s10
// and: s3, s7, s11
// and: s4, s8, s12
// Shard group leaders are s1, s2, s3 and s4.
// That is, "shard group" is across collections, "shard index" is
// within a collection.
// All three collections must have the same `replicationFactor`, and
// it is guaranteed that all shards in a group always have the same
// leader and the same list of followers.
// Note, however, that a follower for a shard group can be in sync with
// its leader for some of the shards in the group but not for others!
// Note that shard group leaders themselves do not appear in this map:
containers::FlatHashMap<ShardID, ShardID> _shardToShardGroupLeader;
// In the following map we store for each shard group leader the list
// of shards in the group, including the leader.
containers::FlatHashMap<ShardID, std::shared_ptr<std::vector<ShardID>>>
_shardGroups;

AllViews _plannedViews; // from Plan/Views/
AllViews _newPlannedViews; // views that have been created during `loadPlan`
// execution
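To make the shard-group bookkeeping concrete, here is a self-contained
sketch that builds both maps for the A/B/C example from the comment
above, using standard containers in place of `containers::FlatHashMap`:

```cpp
#include <iostream>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

using ShardID = std::string;

int main() {
  // Shards per collection by shard index, from the example in the
  // comment above (A is the distributeShardsLike prototype):
  std::vector<std::vector<ShardID>> collections = {
      {"s1", "s2", "s3", "s4"},    // A
      {"s5", "s6", "s7", "s8"},    // B
      {"s9", "s10", "s11", "s12"}  // C
  };

  std::unordered_map<ShardID, ShardID> shardToShardGroupLeader;
  std::unordered_map<ShardID, std::shared_ptr<std::vector<ShardID>>>
      shardGroups;

  auto const& leaders = collections[0];
  for (std::size_t i = 0; i < leaders.size(); ++i) {
    auto list = std::make_shared<std::vector<ShardID>>();
    list->push_back(leaders[i]);  // the group leader itself is included
    for (std::size_t c = 1; c < collections.size(); ++c) {
      // The i-th shard of every collection joins the i-th group;
      // group leaders themselves are not entered into the map:
      shardToShardGroupLeader.emplace(collections[c][i], leaders[i]);
      list->push_back(collections[c][i]);
    }
    shardGroups.emplace(leaders[i], std::move(list));
  }

  // Prints four groups, e.g. "s1: s1 s5 s9" (order of groups unspecified):
  for (auto const& [leader, members] : shardGroups) {
    std::cout << leader << ":";
    for (auto const& s : *members) {
      std::cout << " " << s;
    }
    std::cout << "\n";
  }
  return 0;
}
```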