Skip to content

Commit

Permalink
feat: add RestoreStreamer to migration process
Browse files Browse the repository at this point in the history
  • Loading branch information
BorysTheDev committed Jan 15, 2024
1 parent 17a3044 commit 95a315c
Show file tree
Hide file tree
Showing 8 changed files with 79 additions and 32 deletions.
1 change: 1 addition & 0 deletions src/server/cluster/cluster_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
namespace dfly {

using SlotId = uint16_t;
// TODO consider to use bit set or some more compact way to store SlotId
using SlotSet = absl::flat_hash_set<SlotId>;

class ClusterConfig {
Expand Down
53 changes: 45 additions & 8 deletions src/server/cluster/cluster_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "server/dflycmd.h"
#include "server/error.h"
#include "server/journal/journal.h"
#include "server/journal/streamer.h"
#include "server/main_service.h"
#include "server/replica.h"
#include "server/server_family.h"
Expand Down Expand Up @@ -51,6 +52,19 @@ thread_local shared_ptr<ClusterConfig> tl_cluster_config;

} // namespace

ClusterFamily::FlowInfo::~FlowInfo() = default;
ClusterFamily::MigrationInfo::~MigrationInfo() = default;
ClusterFamily::MigrationInfo::MigrationInfo(std::uint32_t flows_num, std::string ip, uint16_t port,
std::vector<ClusterConfig::SlotRange> slots,
Context::ErrHandler err_handler)
: host_ip(ip),
flows(flows_num),
slots(slots),
port(port),
cntx(err_handler),
state(ClusterSlotMigration::State::C_CONNECTING) {
}

ClusterFamily::ClusterFamily(ServerFamily* server_family) : server_family_(server_family) {
CHECK_NOTNULL(server_family_);
ClusterConfig::Initialize();
Expand Down Expand Up @@ -763,9 +777,15 @@ uint32_t ClusterFamily::CreateMigrationSession(ConnectionContext* cntx, uint16_t
std::vector<ClusterConfig::SlotRange> slots) {
std::lock_guard lk(migration_mu_);
auto sync_id = next_sync_id_++;
auto err_handler = [this, sync_id](const GenericError& err) {
LOG(INFO) << "Slot migration error: " << err.Format();

// Todo add error processing, stop migration process
// fb2::Fiber("stop_Migration", &ClusterFamily::StopMigration, this, sync_id).Detach();
};
auto info = make_shared<MigrationInfo>(shard_set->size(), cntx->conn()->RemoteEndpointAddress(),
sync_id, port, std::move(slots));
auto [it, inserted] = outgoing_migration_infos_.emplace(sync_id, info);
port, std::move(slots), err_handler);
auto [it, inserted] = outgoing_migration_infos_.emplace(sync_id, std::move(info));
CHECK(inserted);
return sync_id;
}
Expand All @@ -787,15 +807,31 @@ void ClusterFamily::Flow(CmdArgList args, ConnectionContext* cntx) {
if (!info)
return cntx->SendError(kIdNotFound);

info->flows[shard_id].conn = cntx->conn();
// info->flows[shard_id].conn = cntx->conn();

cntx->conn()->Migrate(shard_set->pool()->at(shard_id));

cntx->SendOk();

// TODO refactor: maybe we can use vector<slotRange> instead of SlotSet
SlotSet sset;
for (const auto& slot_range : info->slots) {
for (auto i = slot_range.start; i <= slot_range.end; ++i)
sset.insert(i);
}
info->state = ClusterSlotMigration::State::C_FULL_SYNC;

cntx->SendOk();
EngineShard* shard = EngineShard::tlocal();
info->flows[shard_id].streamer = std::make_unique<RestoreStreamer>(
&shard->db_slice(), std::move(sset), sync_id, shard_id, server_family_->journal(),
&info->cntx, [weak_info = weak_ptr(info), shard_id] {
if (auto info = weak_info.lock(); info) {
VLOG(1) << "Change state to C_STABLE_SYNC for shard_id: " << shard_id;
info->state = ClusterSlotMigration::State::C_STABLE_SYNC;
}
});

info->flows[shard_id].conn->socket()->Write(io::Buffer("SYNC"));
info->flows[shard_id].streamer->Start(cntx->conn()->socket());

LOG(INFO) << "Started migation with target node " << info->host_ip << ":" << info->port;
}
Expand All @@ -808,20 +844,21 @@ void ClusterFamily::FullSyncCut(CmdArgList args, ConnectionContext* cntx) {
return cntx->SendError(err->MakeReply());
}

VLOG(1) << "Full sync cut "
<< " sync_id: " << sync_id << " shard_id: " << shard_id << " shard";

std::lock_guard lck(migration_mu_);
auto migration_it =
std::find_if(incoming_migrations_jobs_.begin(), incoming_migrations_jobs_.end(),
[sync_id](const auto& el) { return el->getSyncId() == sync_id; });

VLOG(1) << "Full sync cut "
<< " sync_id: " << sync_id << " shard_id: " << shard_id << " shard";

if (migration_it == incoming_migrations_jobs_.end())
return cntx->SendError(kIdNotFound);

if ((*migration_it)->trySetStableSync(shard_id)) {
LOG(INFO) << "STABLE-SYNC state is set for sync_id " << sync_id;
}
cntx->SendOk();
}

