Skip to content

Commit

Permalink
fix: S3 writes are now working with new format as well
Browse files Browse the repository at this point in the history
1. Refactor snapshot saving logic, specifically, build the full
path once for both versions.
2. Pull the newest version of helio, where many s3 bugs are fixed.
3. Include some cleanups because helio now does not contain Boost.Fibers implementation anymore.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
  • Loading branch information
romange committed May 22, 2023
1 parent bf6abf3 commit 55e55ba
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 90 deletions.
31 changes: 0 additions & 31 deletions src/core/fibers.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

// An import header that centralizes all the imports from helio project regarding fibers

#ifdef USE_FB2

#include "util/fibers/fiber2.h"
#include "util/fibers/fiberqueue_threadpool.h"
#include "util/fibers/future.h"
Expand All @@ -33,32 +31,3 @@ using util::fb2::SimpleChannel;
namespace util {
using fb2::SharedMutex;
}

#else

#include "util/fiber_sched_algo.h"
#include "util/fibers/event_count.h"
#include "util/fibers/fiber.h"
#include "util/fibers/fiberqueue_threadpool.h"
#include "util/fibers/fibers_ext.h"
#include "util/fibers/simple_channel.h"

namespace dfly {

using util::fibers_ext::Barrier;
using util::fibers_ext::BlockingCounter;
using util::fibers_ext::Done;
using util::fibers_ext::EventCount;
using util::fibers_ext::Fiber;
using util::fibers_ext::FiberQueue;
using util::fibers_ext::FiberQueueThreadPool;
using util::fibers_ext::Future;
using util::fibers_ext::Launch;
using util::fibers_ext::Mutex;
using util::fibers_ext::Promise;
using util::fibers_ext::SimpleChannel;
using CondVar = ::boost::fibers::condition_variable;

} // namespace dfly

#endif
16 changes: 0 additions & 16 deletions src/core/uring.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

#pragma once

#ifdef USE_FB2

#include "util/fibers/uring_proactor.h"
#include "util/uring/uring_file.h"
namespace dfly {
Expand All @@ -16,17 +14,3 @@ using util::fb2::OpenLinux;
using util::fb2::OpenRead;

} // namespace dfly

#else
#include "util/uring/proactor.h"
#include "util/uring/uring_file.h"

