Skip to content

Commit

Permalink
feat(cluster): Add important logs (#3138)
Browse files Browse the repository at this point in the history
Issue: #2829
  • Loading branch information
chakaz committed Jun 6, 2024
1 parent 37eea74 commit 53c5227
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 6 deletions.
4 changes: 4 additions & 0 deletions src/server/cluster/cluster_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ struct MigrationInfo {
// Two migrations are equal only when the endpoint (ip, port), the slot ranges
// and the node id all match.
bool operator==(const MigrationInfo& r) const {
  const bool same_endpoint = ip == r.ip && port == r.port;
  return same_endpoint && slot_ranges == r.slot_ranges && node_id == r.node_id;
}

// Renders the migration as "<node_id>,<ip>:<port> (<slot ranges>)" for logging.
std::string ToString() const {
  const auto slots_text = SlotRange::ToString(slot_ranges);
  return absl::StrCat(node_id, ",", ip, ":", port, " (", slots_text, ")");
}
};

struct ClusterShardInfo {
Expand Down
9 changes: 7 additions & 2 deletions src/server/cluster/cluster_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,8 @@ void ClusterFamily::DflyCluster(CmdArgList args, ConnectionContext* cntx) {
return cntx->SendError(kClusterDisabled);
}

VLOG(2) << "Got DFLYCLUSTER command (" << cntx->conn()->GetClientId() << "): " << args;

ToUpper(&args[0]);
string_view sub_cmd = ArgS(args, 0);
args.remove_prefix(1); // remove subcommand name
Expand Down Expand Up @@ -505,11 +507,12 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx)

lock_guard gu(set_config_mu);

VLOG(1) << "Setting new cluster config: " << json_str;
RemoveOutgoingMigrations(new_config->GetFinishedOutgoingMigrations(tl_cluster_config));
RemoveIncomingMigrations(new_config->GetFinishedIncomingMigrations(tl_cluster_config));

lock_guard config_update_lk(
config_update_mu_); // to prevent simultaneous update config from outgoing migration
// Prevent simultaneous config update from outgoing migration
lock_guard config_update_lk(config_update_mu_);

SlotRanges enable_slots, disable_slots;

Expand Down Expand Up @@ -562,6 +565,8 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx)
SlotSet after = tl_cluster_config->GetOwnedSlots();
if (ServerState::tlocal()->is_master) {
auto deleted_slots = (before.GetRemovedSlots(after)).ToSlotRanges();
LOG_IF(INFO, !deleted_slots.empty())
<< "Flushing newly unowned slots: " << SlotRange::ToString(deleted_slots);
DeleteSlots(deleted_slots);
WriteFlushSlotsToJournal(deleted_slots);
}
Expand Down
4 changes: 3 additions & 1 deletion src/server/cluster/incoming_slot_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ bool IncomingSlotMigration::Join() {
}

void IncomingSlotMigration::Cancel() {
LOG(INFO) << "Cancelling incoming migration of slots " << SlotRange::ToString(slots_);
string_view log_state = state_.load() == MigrationState::C_FINISHED ? "Finishing" : "Cancelling";
LOG(INFO) << log_state << " incoming migration of slots " << SlotRange::ToString(slots_);
cntx_.Cancel();

for (auto& flow : shard_flows_) {
Expand All @@ -147,6 +148,7 @@ void IncomingSlotMigration::StartFlow(uint32_t shard, util::FiberSocketBase* sou
state_.store(MigrationState::C_SYNC);

shard_flows_[shard]->Start(&cntx_, source, bc_);
VLOG(1) << "Incoming slot migration flow for shard: " << shard << " finished";
}

size_t IncomingSlotMigration::GetKeyCount() const {
Expand Down
17 changes: 14 additions & 3 deletions src/server/cluster/outgoing_slot_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ MigrationState OutgoingMigration::GetState() const {
}

// Body of the outgoing-migration fiber. Each pass of the outer loop moves the
// state to C_CONNECTING, connects and authenticates, sends DFLYMIGRATE INIT,
// checks the flows for errors and then tries to finalize. Any failure
// `continue`s to a fresh attempt; the loop ends on a successful pass (`break`)
// or once the state is C_FINISHED.
void OutgoingMigration::SyncFb() {
VLOG(1) << "Starting outgoing migration fiber for migration " << migration_info_.ToString();

// Retry starting the migration until it is cancelled (the loop exits once the
// state is C_FINISHED).
while (GetState() != MigrationState::C_FINISHED) {
if (!ChangeState(MigrationState::C_CONNECTING)) {
Expand All @@ -141,27 +143,31 @@ void OutgoingMigration::SyncFb() {
ThisFiber::SleepFor(1000ms);
}

VLOG(1) << "Connecting to source";
VLOG(2) << "Connecting to source";
// The flag name indicates milliseconds; `* 1ms` turns the value into a duration.
auto timeout = absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms;
if (auto ec = ConnectAndAuth(timeout, &cntx_); ec) {
VLOG(1) << "Can't connect to source";
cntx_.ReportError(GenericError(ec, "Couldn't connect to source."));
continue;
}

VLOG(1) << "Migration initiating";
VLOG(2) << "Migration initiating";
ResetParser(false);
// INIT carries this node's id, the number of slot_migrations_ entries and
// every slot range as a "<start> <end>" pair.
auto cmd = absl::StrCat("DFLYMIGRATE INIT ", cf_->MyID(), " ", slot_migrations_.size());
for (const auto& s : migration_info_.slot_ranges) {
absl::StrAppend(&cmd, " ", s.start, " ", s.end);
}

if (auto ec = SendCommandAndReadResponse(cmd); ec) {
VLOG(1) << "Unable to initialize migration";
cntx_.ReportError(GenericError(ec, "Could not send INIT command."));
continue;
}

if (!CheckRespIsSimpleReply("OK")) {
VLOG(2) << "Received non-OK response, retrying";
// A kUnknownMigration reply is retried without reporting an error; any other
// non-OK reply is reported with the first response argument as its text.
// NOTE(review): the log below fires when the reply is NOT kUnknownMigration,
// yet its text describes the unknown-migration case - confirm the placement.
if (!CheckRespIsSimpleReply(kUnknownMigration)) {
VLOG(2) << "Target node does not recognize migration";
cntx_.ReportError(GenericError(std::string(ToSV(LastResponseArgs().front().GetBuf()))));
}
continue;
Expand All @@ -185,21 +191,26 @@ void OutgoingMigration::SyncFb() {
});

if (CheckFlowsForErrors()) {
VLOG(1) << "Errors detected, retrying outgoing migration";
continue;
}

VLOG(1) << "Migrations snapshot is finished";
VLOG(2) << "Migrations snapshot is finished";

// Keep calling FinalizeMigration with an increasing attempt number, sleeping
// 500ms between tries, until it succeeds or the state becomes C_FINISHED.
long attempt = 0;
while (GetState() != MigrationState::C_FINISHED && !FinalizeMigration(++attempt)) {
// process commands that were on pause and try again
VLOG(2) << "Waiting for migration to finalize...";
ThisFiber::SleepFor(500ms);
}
// Errors found after finalization restart the whole sequence.
if (CheckFlowsForErrors()) {
VLOG(1) << "Errors detected, retrying outgoing migration";
continue;
}
break;
}

VLOG(1) << "Exiting outgoing migration fiber for migration " << migration_info_.ToString();
}

bool OutgoingMigration::FinalizeMigration(long attempt) {
Expand Down

0 comments on commit 53c5227

Please sign in to comment.