Skip to content

Commit

Permalink
Few improvements.
Browse files Browse the repository at this point in the history
1. Stabilized naming scheme for snapshot files.
   The behavior is determined by dbfilename flag.
   By default it's 'dump'. Dragonfly checks if the flag has an extension
   if not, it automatically saves timestamped files (dump*.rdb) and loads the latest file that is available.
   In case the flag has extension like 'dump.rdb' it fallbacks to redis behavior of loading and saving to the
   same file.

2. Updated the logo url.
  • Loading branch information
romange committed May 19, 2022
1 parent 9eb13b5 commit 439070d
Show file tree
Hide file tree
Showing 10 changed files with 165 additions and 44 deletions.
2 changes: 1 addition & 1 deletion src/core/dash_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ template <unsigned NUM_SLOTS> class SlotBitmap {
// probe=true GetProbe returns index of probing entries, i.e. hosted but not owned by this bucket.
// probe=false - mask of owning entries
uint32_t GetProbe(bool probe) const {
if (SINGLE)
if constexpr (SINGLE)
return ((val_[0].d >> 4) & kAllocMask) ^ ((!probe) * kAllocMask);
return (val_[1].d & kAllocMask) ^ ((!probe) * kAllocMask);
}
Expand Down
2 changes: 1 addition & 1 deletion src/facade/dragonfly_listener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ Listener::Listener(Protocol protocol, ServiceInterface* e) : service_(e), protoc
ctx_ = CreateSslCntx();
}
http_base_.reset(new HttpListener<>);

http_base_->set_resource_prefix("https://romange.s3.eu-west-1.amazonaws.com/static");
http_base_->enable_metrics();
}

Expand Down
10 changes: 9 additions & 1 deletion src/server/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
#include "server/common.h"

#include <absl/strings/str_cat.h>
#include <mimalloc.h>

extern "C" {
#include "redis/object.h"
#include "redis/rdb.h"
#include "redis/zmalloc.h"
}

#include "base/logging.h"
Expand All @@ -27,6 +29,11 @@ unsigned kernel_version = 0;
size_t max_memory_limit = 0;

ServerState::ServerState() {
CHECK(mi_heap_get_backing() == mi_heap_get_default());

mi_heap_t* tlh = mi_heap_new();
init_zmalloc_threadlocal(tlh);
data_heap_ = tlh;
}

ServerState::~ServerState() {
Expand Down Expand Up @@ -144,8 +151,9 @@ bool ParseHumanReadableBytes(std::string_view str, int64_t* num_bytes) {
return false;
}
d *= scale;
if (d > kint64max || d < 0)
if (int64_t(d) > kint64max || d < 0)
return false;

*num_bytes = static_cast<int64>(d + 0.5);
if (neg) {
*num_bytes = -*num_bytes;
Expand Down
36 changes: 18 additions & 18 deletions src/server/debugcmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
#include "server/error.h"
#include "server/main_service.h"
#include "server/rdb_load.h"
#include "server/server_state.h"
#include "server/string_family.h"
#include "server/transaction.h"
#include "util/uring/uring_fiber_algo.h"
#include "util/uring/uring_file.h"

DECLARE_string(dir);
DECLARE_string(dbfilename);
Expand Down Expand Up @@ -150,13 +150,23 @@ void DebugCmd::Reload(CmdArgList args) {
}
}


string last_save_file = sf_.LastSaveFile();
Load(last_save_file);
}

void DebugCmd::Load(std::string_view filename) {
EngineShardSet& ess = *shard_set;
auto [current, switched] = sf_.global_state()->Next(GlobalState::LOADING);
if (!switched) {
LOG(WARNING) << GlobalState::Name(current) << " in progress, ignored";
return;
}

ProactorPool& pp = sf_.service().proactor_pool();
pp.Await([&](ProactorBase*) {
CHECK(ServerState::tlocal()->gstate() == GlobalState::IDLE);
ServerState::tlocal()->set_gstate(GlobalState::LOADING);
});

const CommandId* cid = sf_.service().FindCmd("FLUSHALL");
intrusive_ptr<Transaction> flush_trans(new Transaction{cid});
flush_trans->InitByArgs(0, {});
Expand All @@ -174,24 +184,14 @@ void DebugCmd::Load(std::string_view filename) {
dir_path.append(filename);
path = dir_path;
}
auto res = uring::OpenRead(path.generic_string());

if (!res) {
(*cntx_)->SendError(res.error().message());
return;
}

VLOG(1) << "Performing load";
io::FileSource fs(*res);

RdbLoader loader(&ess, sf_.script_mgr());
ec = loader.Load(&fs);

// switches back to
ec = sf_.LoadRdb(path.generic_string());
if (ec) {
(*cntx_)->SendError(ec.message());
} else {
(*cntx_)->SendOk();
return (*cntx_)->SendError(ec.message());
}

(*cntx_)->SendOk();
}

