Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Locking for ResignLeadership #20837

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 100 additions & 12 deletions arangod/Cluster/Maintenance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,9 @@ static void handleLocalShard(
MaintenanceFeature::ShardActionMap const& shardActionMap,
replication::Version replicationVersion,
std::optional<ReplicatedLogStatusMap> const& localLogs,
std::optional<ShardIdToLogIdMap> const& shardsToLogs) {
std::optional<ShardIdToLogIdMap> const& shardsToLogs,
DistributeShardsLikeMapping const& distributeShardsLike,
ClonePrototypeMapping const& clonePrototypeMapping) {
// First check if the shard is locked:
auto iter = shardActionMap.find(shname);
if (iter != shardActionMap.end()) {
Expand Down Expand Up @@ -574,15 +576,27 @@ static void handleLocalShard(

if (replicationVersion != replication::Version::TWO &&
(activeResign || adjustResignState)) {
description = std::make_shared<ActionDescription>(
std::map<std::string, std::string>{{NAME, RESIGN_SHARD_LEADERSHIP},
{DATABASE, dbname},
{SHARD, shname},
{CLONES, ""}},
RESIGN_PRIORITY, true);
makeDirty.insert(dbname);
callNotify = true;
actions.emplace_back(description);
// only handle prototypes
if (clonePrototypeMapping.find(shname) == clonePrototypeMapping.end()) {
auto clones = [&] {
auto myClones = distributeShardsLike.find(shname);
if (myClones != distributeShardsLike.end()) {
return basics::StringUtils::join(myClones->second);
}

return std::string{};
}();

description = std::make_shared<ActionDescription>(
std::map<std::string, std::string>{{NAME, RESIGN_SHARD_LEADERSHIP},
{DATABASE, dbname},
{SHARD, shname},
{CLONES, clones}},
RESIGN_PRIORITY, true);
makeDirty.insert(dbname);
callNotify = true;
actions.emplace_back(description);
}
}

// We only drop indexes, when collection is not being dropped already
Expand Down Expand Up @@ -649,6 +663,77 @@ VPackBuilder getShardMap(VPackSlice const& collections) {
return shardMap;
}

/// @brief Collects the shard IDs listed under a collection's SHARDS attribute.
///
/// @param collection  slice describing a single collection
/// @return the shard IDs found as keys of the SHARDS object, held in an
///         ordered std::set; an empty set if SHARDS is not an object
std::set<ShardID> extractShardsFromCollection(VPackSlice collection) {
  auto const shards = collection.get(SHARDS);
  if (!shards.isObject()) {
    return {};
  }

  std::set<ShardID> result;
  // Iterate the slice validated above instead of looking SHARDS up a second
  // time.
  for (auto const shard : VPackObjectIterator(shards)) {
    // NOTE(review): the parse result is unwrapped without checking for
    // failure - presumably every key here is a well-formed shard name; confirm.
    result.insert(ShardID::shardIdFromString(shard.key.stringView()).get());
  }
  return result;
}

std::tuple<DistributeShardsLikeMapping, ClonePrototypeMapping>
getDistributeShardsLike(VPackSlice const& collections) {
maierlars marked this conversation as resolved.
Show resolved Hide resolved
DistributeShardsLikeMapping mapping;
ClonePrototypeMapping reverseMapping;

// maps a collection to the sorted set of shards
std::unordered_map<std::string_view, std::set<ShardID>> prototypeShards;

maierlars marked this conversation as resolved.
Show resolved Hide resolved
if (!collections.isObject()) {
return {};
}

for (auto collection : VPackObjectIterator(collections)) {
TRI_ASSERT(collection.value.isObject());

auto prototype = [&] {
if (auto s = collection.value.get("distributeShardsLike"); s.isString()) {
return s.stringView();
}
return std::string_view{};
}();

if (prototype.empty()) {
continue;
}
// check if we have collected information about this prototype already
maierlars marked this conversation as resolved.
Show resolved Hide resolved
auto it = prototypeShards.find(prototype);
if (it == prototypeShards.end()) {
auto protoCol = collections.get(prototype);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I had a hard time reading protoCol because I was intuitively thinking of protocol (as in HTTP, HTTPS, etc.). I get that it means protoCollection here, but naming it protoCol is slightly ambiguous, so I suggest using protoCollection instead.

TRI_ASSERT(protoCol.isObject()) << prototype << " " << protoCol.toJson();

bool inserted;
std::tie(it, inserted) = prototypeShards.emplace(
prototype, extractShardsFromCollection(protoCol));
TRI_ASSERT(inserted);
}

// now zip both shard sets in sorted order and map the prototype shards
// to the clone shards
auto cloneShards = extractShardsFromCollection(collection.value);

auto iterA = it->second.begin();
auto iterB = cloneShards.begin();

while (iterA != it->second.end()) {
TRI_ASSERT(iterB != cloneShards.end());
mapping[*iterA].insert(*iterB);
reverseMapping.emplace(*iterB, *iterA);
++iterA;
++iterB;
}
// prototype and clone have the same number of shards
TRI_ASSERT(iterB == cloneShards.end());
maierlars marked this conversation as resolved.
Show resolved Hide resolved
}

return std::make_pair(std::move(mapping), std::move(reverseMapping));
}

void arangodb::maintenance::diffReplicatedLogs(
DatabaseID const& database, ReplicatedLogStatusMap const& localLogs,
ReplicatedLogSpecMap const& planLogs, std::string const& serverId,
Expand Down Expand Up @@ -1055,8 +1140,10 @@ arangodb::Result arangodb::maintenance::diffPlanLocal(
continue;
}

auto const shardMap = getShardMap(plan); // plan shards -> servers
auto const [distributeShardsLikeMap, clonePrototypeMap] =
getDistributeShardsLike(plan);
for (auto const& lcol : VPackObjectIterator(ldbslice)) {
auto const shardMap = getShardMap(plan); // plan shards -> servers
auto maybeShardID = ShardID::shardIdFromString(lcol.key.stringView());
if (ADB_UNLIKELY(maybeShardID.fail())) {
TRI_ASSERT(false)
Expand All @@ -1067,7 +1154,8 @@ arangodb::Result arangodb::maintenance::diffPlanLocal(
handleLocalShard(ldbname, maybeShardID.get(), lcol.value,
shardMap.slice(), commonShrds, indis, serverId,
actions, makeDirty, callNotify, shardActionMap, rv,
localLogs, shardsToLogs);
localLogs, shardsToLogs, distributeShardsLikeMap,
clonePrototypeMap);
}
}
}
Expand Down
6 changes: 6 additions & 0 deletions arangod/Cluster/Maintenance.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ using ShardIdToLogIdMap =
using ShardIdToLogIdMapByDatabase =
std::unordered_map<DatabaseID, ShardIdToLogIdMap>;

// maps the prototype shard to the clones
using DistributeShardsLikeMapping =
std::unordered_map<ShardID, std::unordered_set<ShardID>>;
// maps clone shards to their prototype shard
using ClonePrototypeMapping = std::unordered_map<ShardID, ShardID>;

/**
* @brief Diff Plan Replicated Logs and Local Replicated Logs for phase
* 1 of Maintenance run
Expand Down
37 changes: 21 additions & 16 deletions arangod/Cluster/ResignShardLeadership.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,18 +97,7 @@ bool ResignShardLeadership::first() {
DatabaseGuard guard(df, database);
auto vocbase = &guard.database();

auto col = vocbase->lookupCollection(collection);
if (col == nullptr) {
std::stringstream error;
error << "Failed to lookup local collection " << collection
<< " in database " << database;
LOG_TOPIC("e06ca", ERR, Logger::MAINTENANCE)
<< "ResignLeadership: " << error.str();
result(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND, error.str());
return false;
}

// Get exclusive lock on collection
// Get exclusive lock on all shards
auto origin = transaction::OperationOriginInternal{"resigning leadership"};
auto ctx =
std::make_shared<transaction::StandaloneContext>(*vocbase, origin);
Expand All @@ -130,15 +119,31 @@ bool ResignShardLeadership::first() {
// for now but we will not accept any replication operation from any
// leader, until we have negotiated a deal with it. Then the actual
// name of the leader will be set.
col->followers()->setTheLeader(LeaderNotYetKnownString); // resign
res = methods->abort(); // unlock
for (auto const& s : shards) {
auto col = vocbase->lookupCollection(s);
if (col == nullptr) {
std::stringstream error;
error << "Failed to lookup local collection " << collection
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably we should include the shard name in the error message here as well, to have more detail should this error happen, e.g.
"Failed to lookup shard s of local collection collection"

<< " in database " << database;
LOG_TOPIC("e06ca", ERR, Logger::MAINTENANCE)
<< "ResignLeadership: " << error.str();
result(TRI_ERROR_ARANGO_DATA_SOURCE_NOT_FOUND, error.str());
return false;
}

col->followers()->setTheLeader(LeaderNotYetKnownString); // resign
maierlars marked this conversation as resolved.
Show resolved Hide resolved
}

res = methods->abort(); // unlock
if (res.fail()) {
LOG_TOPIC("10c35", ERR, Logger::MAINTENANCE)
<< "Failed to abort transaction during resign leadership: " << res;
}

transaction::cluster::abortLeaderTransactionsOnShard(col->id());

for (auto const& s : shards) {
auto col = vocbase->lookupCollection(s);
transaction::cluster::abortLeaderTransactionsOnShard(col->id());
maierlars marked this conversation as resolved.
Show resolved Hide resolved
}
} catch (std::exception const& e) {
std::stringstream error;
error << "exception thrown when resigning:" << e.what();
Expand Down