diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index 70de2f3c3c49..9e1fc50a9b78 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -1173,7 +1173,7 @@ Connection::WeakRef Connection::Borrow() { DCHECK(self_); DCHECK_GT(cc_->subscriptions, 0); - return WeakRef(self_, queue_backpressure_, socket_->proactor()->GetPoolIndex()); + return WeakRef(self_, queue_backpressure_, socket_->proactor()->GetPoolIndex(), id_); } void Connection::ShutdownThreadLocal() { @@ -1344,8 +1344,8 @@ Connection::MemoryUsage Connection::GetMemoryUsage() const { } Connection::WeakRef::WeakRef(std::shared_ptr ptr, QueueBackpressure* backpressure, - unsigned thread) - : ptr_{ptr}, backpressure_{backpressure}, thread_{thread} { + unsigned thread, uint32_t client_id) + : ptr_{ptr}, backpressure_{backpressure}, thread_{thread}, client_id_{client_id} { } unsigned Connection::WeakRef::Thread() const { @@ -1357,6 +1357,14 @@ Connection* Connection::WeakRef::Get() const { return ptr_.lock().get(); } +bool Connection::WeakRef::IsExpired() const { + return ptr_.expired(); +} + +uint32_t Connection::WeakRef::GetClientId() const { + return client_id_; +} + bool Connection::WeakRef::EnsureMemoryBudget() const { // Simple optimization: If a connection was closed, don't check memory budget. if (!ptr_.expired()) { @@ -1368,4 +1376,12 @@ bool Connection::WeakRef::EnsureMemoryBudget() const { return false; } +bool Connection::WeakRef::operator<(const WeakRef& other) { + return client_id_ < other.client_id_; +} + +bool Connection::WeakRef::operator==(const WeakRef& other) { + return client_id_ == other.client_id_; +} + } // namespace facade diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index 57c0a68299f1..957d68374ea2 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -156,17 +156,28 @@ class Connection : public util::Connection { // Can only be called from connection's thread. Connection* Get() const; + // Returns thue if the reference expired. Thread-safe. + bool IsExpired() const; + + // Returns client id.Thread-safe. + uint32_t GetClientId() const; + // Ensure owner thread's memory budget. If expired, skips and returns false. Thread-safe. bool EnsureMemoryBudget() const; + bool operator<(const WeakRef& other); + bool operator==(const WeakRef& other); + private: friend class Connection; - WeakRef(std::shared_ptr ptr, QueueBackpressure* backpressure, unsigned thread); + WeakRef(std::shared_ptr ptr, QueueBackpressure* backpressure, unsigned thread, + uint32_t client_id); std::weak_ptr ptr_; QueueBackpressure* backpressure_; unsigned thread_; + uint32_t client_id_; }; public: diff --git a/src/server/channel_store.cc b/src/server/channel_store.cc index 4307d5151059..7b5ffe2a1598 100644 --- a/src/server/channel_store.cc +++ b/src/server/channel_store.cc @@ -113,6 +113,8 @@ vector ChannelStore::FetchSubscribers(string_view chan void ChannelStore::Fill(const SubscribeMap& src, const string& pattern, vector* out) { out->reserve(out->size() + src.size()); for (const auto [cntx, thread_id] : src) { + // `cntx` is expected to be valid as it unregisters itself from the channel_store before + // closing. CHECK(cntx->conn_state.subscribe_info); Subscriber sub{cntx->conn()->Borrow(), pattern}; out->push_back(std::move(sub));