namespace dfly {

using util::uring::FiberCall;
using util::uring::LinuxFile;
using util::uring::OpenLinux;
using util::uring::OpenRead;

} // namespace dfly
#endif
6 changes: 3 additions & 3 deletions src/facade/dragonfly_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ class Listener : public util::ListenerInterface {
std::error_code ConfigureServerSocket(int fd) final;

private:
util::Connection* NewConnection(util::ProactorBase* proactor) final;
util::ProactorBase* PickConnectionProactor(util::LinuxSocketBase* sock) final;
util::Connection* NewConnection(ProactorBase* proactor) final;
ProactorBase* PickConnectionProactor(util::LinuxSocketBase* sock) final;

void OnConnectionStart(util::Connection* conn) final;
void OnConnectionClose(util::Connection* conn) final;
void PreAcceptLoop(util::ProactorBase* pb) final;
void PreAcceptLoop(ProactorBase* pb) final;

void PreShutdown() final;

Expand Down
6 changes: 0 additions & 6 deletions src/server/io_mgr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,10 @@ using namespace std;
using namespace util;
using namespace facade;

#ifdef USE_FB2
using Proactor = fb2::UringProactor;
using fb2::ProactorBase;
using fb2::SubmitEntry;

#else
using uring::Proactor;
using uring::SubmitEntry;
#endif

namespace {

constexpr inline size_t alignup(size_t num, size_t align) {
Expand Down
59 changes: 25 additions & 34 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -977,25 +977,19 @@ static void RunStage(bool new_version, std::function<void(unsigned)> cb) {
}
};

using PartialSaveOpts = tuple<const fs::path& /*filename*/, const fs::path& /*path*/>;

// Start saving a single snapshot of a multi-file dfly snapshot.
// If shard is null, then this is the summary file.
GenericError DoPartialSave(PartialSaveOpts opts, const dfly::StringVec& scripts,
GenericError DoPartialSave(fs::path full_filename, const dfly::StringVec& scripts,
RdbSnapshot* snapshot, EngineShard* shard) {
auto [filename, path] = opts;
// Construct resulting filename.
fs::path full_filename = filename;
if (shard == nullptr) {
ExtendDfsFilename("summary", &full_filename);
} else {
ExtendDfsFilenameWithShard(shard->shard_id(), &full_filename);
}
fs::path full_path = path / full_filename; // use / operator to concatenate paths.

// Start rdb saving.
SaveMode mode = shard == nullptr ? SaveMode::SUMMARY : SaveMode::SINGLE_SHARD;
GenericError local_ec = snapshot->Start(mode, full_path.string(), scripts);
GenericError local_ec = snapshot->Start(mode, full_filename.string(), scripts);

if (!local_ec && mode == SaveMode::SINGLE_SHARD) {
snapshot->StartInShard(shard);
Expand Down Expand Up @@ -1047,7 +1041,7 @@ GenericError ServerFamily::DoSave(bool new_version, string_view basename, Transa
return ec;
}
SubstituteFilenameTsPlaceholder(&filename, FormatTs(start));
fs::path path = dir_path;
fs::path fpath = dir_path;

shared_ptr<LastSaveInfo> save_info;

Expand Down Expand Up @@ -1083,10 +1077,21 @@ GenericError ServerFamily::DoSave(bool new_version, string_view basename, Transa
return script_bodies;
};

fpath /= filename;

if (IsCloudPath(fpath.string())) {
if (!aws_) {
aws_ = make_unique<cloud::AWS>("s3");
if (auto ec = aws_->Init(); ec) {
LOG(ERROR) << "Failed to initialize AWS " << ec;
aws_.reset();
return GenericError(ec, "Couldn't initialize AWS");
}
}
}

// Start snapshots.
if (new_version) {
auto file_opts = make_tuple(cref(filename), cref(path));

// In the new version (.dfs) we store a file for every shard and one more summary file.
// Summary file is always last in snapshots array.
snapshots.resize(shard_set->size() + 1);
Expand All @@ -1098,7 +1103,7 @@ GenericError ServerFamily::DoSave(bool new_version, string_view basename, Transa
snapshot.reset(new RdbSnapshot(fq_threadpool_.get()));

snapshot->SetAWS(aws_.get());
if (auto local_ec = DoPartialSave(file_opts, scripts, snapshot.get(), nullptr); local_ec) {
if (auto local_ec = DoPartialSave(fpath, scripts, snapshot.get(), nullptr); local_ec) {
ec = local_ec;
snapshot.reset();
}
Expand All @@ -1109,7 +1114,7 @@ GenericError ServerFamily::DoSave(bool new_version, string_view basename, Transa
auto& snapshot = snapshots[shard->shard_id()];
snapshot.reset(new RdbSnapshot(fq_threadpool_.get()));
snapshot->SetAWS(aws_.get());
if (auto local_ec = DoPartialSave(file_opts, {}, snapshot.get(), shard); local_ec) {
if (auto local_ec = DoPartialSave(fpath, {}, snapshot.get(), shard); local_ec) {
ec = local_ec;
snapshot.reset();
}
Expand All @@ -1120,28 +1125,15 @@ GenericError ServerFamily::DoSave(bool new_version, string_view basename, Transa
} else {
snapshots.resize(1);

if (!filename.has_extension()) {
filename += ".rdb";
if (!fpath.has_extension()) {
fpath += ".rdb";
}
path /= filename; // use / operator to concatenate paths.

snapshots[0].reset(new RdbSnapshot(fq_threadpool_.get()));
auto lua_scripts = get_scripts();
string path_str = path.string();

if (IsCloudPath(path_str)) {
if (!aws_) {
aws_ = make_unique<cloud::AWS>("s3");
if (auto ec = aws_->Init(); ec) {
LOG(ERROR) << "Failed to initialize AWS " << ec;
aws_.reset();
return GenericError(ec, "Couldn't initialize AWS");
}
}
snapshots[0]->SetAWS(aws_.get());
}

ec = snapshots[0]->Start(SaveMode::RDB, path.string(), lua_scripts);
snapshots[0]->SetAWS(aws_.get());
ec = snapshots[0]->Start(SaveMode::RDB, fpath.string(), lua_scripts);

if (!ec) {
auto cb = [&](Transaction* t, EngineShard* shard) {
Expand All @@ -1166,16 +1158,15 @@ GenericError ServerFamily::DoSave(bool new_version, string_view basename, Transa
RunStage(new_version, close_cb);

if (new_version) {
ExtendDfsFilename("summary", &filename);
path /= filename;
ExtendDfsFilename("summary", &fpath);
}

absl::Duration dur = absl::Now() - start;
double seconds = double(absl::ToInt64Milliseconds(dur)) / 1000;

// Populate LastSaveInfo.
if (!ec) {
LOG(INFO) << "Saving " << path << " finished after "
LOG(INFO) << "Saving " << fpath << " finished after "
<< strings::HumanReadableElapsedTime(seconds);

save_info = make_shared<LastSaveInfo>();
Expand All @@ -1184,7 +1175,7 @@ GenericError ServerFamily::DoSave(bool new_version, string_view basename, Transa
}

save_info->save_time = absl::ToUnixSeconds(start);
save_info->file_name = path.generic_string();
save_info->file_name = fpath.generic_string();
save_info->duration_sec = uint32_t(seconds);

lock_guard lk(save_mu_);
Expand Down

0 comments on commit 55e55ba

Please sign in to comment.