shared_ptr<ClusterFamily::MigrationInfo> ClusterFamily::GetMigrationInfo(uint32_t sync_id) {
Expand Down
20 changes: 8 additions & 12 deletions src/server/cluster/cluster_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace dfly {
class CommandRegistry;
class ConnectionContext;
class ServerFamily;
class DflyCmd;
class RestoreStreamer;

class ClusterFamily {
public:
Expand Down Expand Up @@ -81,26 +81,22 @@ class ClusterFamily {
// FlowInfo is used to store state, connection, and all auxiliary data
// that is needed for correct slots (per shard) data transfer
struct FlowInfo {
facade::Connection* conn = nullptr;
// facade::Connection* conn = nullptr;
std::unique_ptr<RestoreStreamer> streamer;
~FlowInfo();
};

// Whole slots migration process information
struct MigrationInfo {
MigrationInfo() = default;
MigrationInfo(std::uint32_t flows_num, std::string ip, uint32_t sync_id, uint16_t port,
std::vector<ClusterConfig::SlotRange> slots)
: host_ip(ip),
flows(flows_num),
slots(slots),
sync_id(sync_id),
port(port),
state(ClusterSlotMigration::State::C_CONNECTING) {
}
~MigrationInfo();
MigrationInfo(std::uint32_t flows_num, std::string ip, uint16_t port,
std::vector<ClusterConfig::SlotRange> slots, Context::ErrHandler err_handler);
std::string host_ip;
std::vector<FlowInfo> flows;
std::vector<ClusterConfig::SlotRange> slots;
uint32_t sync_id;
uint16_t port;
Context cntx;
ClusterSlotMigration::State state = ClusterSlotMigration::State::C_NO_STATE;
};

Expand Down
4 changes: 0 additions & 4 deletions src/server/cluster/cluster_shard_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ void ClusterShardMigration::FullSyncShardFb(Context* cntx) {
TransactionReader tx_reader{};

while (!cntx->IsCancelled()) {
waker_.await([&]() { return cntx->IsCancelled(); });
if (cntx->IsCancelled())
break;

Expand All @@ -97,8 +96,6 @@ void ClusterShardMigration::FullSyncShardFb(Context* cntx) {
// force_ping_ = true;
// journal_rec_executed_.fetch_add(1, std::memory_order_relaxed);
}

waker_.notify();
}
}

Expand All @@ -113,7 +110,6 @@ void ClusterShardMigration::ExecuteTxWithNoShardSync(TransactionData&& tx_data,
if (tx_data.shard_cnt <= 1 || !tx_data.IsGlobalCmd()) {
VLOG(2) << "Execute cmd without sync between shards. txid: " << tx_data.txid;
executor_->Execute(tx_data.dbid, absl::MakeSpan(tx_data.commands));
// journal_rec_executed_.fetch_add(tx_data.journal_rec_count, std::memory_order_relaxed);
return;
}

Expand Down
4 changes: 3 additions & 1 deletion src/server/cluster/cluster_slot_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ bool ClusterSlotMigration::trySetStableSync(uint32_t flow) {
if (res) {
// TODO make this when we set new config
state_ = ClusterSlotMigration::C_STABLE_SYNC;
cntx_.Cancel();
for (auto& flow : shard_flows_) {
flow->Cancel();
}
}
return res;
}
Expand Down
16 changes: 14 additions & 2 deletions src/server/journal/streamer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,19 @@ void JournalStreamer::WriterFb(io::Sink* dest) {
}

RestoreStreamer::RestoreStreamer(DbSlice* slice, SlotSet slots, uint32_t sync_id, uint32_t flow_id,
journal::Journal* journal, Context* cntx)
journal::Journal* journal, Context* cntx,
std::function<void()> full_sync_cut_cb)
: JournalStreamer(journal, cntx),
db_slice_(slice),
my_slots_(std::move(slots)),
sync_id_(sync_id),
flow_id_(flow_id) {
flow_id_(flow_id),
full_sync_cut_cb(std::move(full_sync_cut_cb)) {
DCHECK(slice != nullptr);
}

void RestoreStreamer::Start(io::Sink* dest) {
VLOG(2) << "RestoreStreamer start";
auto db_cb = absl::bind_front(&RestoreStreamer::OnDbChange, this);
snapshot_version_ = db_slice_->RegisterOnChange(std::move(db_cb));

Expand All @@ -83,8 +86,11 @@ void RestoreStreamer::Start(io::Sink* dest) {
}
} while (cursor);

VLOG(2) << "FULL-SYNC-CUT for " << sync_id_ << " : " << flow_id_;
WriteCommand(make_pair(
"DFLYMIGRATE", ArgSlice{"FULL-SYNC-CUT", absl::StrCat(sync_id_), absl::StrCat(flow_id_)}));
NotifyWritten(true);
full_sync_cut_cb();
});
}

Expand All @@ -95,6 +101,12 @@ void RestoreStreamer::Cancel() {
JournalStreamer::Cancel();
}

RestoreStreamer::~RestoreStreamer() {
fiber_cancellation_.Cancel();
snapshot_fb_.JoinIfNeeded();
db_slice_->UnregisterOnChange(snapshot_version_);
}

bool RestoreStreamer::ShouldWrite(const journal::JournalItem& item) const {
if (!item.slot.has_value()) {
return false;
Expand Down
5 changes: 4 additions & 1 deletion src/server/journal/streamer.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,13 @@ class JournalStreamer : protected BufferedStreamerBase {
class RestoreStreamer : public JournalStreamer {
public:
RestoreStreamer(DbSlice* slice, SlotSet slots, uint32_t sync_id, uint32_t flow_id,
journal::Journal* journal, Context* cntx);
journal::Journal* journal, Context* cntx, std::function<void()> full_sync_cut_cb);

void Start(io::Sink* dest) override;
void Cancel() override;

~RestoreStreamer();

private:
void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req);
bool ShouldWrite(const journal::JournalItem& item) const override;
Expand All @@ -75,6 +77,7 @@ class RestoreStreamer : public JournalStreamer {
uint32_t flow_id_;
Fiber snapshot_fb_;
Cancellation fiber_cancellation_;
std::function<void()> full_sync_cut_cb;
};

} // namespace dfly
8 changes: 4 additions & 4 deletions tests/dragonfly/cluster_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -801,20 +801,20 @@ async def test_cluster_slot_migration(df_local_factory: DflyInstanceFactory):
)
assert "OK" == res

await asyncio.sleep(0.5)
await asyncio.sleep(1)

status = await c_nodes_admin[1].execute_command(
"DFLYCLUSTER", "SLOT-MIGRATION-STATUS", "127.0.0.1", str(nodes[0].admin_port)
)
assert "FULL_SYNC" == status
assert "STABLE_SYNC" == status

status = await c_nodes_admin[0].execute_command(
"DFLYCLUSTER", "SLOT-MIGRATION-STATUS", "127.0.0.1", str(nodes[1].port)
)
assert "FULL_SYNC" == status
assert "STABLE_SYNC" == status

status = await c_nodes_admin[0].execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS")
assert ["out 127.0.0.1:30002 FULL_SYNC"] == status
assert ["out 127.0.0.1:30002 STABLE_SYNC"] == status

try:
await c_nodes_admin[1].execute_command(
Expand Down

0 comments on commit 95a315c

Please sign in to comment.