Skip to content

Commit

Permalink
feat: ignore MULTI/EXEC if the whole transaction is made of EVAL comm…
Browse files Browse the repository at this point in the history
…ands.

Together with `default_lua_config` solves #781.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
  • Loading branch information
romange committed Apr 28, 2023
1 parent 688f8f5 commit f08fd71
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 23 deletions.
5 changes: 3 additions & 2 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,10 @@ struct Connection::DispatchOperations {
void Connection::PipelineMessage::SetArgs(const RespVec& args) {
auto* next = storage.data();
for (size_t i = 0; i < args.size(); ++i) {
auto buf = args[i].GetBuf();
RespExpr::Buffer buf = args[i].GetBuf();
size_t s = buf.size();
memcpy(next, buf.data(), s);
if (s)
memcpy(next, buf.data(), s);
this->args[i] = MutableSlice(next, s);
next += s;
}
Expand Down
7 changes: 7 additions & 0 deletions src/server/command_registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ enum CommandOpt : uint32_t {

const char* OptName(CommandOpt fl);

constexpr inline bool IsEvalKind(std::string_view name) {
return name.compare(0, 4, "EVAL") == 0;
}

static_assert(IsEvalKind("EVAL") && IsEvalKind("EVALSHA"));
static_assert(!IsEvalKind(""));

}; // namespace CO

class CommandId {
Expand Down
3 changes: 2 additions & 1 deletion src/server/conn_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ StoredCmd::StoredCmd(string&& buffer, const CommandId* cid, CmdArgList args, fac
}

void StoredCmd::Fill(CmdArgList args) {
CHECK_GE(args.size(), sizes_.size());
DCHECK_GE(args.size(), sizes_.size());

unsigned offset = 0;
for (unsigned i = 0; i < sizes_.size(); i++) {
args[i] = MutableSlice{buffer_.data() + offset, sizes_[i]};
Expand Down
5 changes: 5 additions & 0 deletions src/server/conn_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ class StoredCmd {
// Between filling and invocation, cmd should NOT be moved.
void Fill(CmdArgList args);

void Fill(CmdArgVec* dest) {
dest->resize(sizes_.size());
Fill(absl::MakeSpan(*dest));
}

const CommandId* Cid() const;

facade::ReplyMode ReplyMode() const;
Expand Down
72 changes: 56 additions & 16 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,29 @@ void TxTable(const http::QueryArgs& args, HttpContext* send) {
send->Invoke(std::move(resp));
}

enum class ExecEvalState {
NONE = 0,
ALL = 1,
SOME = 2,
};

ExecEvalState DetermineEvalPresense(const std::vector<StoredCmd>& body) {
unsigned eval_cnt = 0;
for (const auto& scmd : body) {
if (CO::IsEvalKind(scmd.Cid()->name())) {
eval_cnt++;
}
}

if (eval_cnt == 0)
return ExecEvalState::NONE;

if (eval_cnt == body.size())
return ExecEvalState::ALL;

return ExecEvalState::SOME;
}

} // namespace

Service::Service(ProactorPool* pp) : pp_(*pp), server_family_(this) {
Expand Down Expand Up @@ -582,7 +605,9 @@ OpStatus CheckKeysDeclared(const ConnectionState::ScriptInfo& eval_info, const C

const auto& key_index = *key_index_res;
for (unsigned i = key_index.start; i < key_index.end; ++i) {
if (!eval_info.keys.contains(ArgS(args, i))) {
string_view key = ArgS(args, i);
if (!eval_info.keys.contains(key)) {
VLOG(1) << "Key " << key << " is not declared for command " << cid->name();
return OpStatus::KEY_NOTFOUND;
}
}
Expand Down Expand Up @@ -727,15 +752,6 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
etl.connection_stats.cmd_count_map[cid->name()]++;

if (dfly_cntx->conn_state.exec_info.IsActive() && !is_trans_cmd) {
auto cmd_name = ArgS(args, 0);
if (cmd_name == "EVAL" || cmd_name == "EVALSHA") {
auto error =
absl::StrCat("'", cmd_name,
"' Dragonfly does not allow execution of a server-side Lua script inside "
"transaction block");
SetMultiExecErrorFlag(dfly_cntx);
return (*cntx)->SendError(error);
}
// TODO: protect against aggregating huge transactions.
StoredCmd stored_cmd{cid, args.subspan(1)};
dfly_cntx->conn_state.exec_info.body.push_back(std::move(stored_cmd));
Expand Down Expand Up @@ -1325,8 +1341,7 @@ template <typename F> void IterateAllKeys(ConnectionState::ExecInfo* exec_info,
if (!scmd.Cid()->IsTransactional())
continue;

arg_vec.resize(scmd.NumArgs());
scmd.Fill(absl::MakeSpan(arg_vec));
scmd.Fill(&arg_vec);

auto key_res = DetermineKeys(scmd.Cid(), absl::MakeSpan(arg_vec));
if (!key_res.ok())
Expand Down Expand Up @@ -1412,7 +1427,31 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
return rb->SendNull();
}

CmdArgVec tmp_keys;
ExecEvalState state = DetermineEvalPresense(exec_info.body);
if (state == ExecEvalState::SOME) {
auto error =
"Dragonfly does not allow execution of a server-side Lua script inside "
"MULTI/EXEC block";

return rb->SendError(error);
}

CmdArgVec arg_vec, tmp_keys;

// We ignore transaction mode in case it's filled only with EVAL-like commands.
// This is done to support OptimalBits/bull js framework
// that for some reason uses MULTI to send multiple jobs via EVAL(SHA) commands,
// instead of using pipeline mode.
// TODO: to check with BullMQ developers if this is a bug or a feature.
if (state == ExecEvalState::ALL) {
rb->StartArray(exec_info.body.size());
for (auto& scmd : exec_info.body) {
scmd.Fill(&arg_vec);
DispatchCommand(absl::MakeSpan(arg_vec), cntx);
}
return;
}

bool scheduled = StartMultiExec(cntx->db_index(), cntx->transaction, &exec_info, &tmp_keys);

// EXEC should not run if any of the watched keys expired.
Expand All @@ -1428,15 +1467,16 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
if (absl::GetFlag(FLAGS_multi_exec_squash)) {
MultiCommandSquasher::Execute(absl::MakeSpan(exec_info.body), cntx);
} else {
CmdArgVec arg_vec{};

for (auto& scmd : exec_info.body) {
VLOG(2) << "TX CMD " << scmd.Cid()->name() << " " << scmd.NumArgs();

cntx->transaction->MultiSwitchCmd(scmd.Cid());
cntx->cid = scmd.Cid();

arg_vec.resize(scmd.NumArgs());
scmd.Fill(&arg_vec);

CmdArgList args = absl::MakeSpan(arg_vec);
scmd.Fill(args);

if (scmd.Cid()->IsTransactional()) {
OpStatus st = cntx->transaction->InitByArgs(cntx->conn_state.db_index, args);
Expand Down
6 changes: 2 additions & 4 deletions src/server/multi_command_squasher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,8 @@ MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(StoredCmd* cm
(cmd->Cid()->opt_mask() & CO::GLOBAL_TRANS))
return SquashResult::NOT_SQUASHED;

tmp_keylist_.resize(cmd->NumArgs());
cmd->Fill(&tmp_keylist_);
auto args = absl::MakeSpan(tmp_keylist_);
cmd->Fill(args);

auto keys = DetermineKeys(cmd->Cid(), args);
if (!keys.ok())
Expand Down Expand Up @@ -94,9 +93,8 @@ void MultiCommandSquasher::ExecuteStandalone(StoredCmd* cmd) {
tx->MultiSwitchCmd(cmd->Cid());
cntx_->cid = cmd->Cid();

tmp_keylist_.resize(cmd->NumArgs());
cmd->Fill(&tmp_keylist_);
auto args = absl::MakeSpan(tmp_keylist_);
cmd->Fill(args);

if (cmd->Cid()->IsTransactional())
tx->InitByArgs(cntx_->conn_state.db_index, args);
Expand Down

0 comments on commit f08fd71

Please sign in to comment.