Skip to content

Commit

Permalink
feat: Add client_id, comparison operators, expiration check
Browse files Browse the repository at this point in the history
  • Loading branch information
dranikpg committed Dec 5, 2023
1 parent 2cdb8a9 commit 6f8af7d
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 4 deletions.
22 changes: 19 additions & 3 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1173,7 +1173,7 @@ Connection::WeakRef Connection::Borrow() {
DCHECK(self_);
DCHECK_GT(cc_->subscriptions, 0);

return WeakRef(self_, queue_backpressure_, socket_->proactor()->GetPoolIndex());
return WeakRef(self_, queue_backpressure_, socket_->proactor()->GetPoolIndex(), id_);
}

void Connection::ShutdownThreadLocal() {
Expand Down Expand Up @@ -1344,8 +1344,8 @@ Connection::MemoryUsage Connection::GetMemoryUsage() const {
}

Connection::WeakRef::WeakRef(std::shared_ptr<Connection> ptr, QueueBackpressure* backpressure,
unsigned thread)
: ptr_{ptr}, backpressure_{backpressure}, thread_{thread} {
unsigned thread, uint32_t client_id)
: ptr_{ptr}, backpressure_{backpressure}, thread_{thread}, client_id_{client_id} {
}

unsigned Connection::WeakRef::Thread() const {
Expand All @@ -1357,6 +1357,14 @@ Connection* Connection::WeakRef::Get() const {
return ptr_.lock().get();
}

bool Connection::WeakRef::IsExpired() const {
return ptr_.expired();
}

uint32_t Connection::WeakRef::GetClientId() const {
return client_id_;
}

bool Connection::WeakRef::EnsureMemoryBudget() const {
// Simple optimization: If a connection was closed, don't check memory budget.
if (!ptr_.expired()) {
Expand All @@ -1368,4 +1376,12 @@ bool Connection::WeakRef::EnsureMemoryBudget() const {
return false;
}

bool Connection::WeakRef::operator<(const WeakRef& other) {
return client_id_ < other.client_id_;
}

bool Connection::WeakRef::operator==(const WeakRef& other) {
return client_id_ == other.client_id_;
}

} // namespace facade
13 changes: 12 additions & 1 deletion src/facade/dragonfly_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,17 +156,28 @@ class Connection : public util::Connection {
// Can only be called from connection's thread.
Connection* Get() const;

// Returns thue if the reference expired. Thread-safe.
bool IsExpired() const;

// Returns client id.Thread-safe.
uint32_t GetClientId() const;

// Ensure owner thread's memory budget. If expired, skips and returns false. Thread-safe.
bool EnsureMemoryBudget() const;

bool operator<(const WeakRef& other);
bool operator==(const WeakRef& other);

private:
friend class Connection;

WeakRef(std::shared_ptr<Connection> ptr, QueueBackpressure* backpressure, unsigned thread);
WeakRef(std::shared_ptr<Connection> ptr, QueueBackpressure* backpressure, unsigned thread,
uint32_t client_id);

std::weak_ptr<Connection> ptr_;
QueueBackpressure* backpressure_;
unsigned thread_;
uint32_t client_id_;
};

public:
Expand Down
2 changes: 2 additions & 0 deletions src/server/channel_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ vector<ChannelStore::Subscriber> ChannelStore::FetchSubscribers(string_view chan
void ChannelStore::Fill(const SubscribeMap& src, const string& pattern, vector<Subscriber>* out) {
out->reserve(out->size() + src.size());
for (const auto [cntx, thread_id] : src) {
// `cntx` is expected to be valid as it unregisters itself from the channel_store before
// closing.
CHECK(cntx->conn_state.subscribe_info);
Subscriber sub{cntx->conn()->Borrow(), pattern};
out->push_back(std::move(sub));
Expand Down

0 comments on commit 6f8af7d

Please sign in to comment.