feat: Implement rdb snapshot write directly into s3.
1. Reorganized the load flow - most of the logic now lives in the InferLoadFile function.
   S3 reads are not yet supported.
2. Implemented the write path. Specifically, you can use an undocumented (by design) option
   to save directly into S3: `SAVE rdb s3://bucket/path/file`.
3. When `--dir=s3://bucket/path/` is set, the server also saves the snapshot into S3 on shutdown.
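
For illustration, a hypothetical session (bucket names invented): launched with
`--dir=s3://mybucket/backups/`, the server writes its shutdown snapshot into the bucket, and
`SAVE rdb adhoc-name` saves an immediate snapshot there as well, since the optional second
argument to `SAVE` overrides `--dbfilename` (see the `DoSave` changes below).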

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
romange committed May 12, 2023
1 parent 39aed00 commit d8b6b05
Showing 3 changed files with 171 additions and 42 deletions.
4 changes: 2 additions & 2 deletions src/server/CMakeLists.txt

@@ -25,8 +25,8 @@ add_library(dragonfly_lib channel_store.cc command_registry.cc
             top_keys.cc multi_command_squasher.cc hll_family.cc cluster/cluster_config.cc)
 
 
-cxx_link(dragonfly_lib dfly_transaction dfly_facade redis_lib strings_lib html_lib http_client_lib
-         absl::random_random TRDP::jsoncons zstd TRDP::lz4)
+cxx_link(dragonfly_lib dfly_transaction dfly_facade redis_lib aws_lib strings_lib html_lib
+         http_client_lib absl::random_random TRDP::jsoncons zstd TRDP::lz4)
 
 add_library(dfly_test_lib test_utils.cc)
 cxx_link(dfly_test_lib dragonfly_lib facade_test gtest_main_ext)
200 changes: 161 additions & 39 deletions src/server/server_family.cc
@@ -9,6 +9,7 @@
 #include <absl/strings/match.h>
 #include <absl/strings/str_join.h>
 #include <absl/strings/str_replace.h>
+#include <absl/strings/strip.h>
 #include <sys/resource.h>
 
 #include <algorithm>
@@ -45,6 +46,8 @@ extern "C" {
 #include "server/version.h"
 #include "strings/human_readable.h"
 #include "util/accept_server.h"
+#include "util/cloud/aws.h"
+#include "util/cloud/s3.h"
 #include "util/fibers/fiber_file.h"
 #include "util/uring/uring_file.h"
 
@@ -82,8 +85,10 @@ using strings::HumanReadableNumBytes;
 namespace {
 
 const auto kRedisVersion = "6.2.11";
+constexpr string_view kS3Prefix = "s3://"sv;
 
 const auto kRdbWriteFlags = O_CREAT | O_WRONLY | O_TRUNC | O_CLOEXEC | O_DIRECT;
+const size_t kBucketConnectMs = 2000;
 
 using EngineFunc = void (ServerFamily::*)(CmdArgList args, ConnectionContext* cntx);
 
@@ -114,12 +119,74 @@ void SubstituteFilenameTsPlaceholder(fs::path* filename, std::string_view replac
   *filename = absl::StrReplaceAll(filename->string(), {{"{timestamp}", replacement}});
 }
 
-string InferLoadFile(fs::path data_dir) {
+bool IsCloudPath(string_view path) {
+  return absl::StartsWith(path, kS3Prefix);
+}
+
+// Returns bucket_name, obj_path for an s3 path.
+optional<pair<string, string>> GetBucketPath(string_view path) {
+  string_view clean = absl::StripPrefix(path, kS3Prefix);
+
+  size_t pos = clean.find('/');
+  if (pos == string_view::npos)
+    return nullopt;
+
+  string bucket_name{clean.substr(0, pos)};
+  string obj_path{clean.substr(pos + 1)};
+  return make_pair(move(bucket_name), move(obj_path));
+}
+
+string InferLoadFile(string_view dir, cloud::AWS* aws) {
+  fs::path data_folder;
+  string bucket_name, obj_path;
+
+  if (dir.empty()) {
+    data_folder = fs::current_path();
+  } else {
+    if (IsCloudPath(dir)) {
+      CHECK(aws);
+      auto res = GetBucketPath(dir);
+      if (!res) {
+        LOG(ERROR) << "Invalid S3 path: " << dir;
+        return {};
+      }
+      data_folder = dir;
+      bucket_name = res->first;
+      obj_path = res->second;
+    } else {
+      error_code file_ec;
+      data_folder = fs::canonical(dir, file_ec);
+      if (file_ec) {
+        LOG(ERROR) << "Data directory error: " << file_ec.message() << " for dir " << dir;
+        return {};
+      }
+    }
+  }
+
+  LOG(INFO) << "Data directory is " << data_folder;
+
   const auto& dbname = GetFlag(FLAGS_dbfilename);
   if (dbname.empty())
     return string{};
 
-  fs::path fl_path = data_dir.append(dbname);
+  if (IsCloudPath(dir)) {
+    cloud::S3Bucket bucket(*aws, bucket_name);
+    ProactorBase* proactor = shard_set->pool()->GetNextProactor();
+    auto ec = proactor->Await([&] { return bucket.Connect(kBucketConnectMs); });
+    if (ec) {
+      LOG(ERROR) << "Couldn't connect to S3 bucket: " << ec.message();
+      return {};
+    }
+
+    fs::path fl_path{obj_path};
+    fl_path.append(dbname);
+
+    LOG(INFO) << "Loading from s3 path s3://" << bucket_name << "/" << fl_path;
+    // TODO: to load from S3 file.
+    return {};
+  }
+
+  fs::path fl_path = data_folder.append(dbname);
   if (fs::exists(fl_path))
     return fl_path.generic_string();
 
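
For reference, a minimal standalone sketch (not part of the commit; paths invented) of the
bucket/object split that `GetBucketPath` above performs - the bucket is everything up to the
first `/` after the `s3://` prefix, and the object path is the remainder:

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <string_view>
#include <utility>

// Same splitting rule as GetBucketPath above, made self-contained.
std::optional<std::pair<std::string, std::string>> SplitS3(std::string_view path) {
  constexpr std::string_view kPrefix = "s3://";
  if (path.substr(0, kPrefix.size()) != kPrefix)
    return std::nullopt;
  path.remove_prefix(kPrefix.size());
  size_t pos = path.find('/');
  if (pos == std::string_view::npos)
    return std::nullopt;  // a bare bucket with no object path is rejected
  return std::make_pair(std::string(path.substr(0, pos)), std::string(path.substr(pos + 1)));
}

int main() {
  auto bp = SplitS3("s3://mybucket/backups/dump.rdb");
  assert(bp && bp->first == "mybucket" && bp->second == "backups/dump.rdb");
  assert(!SplitS3("s3://mybucket"));  // no '/' after the bucket name
}
```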
@@ -204,9 +271,18 @@ class RdbSnapshot {
     return started_ || (saver_ && saver_->Mode() == SaveMode::SUMMARY);
   }
 
+  // Sets a pointer to global aws object that provides an auth key.
+  // The ownership stays with the caller.
+  void SetAWS(cloud::AWS* aws) {
+    aws_ = aws;
+  }
+
  private:
   bool started_ = false;
-  FiberQueueThreadPool* fq_tp_;
+  bool is_linux_file_ = false;
+  FiberQueueThreadPool* fq_tp_ = nullptr;
+  cloud::AWS* aws_ = nullptr;
+
   unique_ptr<io::Sink> io_sink_;
   unique_ptr<RdbSaver> saver_;
   RdbTypeFreqMap freq_map_;
@@ -226,20 +302,44 @@ io::Result<size_t> LinuxWriteWrapper::WriteSome(const iovec* v, uint32_t len) {
 GenericError RdbSnapshot::Start(SaveMode save_mode, const std::string& path,
                                 const StringVec& lua_scripts) {
   bool is_direct = false;
-  if (fq_tp_) {  // EPOLL
-    auto res = util::OpenFiberWriteFile(path, fq_tp_);
-    if (!res)
-      return GenericError(res.error(), "Couldn't open file for writing");
-    io_sink_.reset(*res);
-  } else {
-    auto res = OpenLinux(path, kRdbWriteFlags, 0666);
-    if (!res) {
-      return GenericError(
-          res.error(),
-          "Couldn't open file for writing (is direct I/O supported by the file system?)");
-    }
-    io_sink_.reset(new LinuxWriteWrapper(res->release()));
-    is_direct = kRdbWriteFlags & O_DIRECT;
-  }
+  VLOG(1) << "Saving RDB " << path;
+
+  if (IsCloudPath(path)) {
+    DCHECK(aws_);
+
+    optional<pair<string, string>> bucket_path = GetBucketPath(path);
+    if (!bucket_path) {
+      return GenericError("Invalid S3 path");
+    }
+    auto [bucket_name, obj_path] = *bucket_path;
+
+    cloud::S3Bucket bucket(*aws_, bucket_name);
+    error_code ec = bucket.Connect(kBucketConnectMs);
+    if (ec) {
+      return GenericError(ec, "Couldn't connect to S3 bucket");
+    }
+    auto res = bucket.OpenWriteFile(obj_path);
+    if (!res) {
+      return GenericError(res.error(), "Couldn't open file for writing");
+    }
+    io_sink_.reset(*res);
+  } else {
+    if (fq_tp_) {  // EPOLL
+      auto res = util::OpenFiberWriteFile(path, fq_tp_);
+      if (!res)
+        return GenericError(res.error(), "Couldn't open file for writing");
+      io_sink_.reset(*res);
+    } else {
+      auto res = OpenLinux(path, kRdbWriteFlags, 0666);
+      if (!res) {
+        return GenericError(
+            res.error(),
+            "Couldn't open file for writing (is direct I/O supported by the file system?)");
+      }
+      is_linux_file_ = true;
+      io_sink_.reset(new LinuxWriteWrapper(res->release()));
+      is_direct = kRdbWriteFlags & O_DIRECT;
+    }
+  }
 
   saver_.reset(new RdbSaver(io_sink_.get(), save_mode, is_direct));
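
Isolating the cloud branch above: the S3 write path reduces to the sketch below. It is
hypothetical and assumes the `util::cloud` API exactly as used in this diff (`S3Bucket::Connect`
taking a millisecond timeout, `OpenWriteFile` returning an `io::Result<io::WriteFile*>`):

```cpp
#include <string_view>
#include <system_error>

#include "io/io.h"
#include "util/cloud/aws.h"
#include "util/cloud/s3.h"

// Hypothetical helper mirroring RdbSnapshot::Start: connect to the bucket, then
// open the object for writing. The caller owns the returned file, streams the
// snapshot through it as an io::Sink, and finally calls Close() on it.
io::Result<io::WriteFile*> OpenS3WriteFile(util::cloud::AWS* aws, std::string_view bucket_name,
                                           std::string_view obj_path) {
  util::cloud::S3Bucket bucket(*aws, bucket_name);
  if (std::error_code ec = bucket.Connect(2000); ec)  // 2000 ms mirrors kBucketConnectMs.
    return nonstd::make_unexpected(ec);
  return bucket.OpenWriteFile(obj_path);
}
```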
@@ -252,11 +352,10 @@ error_code RdbSnapshot::SaveBody() {
 }
 
 error_code RdbSnapshot::Close() {
-  // TODO: to solve it in a more elegant way.
-  if (fq_tp_) {
-    return static_cast<io::WriteFile*>(io_sink_.get())->Close();
+  if (is_linux_file_) {
+    return static_cast<LinuxWriteWrapper*>(io_sink_.get())->Close();
   }
-  return static_cast<LinuxWriteWrapper*>(io_sink_.get())->Close();
+  return static_cast<io::WriteFile*>(io_sink_.get())->Close();
 }
 
 void RdbSnapshot::StartInShard(EngineShard* shard) {
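
Note the inverted dispatch in `Close()`: the sink can now be one of three types (an S3 write
file, a fiber-pool write file, or the direct-I/O `LinuxWriteWrapper`), and the first two are
both `io::WriteFile`s, so the old `fq_tp_` check no longer identifies the right cast - hence
the new `is_linux_file_` flag.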
@@ -279,7 +378,9 @@ void ExtendDfsFilenameWithShard(int shard, fs::path* filename) {
 }
 
 GenericError ValidateFilename(const fs::path& filename, bool new_version) {
-  if (!filename.parent_path().empty()) {
+  bool is_cloud_path = IsCloudPath(filename.string());
+
+  if (!filename.parent_path().empty() && !is_cloud_path) {
     return {absl::StrCat("filename may not contain directory separators (Got \"", filename.c_str(),
                          "\")")};
   }
@@ -446,22 +547,17 @@ void ServerFamily::Init(util::AcceptServer* acceptor, util::ListenerInterface* m
   stats_caching_task_ =
       pb_task_->AwaitBrief([&] { return pb_task_->AddPeriodic(period_ms, cache_cb); });
 
-  fs::path data_folder = fs::current_path();
-  const auto& dir = GetFlag(FLAGS_dir);
-
-  error_code file_ec;
-  if (!dir.empty()) {
-    data_folder = fs::canonical(dir, file_ec);
+  string flag_dir = GetFlag(FLAGS_dir);
+  if (IsCloudPath(flag_dir)) {
+    aws_ = make_unique<cloud::AWS>("s3");
+    if (auto ec = aws_->Init(); ec) {
+      LOG(FATAL) << "Failed to initialize AWS " << ec;
+    }
   }
 
-  if (!file_ec) {
-    LOG(INFO) << "Data directory is " << data_folder;
-    string load_path = InferLoadFile(data_folder);
-    if (!load_path.empty()) {
-      load_result_ = Load(load_path);
-    }
-  } else {
-    LOG(ERROR) << "Data directory error: " << file_ec.message();
+  string load_path = InferLoadFile(flag_dir, aws_.get());
+  if (!load_path.empty()) {
+    load_result_ = Load(load_path);
   }
 
   string save_time = GetFlag(FLAGS_save_schedule);
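
Note the asymmetry in error handling: a bad AWS setup at startup is fatal (`LOG(FATAL)` above),
while the lazy initialization inside `DoSave` further below merely returns a `GenericError`, so
a failing `SAVE` to S3 does not bring the server down.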
@@ -918,7 +1014,7 @@ GenericError DoPartialSave(PartialSaveOpts opts, const dfly::StringVec& scripts,
 
   // Start rdb saving.
   SaveMode mode = shard == nullptr ? SaveMode::SUMMARY : SaveMode::SINGLE_SHARD;
-  GenericError local_ec = snapshot->Start(mode, full_path.generic_string(), scripts);
+  GenericError local_ec = snapshot->Start(mode, full_path.string(), scripts);
 
   if (!local_ec && mode == SaveMode::SINGLE_SHARD) {
     snapshot->StartInShard(shard);
@@ -933,10 +1029,10 @@ GenericError ServerFamily::DoSave() {
   boost::intrusive_ptr<Transaction> trans(
       new Transaction{cid, ServerState::tlocal()->thread_index()});
   trans->InitByArgs(0, {});
-  return DoSave(absl::GetFlag(FLAGS_df_snapshot_format), trans.get());
+  return DoSave(absl::GetFlag(FLAGS_df_snapshot_format), {}, trans.get());
 }
 
-GenericError ServerFamily::DoSave(bool new_version, Transaction* trans) {
+GenericError ServerFamily::DoSave(bool new_version, string_view basename, Transaction* trans) {
   fs::path dir_path(GetFlag(FLAGS_dir));
   AggregateGenericError ec;
 
@@ -959,7 +1055,13 @@ GenericError ServerFamily::DoSave(bool new_version, Transaction* trans) {
 
   absl::Time start = absl::Now();
 
-  fs::path filename = GetFlag(FLAGS_dbfilename);
+  fs::path filename;
+
+  if (basename.empty())
+    filename = GetFlag(FLAGS_dbfilename);
+  else
+    filename = basename;
+
   if (auto ec = ValidateFilename(filename, new_version); ec) {
     return ec;
   }
@@ -1014,6 +1116,7 @@ GenericError ServerFamily::DoSave(bool new_version, Transaction* trans) {
     auto& snapshot = snapshots[shard_set->size()];
     snapshot.reset(new RdbSnapshot(fq_threadpool_.get()));
 
+    snapshot->SetAWS(aws_.get());
     if (auto local_ec = DoPartialSave(file_opts, scripts, snapshot.get(), nullptr); local_ec) {
       ec = local_ec;
       snapshot.reset();
@@ -1024,6 +1127,7 @@ GenericError ServerFamily::DoSave(bool new_version, Transaction* trans) {
     auto cb = [&](Transaction* t, EngineShard* shard) {
       auto& snapshot = snapshots[shard->shard_id()];
       snapshot.reset(new RdbSnapshot(fq_threadpool_.get()));
+      snapshot->SetAWS(aws_.get());
       if (auto local_ec = DoPartialSave(file_opts, {}, snapshot.get(), shard); local_ec) {
         ec = local_ec;
         snapshot.reset();
@@ -1042,8 +1146,21 @@ GenericError ServerFamily::DoSave(bool new_version, Transaction* trans) {
 
     snapshots[0].reset(new RdbSnapshot(fq_threadpool_.get()));
     auto lua_scripts = get_scripts();
+    string path_str = path.string();
+
+    if (IsCloudPath(path_str)) {
+      if (!aws_) {
+        aws_ = make_unique<cloud::AWS>("s3");
+        if (auto ec = aws_->Init(); ec) {
+          LOG(ERROR) << "Failed to initialize AWS " << ec;
+          aws_.reset();
+          return GenericError(ec, "Couldn't initialize AWS");
+        }
+      }
+      snapshots[0]->SetAWS(aws_.get());
+    }
 
-    ec = snapshots[0]->Start(SaveMode::RDB, path.generic_string(), lua_scripts);
+    ec = snapshots[0]->Start(SaveMode::RDB, path.string(), lua_scripts);
 
     if (!ec) {
       auto cb = [&](Transaction* t, EngineShard* shard) {
@@ -1425,7 +1542,7 @@ void ServerFamily::Save(CmdArgList args, ConnectionContext* cntx) {
     return (*cntx)->SendError(kSyntaxErr);
   }
 
-  if (args.size() == 1) {
+  if (args.size() >= 1) {
     ToUpper(&args[0]);
     string_view sub_cmd = ArgS(args, 0);
     if (sub_cmd == "DF") {
@@ -1437,7 +1554,12 @@ void ServerFamily::Save(CmdArgList args, ConnectionContext* cntx) {
     }
   }
 
-  GenericError ec = DoSave(new_version, cntx->transaction);
+  string_view basename;
+  if (args.size() == 2) {
+    basename = ArgS(args, 1);
+  }
+
+  GenericError ec = DoSave(new_version, basename, cntx->transaction);
   if (ec) {
     (*cntx)->SendError(ec.Format());
   } else {
9 changes: 8 additions & 1 deletion src/server/server_family.h
@@ -16,6 +16,11 @@ namespace util {
 class AcceptServer;
 class ListenerInterface;
 class HttpListenerBase;
+
+namespace cloud {
+class AWS;
+}  // namespace cloud
+
 }  // namespace util
 
 namespace dfly {
@@ -94,7 +99,8 @@ class ServerFamily {
   void StatsMC(std::string_view section, facade::ConnectionContext* cntx);
 
   // if new_version is true, saves DF specific, non redis compatible snapshot.
-  GenericError DoSave(bool new_version, Transaction* transaction);
+  // if basename is not empty it will override dbfilename flag.
+  GenericError DoSave(bool new_version, std::string_view basename, Transaction* transaction);
 
   // Calls DoSave with a default generated transaction and with the format
   // specified in --df_snapshot_format
@@ -203,6 +209,7 @@ class ServerFamily {
 
   Done schedule_done_;
   std::unique_ptr<FiberQueueThreadPool> fq_threadpool_;
+  std::unique_ptr<util::cloud::AWS> aws_;
 };
 
 }  // namespace dfly