void DebugCmd::Populate(CmdArgList args) {
Expand Down
12 changes: 5 additions & 7 deletions src/server/engine_shard_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ extern "C" {

#include "base/logging.h"
#include "server/blocking_controller.h"
#include "server/server_state.h"
#include "server/tiered_storage.h"
#include "server/transaction.h"
#include "util/fiber_sched_algo.h"
Expand Down Expand Up @@ -81,16 +82,13 @@ void EngineShard::Shutdown() {

void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) {
CHECK(shard_ == nullptr) << pb->GetIndex();
CHECK(mi_heap_get_backing() == mi_heap_get_default());

mi_heap_t* tlh = mi_heap_new();
init_zmalloc_threadlocal(tlh);

void* ptr = mi_heap_malloc_aligned(tlh, sizeof(EngineShard), alignof(EngineShard));
shard_ = new (ptr) EngineShard(pb, update_db_time, tlh);
mi_heap_t* data_heap = ServerState::tlocal()->data_heap();
void* ptr = mi_heap_malloc_aligned(data_heap, sizeof(EngineShard), alignof(EngineShard));
shard_ = new (ptr) EngineShard(pb, update_db_time, data_heap);

CompactObj::InitThreadLocal(shard_->memory_resource());
SmallString::InitThreadLocal(tlh);
SmallString::InitThreadLocal(data_heap);

if (!FLAGS_backing_prefix.empty()) {
string fn =
Expand Down
6 changes: 3 additions & 3 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -319,13 +319,13 @@ void Service::Init(util::AcceptServer* acceptor, util::ListenerInterface* main_i
const InitOpts& opts) {
InitRedisTables();

uint32_t shard_num = pp_.size() > 1 ? pp_.size() - 1 : pp_.size();
shard_set->Init(shard_num, !opts.disable_time_update);

pp_.AwaitFiberOnAll([&](uint32_t index, ProactorBase* pb) {
ServerState::tlocal()->Init();
});

uint32_t shard_num = pp_.size() > 1 ? pp_.size() - 1 : pp_.size();
shard_set->Init(shard_num, !opts.disable_time_update);

request_latency_usec.Init(&pp_);
StringFamily::Init(&pp_);
GenericFamily::Init(&pp_);
Expand Down
122 changes: 109 additions & 13 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ extern "C" {

#include "base/logging.h"
#include "facade/dragonfly_connection.h"
#include "io/file_util.h"
#include "io/proc_reader.h"
#include "server/command_registry.h"
#include "server/conn_context.h"
#include "server/debugcmd.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/main_service.h"
#include "server/rdb_load.h"
#include "server/rdb_save.h"
#include "server/replica.h"
#include "server/script_mgr.h"
Expand All @@ -37,7 +39,7 @@ extern "C" {
#include "util/uring/uring_file.h"

DEFINE_string(dir, "", "working directory");
DEFINE_string(dbfilename, "", "the filename to save/load the DB");
DEFINE_string(dbfilename, "dump", "the filename to save/load the DB");
DEFINE_string(requirepass, "", "password for AUTH authentication");

DECLARE_uint32(port);
Expand Down Expand Up @@ -77,11 +79,37 @@ error_code CreateDirs(fs::path dir_path) {
return ec;
}

string UnknowSubCmd(string_view subcmd, string cmd) {
string UnknownSubCmd(string_view subcmd, string cmd) {
return absl::StrCat("Unknown subcommand or wrong number of arguments for '", subcmd, "'. Try ",
cmd, " HELP.");
}

string InferLoadFile(fs::path data_dir) {
if (FLAGS_dbfilename.empty())
return string{};

fs::path fl_path = data_dir.append(FLAGS_dbfilename);

if (fs::exists(fl_path))
return fl_path.generic_string();
if (!fl_path.has_extension()) {
string glob = fl_path.generic_string();
glob.append("*.rdb");

io::Result<io::StatShortVec> short_vec = io::StatFiles(glob);
if (short_vec) {
if (!short_vec->empty()) {
return short_vec->back().name;
}
} else {
LOG(WARNING) << "Could not stat " << glob << ", error " << short_vec.error().message();
}
LOG(INFO) << "Checking " << fl_path;
}

return string{};
}

} // namespace

ServerFamily::ServerFamily(Service* service) : service_(*service) {
Expand Down Expand Up @@ -114,22 +142,29 @@ void ServerFamily::Init(util::AcceptServer* acceptor, util::ListenerInterface* m

task_10ms_ = pb_task_->AwaitBrief([&] { return pb_task_->AddPeriodic(10, cache_cb); });

fs::path data_path = fs::current_path();
fs::path data_folder = fs::current_path();

if (!FLAGS_dir.empty()) {
data_path = FLAGS_dir;
data_folder = FLAGS_dir;

error_code ec;

data_path = fs::canonical(data_path, ec);
data_folder = fs::canonical(data_folder, ec);
}

LOG(INFO) << "Data directory is " << data_path;
LOG(INFO) << "Data directory is " << data_folder;
string load_path = InferLoadFile(data_folder);
if (!load_path.empty()) {
Load(load_path);
}
}

void ServerFamily::Shutdown() {
VLOG(1) << "ServerFamily::Shutdown";

if (load_fiber_.joinable())
load_fiber_.join();

pb_task_->Await([this] {
pb_task_->CancelPeriodic(task_10ms_);
task_10ms_ = 0;
Expand All @@ -141,6 +176,63 @@ void ServerFamily::Shutdown() {
});
}

void ServerFamily::Load(const std::string& load_path) {
CHECK(!load_fiber_.get_id());

error_code ec;
auto path = fs::canonical(load_path, ec);
if (ec) {
LOG(ERROR) << "Error loading " << load_path << " " << ec.message();
return;
}

LOG(INFO) << "Loading " << load_path;
auto [current, switched] = global_state()->Next(GlobalState::LOADING);
if (!switched) {
LOG(WARNING) << GlobalState::Name(current) << " in progress, ignored";
return;
}

auto& pool = service_.proactor_pool();

// Deliberitely run on all I/O threads to update the state for non-shard threads as well.
pool.Await([&](ProactorBase*) {
// TODO: There can be a bug where status is different.
CHECK(ServerState::tlocal()->gstate() == GlobalState::IDLE);
ServerState::tlocal()->set_gstate(GlobalState::LOADING);
});

// Choose thread that does not handle shards if possible.
// This will balance out the CPU during the load.
ProactorBase* proactor =
shard_count() < pool.size() ? pool.at(shard_count()) : pool.GetNextProactor();

load_fiber_ = proactor->LaunchFiber([load_path, this] {
auto ec = LoadRdb(load_path);
LOG_IF(ERROR, ec) << "Error loading file " << ec.message();
});
}

error_code ServerFamily::LoadRdb(const std::string& rdb_file) {
io::ReadonlyFileOrError res = uring::OpenRead(rdb_file);
error_code ec;

if (res) {
io::FileSource fs(*res);

RdbLoader loader(shard_set, script_mgr());
ec = loader.Load(&fs);
} else {
ec = res.error();
}

auto& pool = service_.proactor_pool();
pool.Await([](auto*) { ServerState::tlocal()->set_gstate(GlobalState::IDLE); });
global_state()->Clear();

return ec;
}

void ServerFamily::StatsMC(std::string_view section, facade::ConnectionContext* cntx) {
if (!section.empty()) {
return cntx->reply_builder()->SendError("");
Expand Down Expand Up @@ -188,8 +280,6 @@ void ServerFamily::StatsMC(std::string_view section, facade::ConnectionContext*
}

error_code ServerFamily::DoSave(Transaction* trans, string* err_details) {
static unsigned fl_index = 1;

auto [current, switched] = global_state_.Next(GlobalState::SAVING);
if (!switched) {
*err_details = StrCat(GlobalState::Name(current), " - can not save database");
Expand All @@ -209,10 +299,16 @@ error_code ServerFamily::DoSave(Transaction* trans, string* err_details) {
}
}

string filename = FLAGS_dbfilename.empty() ? "dump_save.rdb" : FLAGS_dbfilename;
fs::path filename = FLAGS_dbfilename.empty() ? "dump" : FLAGS_dbfilename;
fs::path path = dir_path;
path.append(filename);
path.concat(StrCat("_", fl_index++));

if (!filename.has_extension()) {
absl::Time now = absl::Now();
string ft_time = absl::FormatTime("-%Y-%m-%dT%H:%M:%S", now, absl::UTCTimeZone());
filename += StrCat(ft_time, ".rdb");
}
path += filename;

VLOG(1) << "Saving to " << path;

auto res = uring::OpenWrite(path.generic_string());
Expand Down Expand Up @@ -370,7 +466,7 @@ void ServerFamily::Client(CmdArgList args, ConnectionContext* cntx) {
}

LOG_FIRST_N(ERROR, 10) << "Subcommand " << sub_cmd << " not supported";
return (*cntx)->SendError(UnknowSubCmd(sub_cmd, "CLIENT"), kSyntaxErr);
return (*cntx)->SendError(UnknownSubCmd(sub_cmd, "CLIENT"), kSyntaxErr);
}

void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) {
Expand All @@ -394,7 +490,7 @@ void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) {
});
return (*cntx)->SendOk();
} else {
return (*cntx)->SendError(UnknowSubCmd(sub_cmd, "CONFIG"), kSyntaxErr);
return (*cntx)->SendError(UnknownSubCmd(sub_cmd, "CONFIG"), kSyntaxErr);
}
}

Expand Down
7 changes: 7 additions & 0 deletions src/server/server_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ class ServerFamily {

std::string LastSaveFile() const;

std::error_code LoadRdb(const std::string& rdb_file);

private:
uint32_t shard_count() const {
return shard_set->size();
Expand Down Expand Up @@ -97,6 +99,11 @@ class ServerFamily {

void SyncGeneric(std::string_view repl_master_id, uint64_t offs, ConnectionContext* cntx);

void Load(const std::string& file_name);


boost::fibers::fiber load_fiber_;

uint32_t task_10ms_ = 0;
Service& service_;

Expand Down

0 comments on commit 439070d

Please sign in to comment.