Skip to content

Commit

Permalink
remove ErrorCode::NoConsumers
Browse files Browse the repository at this point in the history
  • Loading branch information
danclive committed Jul 3, 2020
1 parent f5cdfab commit bc9ee0c
Show file tree
Hide file tree
Showing 5 changed files with 217 additions and 202 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "queen"
version = "0.14.1"
version = "0.14.2"
license = "MIT"
authors = ["danclive <dangcheng@hotmail.com>"]
description = "message queue"
Expand Down
2 changes: 0 additions & 2 deletions src/error.rs
Expand Up @@ -93,7 +93,6 @@ pub enum ErrorCode {
Unauthorized,
AuthenticationFailed,
PermissionDenied,
NoConsumers,
DuplicateSlotId,
TargetSlotIdNotExist,
RefuseReceiveMessage,
Expand Down Expand Up @@ -134,7 +133,6 @@ impl ErrorCode {
ErrorCode::Unauthorized => "Unauthorized",
ErrorCode::AuthenticationFailed => "AuthenticationFailed",
ErrorCode::PermissionDenied => "PermissionDenied",
ErrorCode::NoConsumers => "NoConsumers",
ErrorCode::DuplicateSlotId => "DuplicatePortId",
ErrorCode::TargetSlotIdNotExist => "TargetPortIdNotExist",
ErrorCode::RefuseReceiveMessage => "RefuseReceiveMessage",
Expand Down
18 changes: 2 additions & 16 deletions src/socket/switch.rs
Expand Up @@ -385,12 +385,10 @@ impl Switch {
};
}

let mut no_consumers = true;

// 发送 P2P 消息
// 注意: 如果 TO 为数组,且为空,会按照常规消息发送
if !to_ids.is_empty() {
no_consumers = false;
// no_consumers = false;

// 如果存在 SHARE 字段,且值为 true,则只会随机给其中一个 SLOT 发送消息
if message.get_bool(SHARE).ok().unwrap_or(false) {
Expand Down Expand Up @@ -444,7 +442,7 @@ impl Switch {
}

if !array.is_empty() {
no_consumers = false;
// no_consumers = false;

if array.len() == 1 {
if let Some(slot) = self.slots.get(array[0]) {
Expand Down Expand Up @@ -473,23 +471,11 @@ impl Switch {
}
}

no_consumers = false;

send!(self, hook, slot, message);
}
}
}

if no_consumers {
message.remove(FROM);

ErrorCode::NoConsumers.insert(&mut message);

self.send_message(hook, token, message);

return
}

// slot event
// {
// CHAN: SLOT_SEND,
Expand Down
8 changes: 8 additions & 0 deletions src/wire.rs
Expand Up @@ -76,27 +76,33 @@ impl<T: Send> Wire<T> {
Ok((wire1, wire2))
}

#[inline]
pub fn capacity(&self) -> usize {
self.capacity
}

#[inline]
pub fn attr(&self) -> LockGuard<Message> {
self.attr.lock()
}

#[inline]
pub fn close(&self) {
self.tx.push(Err(RecvError::Disconnected));
self.close.store(true, Ordering::Release);
}

#[inline]
pub fn is_close(&self) -> bool {
self.close.load(Ordering::Acquire)
}

#[inline]
pub fn is_full(&self) -> bool {
self.tx.pending() >= self.capacity
}

#[inline]
pub fn pending(&self) -> usize {
self.tx.pending()
}
Expand All @@ -117,6 +123,7 @@ impl<T: Send> Wire<T> {
Ok(())
}

#[inline]
pub fn send_num(&self) -> usize {
self.send_num.get()
}
Expand All @@ -133,6 +140,7 @@ impl<T: Send> Wire<T> {
}
}

#[inline]
pub fn recv_num(&self) -> usize {
self.recv_num.get()
}
Expand Down

0 comments on commit bc9ee0c

Please sign in to comment.