Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incomplete: Wake waiting tower-batch clones on close #1764

Merged
merged 3 commits into from
Feb 17, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
52 changes: 48 additions & 4 deletions tower-batch/src/semaphore.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
// Copied from tower/src/semaphore.rs
// When/if tower-batch is upstreamed, delete this file
// and use the common tower semaphore implementation
// Copied from tower/src/semaphore.rs, commit:
// d4d1c67 hedge: use auto-resizing histograms (#484)
//
// When we upgrade to tower 0.4, we can use tokio's PollSemaphore, like tower's:
// ccfaffc buffer, limit: use `tokio-util`'s `PollSemaphore` (#556)

// Ignore lints on this copied code
#![allow(dead_code)]

pub(crate) use self::sync::OwnedSemaphorePermit as Permit;
use futures_core::ready;
Expand All @@ -9,7 +14,7 @@ use std::{
future::Future,
mem,
pin::Pin,
sync::Arc,
sync::{Arc, Weak},
task::{Context, Poll},
};
use tokio::sync;
Expand All @@ -20,13 +25,32 @@ pub(crate) struct Semaphore {
state: State,
}

#[derive(Debug)]
pub(crate) struct Close {
semaphore: Weak<sync::Semaphore>,
permits: usize,
}

enum State {
Waiting(Pin<Box<dyn Future<Output = Permit> + Send + Sync + 'static>>),
Ready(Permit),
Empty,
}

impl Semaphore {
pub(crate) fn new_with_close(permits: usize) -> (Self, Close) {
let semaphore = Arc::new(sync::Semaphore::new(permits));
let close = Close {
semaphore: Arc::downgrade(&semaphore),
permits,
};
let semaphore = Self {
semaphore,
state: State::Empty,
};
(semaphore, close)
}

pub(crate) fn new(permits: usize) -> Self {
Self {
semaphore: Arc::new(sync::Semaphore::new(permits)),
Expand Down Expand Up @@ -76,3 +100,23 @@ impl fmt::Debug for State {
}
}
}

impl Close {
/// Close the semaphore, waking any remaining tasks currently awaiting a permit.
pub(crate) fn close(self) {
// The maximum number of permits that a `tokio::sync::Semaphore`
// can hold is usize::MAX >> 3. If we attempt to add more than that
// number of permits, the semaphore will panic.
// XXX(eliza): another shift is kinda janky but if we add (usize::MAX
// > 3 - initial permits) the semaphore impl panics (I think due to a
// bug in tokio?).
// TODO(eliza): Tokio should _really_ just expose `Semaphore::close`
// publicly so we don't have to do this nonsense...
const MAX: usize = std::usize::MAX >> 4;
if let Some(semaphore) = self.semaphore.upgrade() {
// If we added `MAX - available_permits`, any tasks that are
// currently holding permits could drop them, overflowing the max.
semaphore.add_permits(MAX - self.permits);
}
}
}