Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): support client side tracking (resp3 protocol only) #2233

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
91fed71
fix(connection): Add ConnectionRef to replace pubsub wait token
dranikpg Nov 27, 2023
0300a15
fix: add check to migrate
dranikpg Nov 27, 2023
21ac976
fix: rename to weakref
dranikpg Nov 27, 2023
814f73d
fix: add back migration backpressure update
dranikpg Nov 27, 2023
9300710
fix: use monostate
dranikpg Nov 27, 2023
402fb31
fix: rebase on new helio GetPoolIndex
dranikpg Nov 27, 2023
e2eb0b1
fix: fix internal compiler error??
dranikpg Nov 28, 2023
e35dbac
fix: fix test framework
dranikpg Nov 28, 2023
2cdb8a9
fix: add comment about EnsureMemoryBudget
dranikpg Nov 28, 2023
fbd1748
support client side tracking using resp3
theyueli Nov 29, 2023
31fd097
support verbatim string for resp3 and let INFO command uses it.
theyueli Nov 30, 2023
0cbdcfe
tidy
theyueli Nov 30, 2023
9992a85
clean
theyueli Nov 30, 2023
203f9ed
Refurbish transaction before its reuse
theyueli Nov 30, 2023
15e62b3
if hrandfield returns one element, just send bulkstring
theyueli Nov 30, 2023
8e0fc9a
only track keys of its own shard.
theyueli Dec 1, 2023
7f44f1d
review comments
theyueli Dec 1, 2023
32bd6b3
rebase against vlad's fork (safe-ptr branch)
theyueli Dec 1, 2023
062f227
Merge branch 'main' into relay
theyueli Dec 1, 2023
ed6c75e
fix typo
theyueli Dec 1, 2023
002bc58
use weakref and fix hrandfield output
theyueli Dec 3, 2023
a17eed1
allow client tracking only when resp3 is used.
theyueli Dec 3, 2023
bfdd399
fix QUIT's property, makes WeakRef remember client id.
theyueli Dec 5, 2023
3991c92
fix reply format of hrandfield when count is negative
theyueli Dec 5, 2023
508bf21
fix reply format of hrandfield when count and withvalues are used.
theyueli Dec 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 71 additions & 4 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ struct Connection::DispatchOperations {
void operator()(const AclUpdateMessage& msg);
void operator()(const MigrationRequestMessage& msg);
void operator()(CheckpointMessage msg);
void operator()(const InvalidationMessage& msg);

template <typename T, typename D> void operator()(unique_ptr<T, D>& ptr) {
operator()(*ptr.get());
Expand Down Expand Up @@ -215,6 +216,9 @@ size_t Connection::MessageHandle::UsedMemory() const {
size_t operator()(const CheckpointMessage& msg) {
return 0; // no access to internal type, memory usage negligible
}
size_t operator()(const InvalidationMessage& msg) {
return 0;
}
};

return sizeof(MessageHandle) + visit(MessageSize{}, this->handle);
Expand Down Expand Up @@ -282,6 +286,19 @@ void Connection::DispatchOperations::operator()(CheckpointMessage msg) {
msg.bc.Dec();
}

void Connection::DispatchOperations::operator()(const InvalidationMessage& msg) {
RedisReplyBuilder* rbuilder = (RedisReplyBuilder*)builder;
rbuilder->SetResp3(true);
rbuilder->StartCollection(2, facade::RedisReplyBuilder::CollectionType::PUSH);
rbuilder->SendBulkString("invalidate");
if (msg.invalidate_due_to_flush) {
rbuilder->SendNull();
} else {
std::vector<string_view> keys{msg.key};
rbuilder->SendStringArr(keys);
}
}

Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener, SSL_CTX* ctx,
ServiceInterface* service)
: io_buf_(kMinReadSize), http_listener_(http_listener), ctx_(ctx), service_(service), name_{} {
Expand Down Expand Up @@ -313,6 +330,10 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener,

migration_enabled_ = absl::GetFlag(FLAGS_migrate_connections);

// Create shared_ptr with empty value and associate it with `this` pointer (aliasing constructor).
// We use it for reference counting and accessing `this` (without managing it).
self_ = {std::make_shared<std::monostate>(std::monostate{}), this};

#ifdef DFLY_USE_SSL
// Increment reference counter so Listener won't free the context while we're
// still using it.
Expand Down Expand Up @@ -1168,11 +1189,19 @@ void Connection::Migrate(util::fb2::ProactorBase* dest) {
// connections
CHECK(!cc_->async_dispatch);
CHECK_EQ(cc_->subscriptions, 0); // are bound to thread local caches
CHECK_EQ(self_.use_count(), 1u); // references cache our thread and backpressure
CHECK(!dispatch_fb_.IsJoinable()); // can't move once it started

listener()->Migrate(this, dest);
}

Connection::WeakRef Connection::Borrow() {
DCHECK(self_);
DCHECK_GT(cc_->subscriptions, 0);

return WeakRef(self_, queue_backpressure_, socket_->proactor()->GetPoolIndex());
}

void Connection::ShutdownThreadLocal() {
pipeline_req_pool_.clear();
}
Expand All @@ -1189,6 +1218,10 @@ void Connection::SendPubMessageAsync(PubMessage msg) {
SendAsync({PubMessagePtr{new (ptr) PubMessage{move(msg)}, MessageDeleter{}}});
}

void Connection::SendInvalidationMessageAsync(InvalidationMessage msg) {
SendAsync({std::move(msg)});
}

void Connection::SendMonitorMessageAsync(string msg) {
SendAsync({MonitorMessage{move(msg)}});
}
Expand Down Expand Up @@ -1277,10 +1310,6 @@ void Connection::RecycleMessage(MessageHandle msg) {
}
}

void Connection::EnsureAsyncMemoryBudget() {
queue_backpressure_->EnsureBelowLimit();
}

std::string Connection::LocalBindStr() const {
if (socket_->IsUDS())
return "unix-domain-socket";
Expand Down Expand Up @@ -1327,6 +1356,15 @@ void Connection::RequestAsyncMigration(util::fb2::ProactorBase* dest) {
migration_request_ = dest;
}

void Connection::EnableTracking() {
tracking_enabled_ = true;
cc_->subscriptions++;
}

void Connection::DisableTracking() {
tracking_enabled_ = false;
}

Connection::MemoryUsage Connection::GetMemoryUsage() const {
size_t mem = sizeof(*this) + dfly::HeapSize(dispatch_q_) + dfly::HeapSize(name_) +
dfly::HeapSize(tmp_parse_args_) + dfly::HeapSize(tmp_cmd_vec_) +
Expand All @@ -1344,6 +1382,35 @@ Connection::MemoryUsage Connection::GetMemoryUsage() const {
};
}

Connection::WeakRef::WeakRef(std::shared_ptr<Connection> ptr, QueueBackpressure* backpressure,
unsigned thread)
: ptr_{ptr}, backpressure_{backpressure}, thread_{thread}, client_id_{ptr->GetClientId()} {
}

unsigned Connection::WeakRef::Thread() const {
return thread_;
}

Connection* Connection::WeakRef::Get() const {
// DCHECK_EQ(ProactorBase::me()->GetPoolIndex(), int(thread_));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dranikpg i had to comment this check... otherwise i'm running into the following segfault:

F20231203 03:55:48.743674 2493204 dragonfly_connection.cc:1395] Check failed: ProactorBase::me()->GetPoolIndex() == int(thread_) (8 vs. 0) 
*** Check failure stack trace: ***                                                                               
    @     0x559a589b7221  google::LogMessage::Fail()                                                             
    @     0x559a589b7167  google::LogMessage::SendToLog()                                                        
    @     0x559a589b693c  google::LogMessage::Flush()                                                            
    @     0x559a589ba7b4  google::LogMessageFatal::~LogMessageFatal()                                            
    @     0x559a5822077f  facade::Connection::WeakRef::Get()                                                     
    @     0x559a57d4f0d7  dfly::DbSlice::TrackKeys()                                                             
    @     0x559a571398c8  dfly::OpTrackKeys()                                             
    @     0x559a57139c44  _ZZN4dfly7Service15DispatchCommandEN4absl12lts_202308024SpanINS3_IcEEEEPN6facade17ConnectionContextEENKUlPNS_11TransactionEPNS_11EngineShardEE_clESA_SC_
    @     0x559a57165021  _ZZN4dfly11Transaction18ScheduleSingleHopTIRZNS_7Service15DispatchCommandEN4absl12lts_202308024SpanINS5_IcEEEEPN6facade17ConnectionContextEEUlPS0_PNS_11EngineShardEE_EEDTclfp_fpTLDnEEEOT_ENKUlSB_SD_E_clESB_SD_
    @     0x559a571951c6  _ZSt13__invoke_implIN6facade8OpStatusERKZN4dfly11Transaction18ScheduleSingleHopTIRZNS2_7Service15DispatchCommandEN4absl12lts_202308024SpanINS8_IcEEEEPNS0_17ConnectionContextEEUlPS3_PNS2_11EngineShardEE_EEDTclfp_fpTLDnEEEOT_EUlSD_SF_E_JSD_SF_EESJ_St14__invoke_otherOT0_DpOT1_
    @     0x559a57188730  _ZSt8__invokeIRKZN4dfly11Transaction18ScheduleSingleHopTIRZNS0_7Service15DispatchCommandEN4absl12lts_202308024SpanINS6_IcEEEEPN6facade17ConnectionContextEEUlPS1_PNS0_11EngineShardEE_EEDTclfp_fpTLDnEEEOT_EUlSC_SE_E_JSC_SE_EENSt15__invoke_resultISI_JDpT0_EE4typeESJ_DpOSO_
    @     0x559a57180324  _ZSt6invokeIRKZN4dfly11Transaction18ScheduleSingleHopTIRZNS0_7Service15DispatchCommandEN4absl12lts_202308024SpanINS6_IcEEEEPN6facade17ConnectionContextEEUlPS1_PNS0_11EngineShardEE_EEDTclfp_fpTLDnEEEOT_EUlSC_SE_E_JSC_SE_EENSt13invoke_resultISI_JDpT0_EE4typeESJ_DpOSO_
    @     0x559a57177ce5  _ZN4absl12lts_2023080219functional_internal12InvokeObjectIZN4dfly11Transaction18ScheduleSingleHopTIRZNS3_7Service15DispatchCommandENS0_4SpanINS7_IcEEEEPN6facade17ConnectionContextEEUlPS4_PNS3_11EngineShardEE_EEDTclfp_fpTLDnEEEOT_EUlSD_SF_E_NSA_8OpStatusEJSD_SF_EEET0_NS1_7VoidPtrEDpNS1_8ForwardTIT1_E4typeE
    @     0x559a57e9f08b  absl::lts_20230802::FunctionRef<>::operator()()                                                                                                            
    @     0x559a57e71dc3  dfly::Transaction::RunQuickie()                                                                                                                            
    @     0x559a57e744c4  dfly::Transaction::ScheduleUniqueShard()                                                                                                                   
    @     0x559a57e68feb  _ZZN4dfly11Transaction17ScheduleSingleHopEN4absl12lts_2023080211FunctionRefIFN6facade8OpStatusEPS0_PNS_11EngineShardEEEEENKUlvE_clEv
    @     0x559a57e902e5  _ZSt13__invoke_implIvRZN4dfly11Transaction17ScheduleSingleHopEN4absl12lts_2023080211FunctionRefIFN6facade8OpStatusEPS1_PNS0_11EngineShardEEEEEUlvE_JEET_St14__invoke_otherOT0_DpOT1_
    @     0x559a57e8e347  _ZSt10__invoke_rIvRZN4dfly11Transaction17ScheduleSingleHopEN4absl12lts_2023080211FunctionRefIFN6facade8OpStatusEPS1_PNS0_11EngineShardEEEEEUlvE_JEENSt9enable_ifIX16is_invocable_r_vIT_T0_DpT1_EESF_E4typeEOSG_DpOSH_
    @     0x559a57e8c7fd  _ZNSt17_Function_handlerIFvvEZN4dfly11Transaction17ScheduleSingleHopEN4absl12lts_2023080211FunctionRefIFN6facade8OpStatusEPS2_PNS1_11EngineShardEEEEEUlvE_E9_M_invokeERKSt9_Any_data
    @     0x559a5728e48f  std::function<>::operator()()                                   
    @     0x559a588f556d  util::fb2::FiberQueue::Run()                                    
    @     0x559a578e710e  _ZZN4dfly11EngineShardC4EPN4util3fb212ProactorBaseEP9mi_heap_sENKUlvE0_clEv                                                                                
    @     0x559a57901b8a  _ZSt13__invoke_implIvZN4dfly11EngineShardC4EPN4util3fb212ProactorBaseEP9mi_heap_sEUlvE0_JEET_St14__invoke_otherOT0_DpOT1_
    @     0x559a578fee41  _ZSt8__invokeIZN4dfly11EngineShardC4EPN4util3fb212ProactorBaseEP9mi_heap_sEUlvE0_JEENSt15__invoke_resultIT_JDpT0_EE4typeEOSA_DpOSB_
    @     0x559a578fc35d  _ZSt12__apply_implIZN4dfly11EngineShardC4EPN4util3fb212ProactorBaseEP9mi_heap_sEUlvE0_St5tupleIJEEJEEDcOT_OT0_St16integer_sequenceImJXspT1_EEE
    @     0x559a578fc3da  _ZSt5applyIZN4dfly11EngineShardC4EPN4util3fb212ProactorBaseEP9mi_heap_sEUlvE0_St5tupleIJEEEDcOT_OT0_
    @     0x559a578fc5ea  _ZN4util3fb26detail15WorkerFiberImplIZN4dfly11EngineShardC4EPNS0_12ProactorBaseEP9mi_heap_sEUlvE0_JEE4run_EON5boost7context5fiberE
    @     0x559a578f97d3  _ZZN4util3fb26detail15WorkerFiberImplIZN4dfly11EngineShardC4EPNS0_12ProactorBaseEP9mi_heap_sEUlvE0_JEEC4IN5boost7context21basic_fixedsize_stackINSD_12stack_traitsEEEEESt17basic_string_viewIcSt11char_traitsIcEERKNSD_12preallocatedEOT_OS9_ENKUlONSD_5fiberEE_clESS_
    @     0x559a579091c2  _ZSt13__invoke_implIN5boost7context5fiberERZN4util3fb26detail15WorkerFiberImplIZN4dfly11EngineShardC4EPNS4_12ProactorBaseEP9mi_heap_sEUlvE0_JEEC4INS1_21basic_fixedsize_stackINS1_12stack_traitsEEEEESt17basic_string_viewIcSt11char_traitsIcEERKNS1_12preallocatedEOT_OSD_EUlOS2_E_JS2_EESQ_St14__invoke_otherOT0_DpOT1_
    @     0x559a57907685  _ZSt8__invokeIRZN4util3fb26detail15WorkerFiberImplIZN4dfly11EngineShardC4EPNS1_12ProactorBaseEP9mi_heap_sEUlvE0_JEEC4IN5boost7context21basic_fixedsize_stackINSE_12stack_traitsEEEEESt17basic_string_viewIcSt11char_traitsIcEERKNSE_12preallocatedEOT_OSA_EUlONSE_5fiberEE_JSS_EENSt15__invoke_resultISP_JDpT0_EE4typeESQ_DpOSX_
    @     0x559a579054e2  _ZSt6invokeIRZN4util3fb26detail15WorkerFiberImplIZN4dfly11EngineShardC4EPNS1_12ProactorBaseEP9mi_heap_sEUlvE0_JEEC4IN5boost7context21basic_fixedsize_stackINSE_12stack_traitsEEEEESt17basic_string_viewIcSt11char_traitsIcEERKNSE_12preallocatedEOT_OSA_EUlONSE_5fiberEE_JSS_EENSt13invoke_resultISP_JDpT0_EE4typeESQ_DpOSX_
*** SIGABRT received at time=1701604548 on cpu 8 ***                                      
PC: @     0x7f72c3dc29fc  (unknown)  pthread_kill                                         
    @     0x559a58a3c3fb         64  absl::lts_20230802::WriteFailureInfo()                                                                                                          
    @     0x559a58a3c655         96  absl::lts_20230802::AbslFailureSignalHandler()                                                                                                  
    @     0x7f72c3d6e520  (unknown)  (unknown)                                            

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, it was there for a reason. Will update the interface 🙂

return ptr_.lock().get();
}

uint32_t Connection::WeakRef::GetClientId() const {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dranikpg added this feature to WeakRef as well.

return client_id_;
}

bool Connection::WeakRef::EnsureMemoryBudget() const {
// Simple optimization: If a connection was closed, don't check memory budget.
if (!ptr_.expired()) {
// We don't rely on the connection ptr staying valid because we only access
// the threads backpressure
backpressure_->EnsureBelowLimit();
return true;
}
return false;
}

void Connection::DecreaseStatsOnClose() {
stats_->read_buf_capacity -= io_buf_.Capacity();

Expand Down
66 changes: 64 additions & 2 deletions src/facade/dragonfly_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ class SinkReplyBuilder;
// For pipelined requests, monitor and pubsub messages it uses
// a separate dispatch queue that is processed on a separate fiber.
class Connection : public util::Connection {
struct QueueBackpressure;

public:
Connection(Protocol protocol, util::HttpListenerBase* http_listener, SSL_CTX* ctx,
ServiceInterface* service);
Expand All @@ -75,7 +77,12 @@ class Connection : public util::Connection {
size_t message_len);
};

// Pipeline message, accumulated command to be executed.
struct InvalidationMessage {
std::string_view key;
bool invalidate_due_to_flush = false;
};

// Pipeline message, accumulated command to be execute.d
struct PipelineMessage {
PipelineMessage(size_t nargs, size_t capacity) : args(nargs), storage(capacity) {
}
Expand Down Expand Up @@ -134,7 +141,7 @@ class Connection : public util::Connection {
bool IsPubMsg() const;

std::variant<MonitorMessage, PubMessagePtr, PipelineMessagePtr, AclUpdateMessagePtr,
MigrationRequestMessage, CheckpointMessage>
MigrationRequestMessage, CheckpointMessage, InvalidationMessage>
handle;
};

Expand All @@ -143,11 +150,47 @@ class Connection : public util::Connection {

enum Phase { SETUP, READ_SOCKET, PROCESS, SHUTTING_DOWN, PRECLOSE, NUM_PHASES };

// Weak reference to a connection, invalidated upon connection close.
// Used to dispatch async operations for the connection without worrying about pointer lifetime.
struct WeakRef {
public:
// Get residing thread of connection. Thread-safe.
unsigned Thread() const;

// Get pointer to connection if still valid, nullptr if expired.
// Can only be called from connection's thread.
Connection* Get() const;

uint32_t GetClientId() const;

// Ensure owner thread's memory budget. If expired, skips and returns false. Thread-safe.
bool EnsureMemoryBudget() const;

bool operator==(const WeakRef rhs) const {
auto rhs_ptr = rhs.ptr_.lock();
auto lhs_ptr = ptr_.lock();
return (rhs_ptr == lhs_ptr);
};

private:
friend class Connection;

WeakRef(std::shared_ptr<Connection> ptr, QueueBackpressure* backpressure, unsigned thread);

std::weak_ptr<Connection> ptr_;
QueueBackpressure* backpressure_;
unsigned thread_;
uint32_t client_id_;
};

public:
// Add PubMessage to dispatch queue.
// Virtual because behavior is overridden in test_utils.
virtual void SendPubMessageAsync(PubMessage);

// Add InvalidationMessage to dispatch queue.
void SendInvalidationMessageAsync(InvalidationMessage);

// Add monitor message to dispatch queue.
void SendMonitorMessageAsync(std::string);

Expand Down Expand Up @@ -176,6 +219,9 @@ class Connection : public util::Connection {
// Migrate this connecton to a different thread.
void Migrate(util::fb2::ProactorBase* dest);

// Borrow weak reference to connection. Can be called from any thread.
WeakRef Borrow();

static void ShutdownThreadLocal();

bool IsCurrentlyDispatching() const;
Expand Down Expand Up @@ -220,6 +266,16 @@ class Connection : public util::Connection {
// Connections will migrate at most once, and only when the flag --migrate_connections is true.
void RequestAsyncMigration(util::fb2::ProactorBase* dest);

// Set the flag to enable client side tracking
void EnableTracking();

// Set the flag to disable client side tracking
void DisableTracking();

bool IsTrackingOn() const {
return tracking_enabled_;
}

protected:
void OnShutdown() override;
void OnPreMigrateThread() override;
Expand Down Expand Up @@ -340,6 +396,9 @@ class Connection : public util::Connection {
RespVec tmp_parse_args_;
CmdArgVec tmp_cmd_vec_;

// Used to keep track of borrowed references. Does not really own itself
std::shared_ptr<Connection> self_;

// Pointer to corresponding queue backpressure struct.
// Needed for access from different threads by EnsureAsyncMemoryBudget().
QueueBackpressure* queue_backpressure_;
Expand All @@ -354,6 +413,9 @@ class Connection : public util::Connection {

// Per-thread queue backpressure structs.
static thread_local QueueBackpressure tl_queue_backpressure_;

// whether client tracking is enabled
bool tracking_enabled_ = false;
};

} // namespace facade
33 changes: 32 additions & 1 deletion src/facade/reply_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ constexpr char kCRLF[] = "\r\n";
constexpr char kErrPref[] = "-ERR ";
constexpr char kSimplePref[] = "+";

constexpr char kRET[] = "$1\r\n\r\r\n";

constexpr unsigned kConvFlags =
DoubleToStringConverter::UNIQUE_ZERO | DoubleToStringConverter::EMIT_POSITIVE_EXPONENT_SIGN;

Expand Down Expand Up @@ -261,6 +263,10 @@ char* RedisReplyBuilder::FormatDouble(double val, char* dest, unsigned dest_len)
RedisReplyBuilder::RedisReplyBuilder(::io::Sink* sink) : SinkReplyBuilder(sink) {
}

bool RedisReplyBuilder::IsResp3() const {
return is_resp3_;
}

void RedisReplyBuilder::SetResp3(bool is_resp3) {
is_resp3_ = is_resp3;
}
Expand Down Expand Up @@ -291,7 +297,6 @@ void RedisReplyBuilder::SendProtocolError(std::string_view str) {

void RedisReplyBuilder::SendSimpleString(std::string_view str) {
iovec v[3] = {IoVec(kSimplePref), IoVec(str), IoVec(kCRLF)};

Send(v, ABSL_ARRAYSIZE(v));
}

Expand Down Expand Up @@ -331,6 +336,32 @@ void RedisReplyBuilder::SendBulkString(std::string_view str) {
return Send(v, ABSL_ARRAYSIZE(v));
}

void RedisReplyBuilder::SendVerbatimString(std::string_view str, VerbatimFormat format) {
if (!is_resp3_)
return SendBulkString(str);

char tmp[absl::numbers_internal::kFastToBufferSize + 3];
tmp[0] = '=';
// + 4 because format is three byte, and need to be followed by a ":"
char* next = absl::numbers_internal::FastIntToBuffer(uint32_t(str.size() + 4), tmp + 1);
*next++ = '\r';
*next++ = '\n';

std::string_view lenpref{tmp, size_t(next - tmp)};

std::string_view format_str;
if (format == VerbatimFormat::TXT)
format_str = "txt:";
else if (format == VerbatimFormat::MARKDOWN)
format_str = "mkd:";
else {
DVLOG(1) << "Unknown verbatim reply format: " << format;
return;
}
iovec v[4] = {IoVec(lenpref), IoVec(format_str), IoVec(str), IoVec(kCRLF)};
return Send(v, ABSL_ARRAYSIZE(v));
}

void RedisReplyBuilder::SendLong(long num) {
string str = absl::StrCat(":", num, kCRLF);
SendRaw(str);
Expand Down
6 changes: 6 additions & 0 deletions src/facade/reply_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,14 @@ class RedisReplyBuilder : public SinkReplyBuilder {
public:
enum CollectionType { ARRAY, SET, MAP, PUSH };

enum VerbatimFormat { TXT, MARKDOWN };

using StrSpan = std::variant<absl::Span<const std::string>, absl::Span<const std::string_view>>;

RedisReplyBuilder(::io::Sink* stream);

bool IsResp3() const;

void SetResp3(bool is_resp3);

void SendError(std::string_view str, std::string_view type = {}) override;
Expand Down Expand Up @@ -247,6 +251,8 @@ class RedisReplyBuilder : public SinkReplyBuilder {

static char* FormatDouble(double val, char* dest, unsigned dest_len);

void SendVerbatimString(std::string_view str, VerbatimFormat format = TXT);

protected:
struct WrappedStrSpan : public StrSpan {
size_t Size() const;
Expand Down
22 changes: 6 additions & 16 deletions src/server/channel_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,12 @@ bool Matches(string_view pattern, string_view channel) {

} // namespace

ChannelStore::Subscriber::Subscriber(ConnectionContext* cntx, uint32_t tid)
: conn_cntx(cntx), borrow_token(cntx->conn_state.subscribe_info->borrow_token), thread_id(tid) {
}

ChannelStore::Subscriber::Subscriber(uint32_t tid)
: conn_cntx(nullptr), borrow_token(0), thread_id(tid) {
bool ChannelStore::Subscriber::ByThread(const Subscriber& lhs, const Subscriber& rhs) {
return ByThreadId(lhs, rhs.Thread());
}

bool ChannelStore::Subscriber::ByThread(const Subscriber& lhs, const Subscriber& rhs) {
if (lhs.thread_id == rhs.thread_id)
return (lhs.conn_cntx != nullptr) < (rhs.conn_cntx != nullptr);
return lhs.thread_id < rhs.thread_id;
bool ChannelStore::Subscriber::ByThreadId(const Subscriber& lhs, const unsigned thread) {
return lhs.Thread() < thread;
}

ChannelStore::UpdatablePointer::UpdatablePointer(const UpdatablePointer& other) {
Expand Down Expand Up @@ -120,12 +114,8 @@ void ChannelStore::Fill(const SubscribeMap& src, const string& pattern, vector<S
out->reserve(out->size() + src.size());
for (const auto [cntx, thread_id] : src) {
CHECK(cntx->conn_state.subscribe_info);

Subscriber s(cntx, thread_id);
s.pattern = pattern;
s.borrow_token.Inc();

out->push_back(std::move(s));
Subscriber sub{cntx->conn()->Borrow(), pattern};
out->push_back(std::move(sub));
}
}

Expand Down
Loading
Loading