feat: WIP search index persistence
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
dranikpg committed Aug 22, 2023
1 parent afb3928 commit 9d23a5b
Showing 16 changed files with 273 additions and 77 deletions.
2 changes: 1 addition & 1 deletion src/facade/CMakeLists.txt
@@ -1,6 +1,6 @@
add_library(dfly_facade dragonfly_listener.cc dragonfly_connection.cc facade.cc
memcache_parser.cc redis_parser.cc reply_builder.cc op_status.cc
reply_capture.cc)
reply_capture.cc resp_expr.cc)

if (DF_USE_SSL)
set(TLS_LIB tls_lib)
11 changes: 1 addition & 10 deletions src/facade/dragonfly_connection.cc
@@ -550,7 +550,7 @@ void Connection::DispatchCommand(uint32_t consumed, mi_heap_t* heap) {
if (can_dispatch_sync) {
ShrinkPipelinePool(); // Gradually release pipeline request pool.

RespToArgList(tmp_parse_args_, &tmp_cmd_vec_);
RespExpr::VecToArgList(tmp_parse_args_, &tmp_cmd_vec_);

{
cc_->sync_dispatch = true;
@@ -900,15 +900,6 @@ bool Connection::IsCurrentlyDispatching() const {
return cc_->async_dispatch || cc_->sync_dispatch;
}

void RespToArgList(const RespVec& src, CmdArgVec* dest) {
dest->resize(src.size());
for (size_t i = 0; i < src.size(); ++i) {
DCHECK(src[i].type == RespExpr::STRING);

(*dest)[i] = ToMSS(src[i].GetBuf());
}
}

void Connection::SendPubMessageAsync(PubMessage msg) {
void* ptr = mi_malloc(sizeof(PubMessage));
SendAsync({PubMessagePtr{new (ptr) PubMessage{move(msg)}, MessageDeleter{}}});
2 changes: 0 additions & 2 deletions src/facade/dragonfly_connection.h
@@ -263,6 +263,4 @@ class Connection : public util::Connection {
static thread_local std::vector<PipelineMessagePtr> pipeline_req_pool_;
};

void RespToArgList(const RespVec& src, CmdArgVec* dest);

} // namespace facade
20 changes: 20 additions & 0 deletions src/facade/resp_expr.cc
@@ -0,0 +1,20 @@
// Copyright 2023, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

#include "facade/resp_expr.h"

#include "base/logging.h"

namespace facade {

void RespExpr::VecToArgList(const Vec& src, CmdArgVec* dest) {
dest->resize(src.size());
for (size_t i = 0; i < src.size(); ++i) {
DCHECK(src[i].type == RespExpr::STRING);

(*dest)[i] = ToMSS(src[i].GetBuf());
}
}

} // namespace facade
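
RespToArgList has moved out of the connection code and now lives on RespExpr as VecToArgList, so any component that already holds parsed RESP input can build a command argument vector without depending on dragonfly_connection. A minimal usage sketch, assuming the caller obtained the vector from RedisParser and dispatches it itself:

#include "facade/resp_expr.h"

// Sketch: convert a parsed RESP array into dispatchable command arguments.
// Every element is expected to be a STRING; the DCHECK above enforces this
// in debug builds.
void DispatchParsedSketch(const facade::RespVec& parsed) {
  facade::CmdArgVec args;
  facade::RespExpr::VecToArgList(parsed, &args);
  // args holds MutableSlice views into the parser's buffers; hand them on,
  // e.g. service->DispatchCommand(absl::MakeSpan(args), &cntx);
}
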
8 changes: 6 additions & 2 deletions src/facade/resp_expr.h
@@ -12,6 +12,8 @@
#include <variant>
#include <vector>

#include "facade/facade_types.h"

namespace facade {

class RespExpr {
@@ -52,13 +54,15 @@ class RespExpr {
}

static const char* TypeName(Type t);

static void VecToArgList(const Vec& src, CmdArgVec* dest);
};

using RespVec = RespExpr::Vec;
using RespSpan = absl::Span<const RespExpr>;

inline std::string_view ToSV(const absl::Span<uint8_t>& s) {
return std::string_view{reinterpret_cast<char*>(s.data()), s.size()};
inline std::string_view ToSV(RespExpr::Buffer buf) {
return std::string_view{reinterpret_cast<char*>(buf.data()), buf.size()};
}

} // namespace facade
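
ToSV now takes RespExpr::Buffer by value instead of a const reference to absl::Span<uint8_t>, which keeps callers from spelling out the span type. A small sketch, assuming e is a parsed STRING element:

#include <string_view>

#include "facade/resp_expr.h"

// Sketch: view the payload of a RESP string element without copying it.
std::string_view PayloadSketch(const facade::RespExpr& e) {
  return facade::ToSV(e.GetBuf());  // RespExpr::Buffer is a cheap span, passed by value
}
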
2 changes: 1 addition & 1 deletion src/server/dflycmd.cc
@@ -532,7 +532,7 @@ void DflyCmd::FullSyncFb(FlowInfo* flow, Context* cntx) {
string& body = data.orig_body.empty() ? data.body : data.orig_body;
script_bodies.push_back(move(body));
}
ec = saver->SaveHeader(script_bodies);
ec = saver->SaveHeader({script_bodies, {}});
} else {
ec = saver->SaveHeader({});
}
60 changes: 50 additions & 10 deletions src/server/rdb_load.cc
@@ -19,6 +19,7 @@ extern "C" {
#include <absl/cleanup/cleanup.h>
#include <absl/strings/match.h>
#include <absl/strings/str_cat.h>
#include <absl/strings/str_split.h>
#include <lz4frame.h>
#include <zstd.h>

@@ -2188,16 +2189,7 @@ error_code RdbLoader::HandleAux() {
} else if (auxkey == "repl-offset") {
// TODO
} else if (auxkey == "lua") {
ServerState* ss = ServerState::tlocal();
auto interpreter = ss->BorrowInterpreter();
absl::Cleanup clean = [ss, interpreter]() { ss->ReturnInterpreter(interpreter); };

string_view body{auxval};
if (script_mgr_) {
auto res = script_mgr_->Insert(body, interpreter);
if (!res)
LOG(ERROR) << "Error compiling script";
}
LoadScriptFromAux(move(auxval));
} else if (auxkey == "redis-ver") {
VLOG(1) << "Loading RDB produced by version " << auxval;
} else if (auxkey == "ctime") {
@@ -2220,6 +2212,8 @@
}
} else if (auxkey == "redis-bits") {
/* Just ignored. */
} else if (auxkey == "search-index") {
LoadSearchIndexDefFromAux(move(auxval));
} else {
/* We ignore fields we don't understand, as by AUX field
* contract. */
@@ -2339,4 +2333,50 @@ error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) {
return kOk;
}

void RdbLoader::LoadScriptFromAux(string&& body) {
ServerState* ss = ServerState::tlocal();
auto interpreter = ss->BorrowInterpreter();
absl::Cleanup clean = [ss, interpreter]() { ss->ReturnInterpreter(interpreter); };

if (script_mgr_) {
auto res = script_mgr_->Insert(body, interpreter);
if (!res)
LOG(ERROR) << "Error compiling script";
}
}

void RdbLoader::LoadSearchIndexDefFromAux(string&& def) {
facade::CapturingReplyBuilder crb{};
ConnectionContext cntx{nullptr, nullptr, &crb};
cntx.journal_emulated = true;

absl::Cleanup cntx_clean = [&cntx] { cntx.Inject(nullptr); };

uint32_t consumed = 0;
facade::RespVec resp_vec;
facade::RedisParser parser;

def += "\r\n"; // RESP terminator
absl::Span<uint8_t> buffer{reinterpret_cast<uint8_t*>(def.data()), def.size()};
auto res = parser.Parse(buffer, &consumed, &resp_vec);

if (res != facade::RedisParser::Result::OK) {
LOG(ERROR) << "Bad index definition: " << def;
return;
}

CmdArgVec arg_vec;
facade::RespExpr::VecToArgList(resp_vec, &arg_vec);

string ft_create = "FT.CREATE";
arg_vec.insert(arg_vec.begin(), MutableSlice{ft_create.data(), ft_create.size()});

service_->DispatchCommand(absl::MakeSpan(arg_vec), &cntx);

auto response = crb.Take();
if (auto err = facade::CapturingReplyBuilder::GetError(response); err) {
LOG(ERROR) << "Bad index definition: " << def << " " << err->first;
}
}

} // namespace dfly
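
LoadSearchIndexDefFromAux treats the stored definition as a space-separated FT.CREATE argument list: it appends CRLF so RedisParser accepts it as an inline command, converts the result with VecToArgList, prepends the command name, and dispatches it while capturing the reply, since loading runs outside any real client connection. A rough sketch of that flow with a purely hypothetical stored value:

#include <cstdint>
#include <string>

#include "facade/redis_parser.h"
#include "facade/resp_expr.h"

// Sketch of the restore path; the stored layout (index name followed by the
// output of BuildRestoreCommand) is an assumption of this WIP commit.
void RestoreIndexSketch(std::string def) {
  // def might look like: "idx ON HASH PREFIX 1 doc: SCHEMA title AS title TEXT"
  def += "\r\n";  // RedisParser only accepts CRLF-terminated inline commands

  uint32_t consumed = 0;
  facade::RespVec resp_vec;
  facade::RedisParser parser;
  absl::Span<uint8_t> buf{reinterpret_cast<uint8_t*>(def.data()), def.size()};
  if (parser.Parse(buf, &consumed, &resp_vec) != facade::RedisParser::Result::OK)
    return;  // malformed definition, mirrored by the LOG(ERROR) in the loader

  facade::CmdArgVec args;
  facade::RespExpr::VecToArgList(resp_vec, &args);  // slices point into def

  std::string cmd = "FT.CREATE";
  args.insert(args.begin(), facade::MutableSlice{cmd.data(), cmd.size()});
  // service_->DispatchCommand(absl::MakeSpan(args), &cntx);  // as in the loader
}
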
5 changes: 5 additions & 0 deletions src/server/rdb_load.h
@@ -231,6 +231,10 @@ class RdbLoader : protected RdbLoaderBase {
void FlushShardAsync(ShardId sid);
void LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib);

void LoadScriptFromAux(std::string&& value);
void LoadSearchIndexDefFromAux(std::string&& value);

private:
Service* service_;
ScriptMgr* script_mgr_;
std::unique_ptr<ItemsBuf[]> shard_buf_;
@@ -245,6 +249,7 @@

// Callback when receiving RDB_OPCODE_FULLSYNC_END
std::function<void()> full_sync_cut_cb;

detail::MPSCIntrusiveQueue<Item> item_queue_;
};

18 changes: 10 additions & 8 deletions src/server/rdb_save.cc
@@ -1149,15 +1149,15 @@ void RdbSaver::StopSnapshotInShard(EngineShard* shard) {
impl_->StopSnapshotting(shard);
}

error_code RdbSaver::SaveHeader(const StringVec& lua_scripts) {
error_code RdbSaver::SaveHeader(const GlobalData& glob_state) {
char magic[16];
// We should use RDB_VERSION here from rdb.h when we ditch redis 6 support
// For now we serialize to an older version.
size_t sz = absl::SNPrintF(magic, sizeof(magic), "REDIS%04d", RDB_SER_VERSION);
CHECK_EQ(9u, sz);

RETURN_ON_ERR(impl_->serializer()->WriteRaw(Bytes{reinterpret_cast<uint8_t*>(magic), sz}));
RETURN_ON_ERR(SaveAux(lua_scripts));
RETURN_ON_ERR(SaveAux(move(glob_state)));

return error_code{};
}
@@ -1186,7 +1186,7 @@ error_code RdbSaver::SaveBody(const Cancellation* cll, RdbTypeFreqMap* freq_map)
return error_code{};
}

error_code RdbSaver::SaveAux(const StringVec& lua_scripts) {
error_code RdbSaver::SaveAux(const GlobalData& glob_state) {
static_assert(sizeof(void*) == 8, "");

int aof_preamble = false;
@@ -1197,16 +1197,18 @@ error_code RdbSaver::SaveAux(const StringVec& lua_scripts) {
RETURN_ON_ERR(SaveAuxFieldStrInt("redis-bits", 64));

RETURN_ON_ERR(SaveAuxFieldStrInt("ctime", time(NULL)));

RETURN_ON_ERR(SaveAuxFieldStrInt("used-mem", used_mem_current.load(memory_order_relaxed)));

RETURN_ON_ERR(SaveAuxFieldStrInt("aof-preamble", aof_preamble));

// Save lua scripts only in rdb or summary file
DCHECK(save_mode_ != SaveMode::SINGLE_SHARD || lua_scripts.empty());
for (const string& s : lua_scripts) {
DCHECK(save_mode_ != SaveMode::SINGLE_SHARD || glob_state.lua_scripts.empty());
for (const string& s : glob_state.lua_scripts)
RETURN_ON_ERR(impl_->SaveAuxFieldStrStr("lua", s));
}

// Save search index definitions only in summary dfs file.
DCHECK(save_mode_ == SaveMode::SUMMARY || glob_state.search_indices.empty());
for (const string& s : glob_state.search_indices)
RETURN_ON_ERR(impl_->SaveAuxFieldStrStr("search-index", s));

// TODO: "repl-stream-db", "repl-id", "repl-offset"
return error_code{};
12 changes: 9 additions & 3 deletions src/server/rdb_save.h
@@ -69,6 +69,12 @@ enum class CompressionMode { NONE, SINGLE_ENTRY, MULTY_ENTRY_ZSTD, MULTY_ENTRY_L

class RdbSaver {
public:
// Global data which doesn't belong to shards and is serialized in the header
struct GlobalData {
const StringVec lua_scripts; // bodies of lua scripts
const StringVec search_indices; // ft.create commands to re-create search indices
};

// single_shard - true means that we run RdbSaver on a single shard and we do not use
// to snapshot all the datastore shards.
// single_shard - false, means we capture all the data using a single RdbSaver instance
@@ -85,8 +91,8 @@ class RdbSaver {
// Stops serialization in journal streaming mode in the shard's thread.
void StopSnapshotInShard(EngineShard* shard);

// Stores auxiliary (meta) values and lua scripts.
std::error_code SaveHeader(const StringVec& lua_scripts);
// Stores auxiliary (meta) values and the global header info.
std::error_code SaveHeader(const GlobalData& header_info);

// Writes the RDB file into sink. Waits for the serialization to finish.
// Fills freq_map with the histogram of rdb types.
@@ -104,7 +110,7 @@

std::error_code SaveEpilog();

std::error_code SaveAux(const StringVec& lua_scripts);
std::error_code SaveAux(const GlobalData&);
std::error_code SaveAuxFieldStrInt(std::string_view key, int64_t val);

std::unique_ptr<Impl> impl_;
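
GlobalData bundles everything that is not owned by a shard and must be written into the header. A hedged sketch of the caller side; GetLuaScriptBodies and CollectSearchIndexDefs are hypothetical helpers, since the WIP commit does not show where the definitions are gathered:

#include <system_error>
#include <utility>

#include "server/rdb_save.h"

dfly::StringVec GetLuaScriptBodies();       // hypothetical helper, declared for the sketch
dfly::StringVec CollectSearchIndexDefs();   // hypothetical helper, declared for the sketch

// Sketch: only the summary saver receives populated vectors; per-shard savers
// must pass empty ones, matching the DCHECKs in SaveAux.
std::error_code SaveSummaryHeaderSketch(dfly::RdbSaver* saver) {
  dfly::RdbSaver::GlobalData glob{GetLuaScriptBodies(), CollectSearchIndexDefs()};
  return saver->SaveHeader(glob);
}
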
2 changes: 1 addition & 1 deletion src/server/replica.cc
@@ -541,7 +541,7 @@ error_code Replica::ConsumeRedisStream() {
}
}

facade::RespToArgList(LastResponseArgs(), &args_vector);
facade::RespExpr::VecToArgList(LastResponseArgs(), &args_vector);
CmdArgList arg_list{args_vector.data(), args_vector.size()};
service_.DispatchCommand(arg_list, &conn_context);
}
44 changes: 42 additions & 2 deletions src/server/search/doc_index.cc
@@ -4,6 +4,8 @@

#include "server/search/doc_index.h"

#include <absl/strings/str_join.h>

#include <memory>

#include "base/logging.h"
@@ -46,6 +48,12 @@ void TraverseAllMatching(const DocIndex& index, const OpArgs& op_args, F&& f) {
} while (cursor);
}

const absl::flat_hash_map<string_view, search::SchemaField::FieldType> kSchemaTypes = {
{"TAG"sv, search::SchemaField::TAG},
{"TEXT"sv, search::SchemaField::TEXT},
{"NUMERIC"sv, search::SchemaField::NUMERIC},
{"VECTOR"sv, search::SchemaField::VECTOR}};

} // namespace

search::FtVector BytesToFtVector(string_view value) {
@@ -61,6 +69,38 @@
return out;
}

optional<search::SchemaField::FieldType> ParseSearchFieldType(string_view name) {
auto it = kSchemaTypes.find(name);
return it != kSchemaTypes.end() ? make_optional(it->second) : nullopt;
}

string_view SearchFieldTypeToString(search::SchemaField::FieldType type) {
for (auto [it_name, it_type] : kSchemaTypes)
if (it_type == type)
return it_name;
ABSL_UNREACHABLE();
return "";
}

string DocIndexInfo::BuildRestoreCommand() const {
std::string out;

// ON HASH/JSON
absl::StrAppend(&out, "ON", " ", base_index.type == DocIndex::HASH ? "HASH" : "JSON");

// optional PREFIX 1 *prefix*
if (!base_index.prefix.empty())
absl::StrAppend(&out, " PREFIX", " 1 ", base_index.prefix);

absl::StrAppend(&out, " SCHEMA");
for (auto [fname, finfo] : base_index.schema.fields) {
absl::StrAppend(&out, " ", finfo.identifier, " AS ", fname, " ",
SearchFieldTypeToString(finfo.type));
}

return out;
}

ShardDocIndex::DocId ShardDocIndex::DocKeyIndex::Add(string_view key) {
DCHECK_EQ(ids_.count(key), 0u);

@@ -161,7 +201,7 @@ SearchResult ShardDocIndex::Search(const OpArgs& op_args, const SearchParams& pa
}

DocIndexInfo ShardDocIndex::GetInfo() const {
return {base_->schema, key_index_.Size()};
return {*base_, key_index_.Size()};
}

ShardDocIndex* ShardDocIndices::GetIndex(string_view name) {
@@ -188,7 +228,7 @@ bool ShardDocIndices::DropIndex(string_view name) {

// Clean caches that might have data from this index
auto info = it->second->GetInfo();
for (const auto& [_, field] : info.schema.fields)
for (const auto& [_, field] : info.base_index.schema.fields)
JsonAccessor::RemoveFieldFromCache(field.identifier);

indices_.erase(it);
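
BuildRestoreCommand emits the argument tail of an FT.CREATE call (index type, optional prefix, and the schema), and it reuses the same type names that ParseSearchFieldType understands, so a serialized definition can be parsed back symmetrically. A small sketch of the round trip; the example schema string is hypothetical:

#include "base/logging.h"
#include "server/search/doc_index.h"

// Sketch: field type names survive a serialize/parse round trip. For a
// hypothetical HASH index over "doc:" with one TEXT field, BuildRestoreCommand()
// would yield "ON HASH PREFIX 1 doc: SCHEMA title AS title TEXT"; the caller is
// expected to prepend the index name and FT.CREATE when restoring.
void FieldTypeRoundTripSketch() {
  auto type = dfly::ParseSearchFieldType("TEXT");
  CHECK(type.has_value());
  CHECK_EQ("TEXT", dfly::SearchFieldTypeToString(*type));
}
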
