Skip to content

Commit

Permalink
split out select functions into their own module (#48035)
Browse files Browse the repository at this point in the history
  • Loading branch information
jkarneges committed Jul 9, 2024
1 parent 2d2744d commit a6b8621
Show file tree
Hide file tree
Showing 10 changed files with 208 additions and 186 deletions.
4 changes: 2 additions & 2 deletions src/connmgr/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ use crate::core::event;
use crate::core::executor::{Executor, Spawner};
use crate::core::list;
use crate::core::reactor::Reactor;
use crate::core::select::{select_2, select_5, select_6, select_option, Select2, Select5, Select6};
use crate::core::tnetstring;
use crate::core::zmq::{MultipartHeader, SpecInfo};
use crate::future::{
event_wait, select_2, select_5, select_6, select_option, yield_to_local_events,
CancellationSender, CancellationToken, Select2, Select5, Select6, Timeout,
event_wait, yield_to_local_events, CancellationSender, CancellationToken, Timeout,
};
use arrayvec::ArrayVec;
use ipnet::IpNet;
Expand Down
6 changes: 3 additions & 3 deletions src/connmgr/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,13 @@ use crate::core::http1::Error as CoreHttpError;
use crate::core::http1::{self, client, server, RecvStatus, SendStatus};
use crate::core::net::SocketAddr;
use crate::core::reactor::Reactor;
use crate::core::select::{select_2, select_3, select_4, select_option, Select2, Select3, Select4};
use crate::core::shuffle::random;
use crate::core::waker::RefWakerData;
use crate::core::zmq::MultipartHeader;
use crate::future::{
io_split, poll_async, select_2, select_3, select_4, select_option, AsyncRead, AsyncReadExt,
AsyncTcpStream, AsyncTlsStream, AsyncWrite, AsyncWriteExt, CancellationToken, ReadHalf,
Select2, Select3, Select4, StdWriteWrapper, Timeout, TlsWaker, WriteHalf,
io_split, poll_async, AsyncRead, AsyncReadExt, AsyncTcpStream, AsyncTlsStream, AsyncWrite,
AsyncWriteExt, CancellationToken, ReadHalf, StdWriteWrapper, Timeout, TlsWaker, WriteHalf,
};
use arrayvec::{ArrayString, ArrayVec};
use ipnet::IpNet;
Expand Down
3 changes: 2 additions & 1 deletion src/connmgr/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use crate::core::channel;
use crate::core::executor::Executor;
use crate::core::net::{NetListener, NetStream, SocketAddr};
use crate::core::reactor::Reactor;
use crate::future::{select_2, select_slice, AsyncNetListener, NetAcceptFuture, Select2};
use crate::core::select::{select_2, select_slice, Select2};
use crate::future::{AsyncNetListener, NetAcceptFuture};
use log::{debug, error};
use std::cmp;
use std::sync::mpsc;
Expand Down
8 changes: 5 additions & 3 deletions src/connmgr/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@ use crate::core::fs::{set_group, set_user};
use crate::core::list;
use crate::core::net::{set_socket_opts, NetListener, NetStream, SocketAddr};
use crate::core::reactor::Reactor;
use crate::core::select::{
select_2, select_3, select_6, select_8, select_option, Select2, Select3, Select6, Select8,
};
use crate::core::tnetstring;
use crate::core::waker::RefWakerData;
use crate::core::zmq::SpecInfo;
use crate::future::{
event_wait, select_2, select_3, select_6, select_8, select_option, yield_to_local_events,
AsyncTcpStream, AsyncTlsStream, AsyncUnixStream, CancellationSender, CancellationToken,
Select2, Select3, Select6, Select8, Timeout, TlsWaker,
event_wait, yield_to_local_events, AsyncTcpStream, AsyncTlsStream, AsyncUnixStream,
CancellationSender, CancellationToken, Timeout, TlsWaker,
};
use arrayvec::{ArrayString, ArrayVec};
use log::{debug, error, info, warn};
Expand Down
2 changes: 1 addition & 1 deletion src/connmgr/zhttpsocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ use crate::core::event;
use crate::core::executor::Executor;
use crate::core::list;
use crate::core::reactor::Reactor;
use crate::core::select::{select_10, select_9, select_option, select_slice, Select10, Select9};
use crate::core::tnetstring;
use crate::core::zmq::{
AsyncZmqSocket, MultipartHeader, SpecInfo, ZmqSendFuture, ZmqSendToFuture, ZmqSocket,
REGISTRATIONS_PER_ZMQSOCKET,
};
use crate::future::{select_10, select_9, select_option, select_slice, Select10, Select9};
use arrayvec::{ArrayString, ArrayVec};
use log::{debug, error, log_enabled, trace, warn};
use slab::Slab;
Expand Down
5 changes: 2 additions & 3 deletions src/core/http1/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ use crate::core::buffer::{Buffer, VecRingBuffer, VECTORED_MAX};
use crate::core::http1::error::Error;
use crate::core::http1::protocol::{self, BodySize, Header, ParseScratch, ParseStatus};
use crate::core::http1::util::*;
use crate::future::{
select_2, AsyncRead, AsyncWrite, AsyncWriteExt, ReadHalf, Select2, StdWriteWrapper, WriteHalf,
};
use crate::core::select::{select_2, Select2};
use crate::future::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadHalf, StdWriteWrapper, WriteHalf};
use std::cell::RefCell;
use std::io::{self, Write};
use std::mem;
Expand Down
5 changes: 2 additions & 3 deletions src/core/http1/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ use crate::core::buffer::{Buffer, ContiguousBuffer, VecRingBuffer, VECTORED_MAX}
use crate::core::http1::error::Error;
use crate::core::http1::protocol::{self, BodySize, Header, ParseScratch, ParseStatus};
use crate::core::http1::util::*;
use crate::future::{
select_2, AsyncRead, AsyncWrite, AsyncWriteExt, ReadHalf, Select2, StdWriteWrapper, WriteHalf,
};
use crate::core::select::{select_2, Select2};
use crate::future::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadHalf, StdWriteWrapper, WriteHalf};
use std::cell::{Cell, RefCell};
use std::io::{self, Write};
use std::pin::pin;
Expand Down
1 change: 1 addition & 0 deletions src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub mod list;
pub mod log;
pub mod net;
pub mod reactor;
pub mod select;
pub mod shuffle;
pub mod timer;
pub mod tnetstring;
Expand Down
190 changes: 190 additions & 0 deletions src/core/select.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
/*
* Copyright (C) 2020-2023 Fanout, Inc.
* Copyright (C) 2024 Fastly, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

use crate::core::shuffle::shuffle;
use paste::paste;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

fn range_unordered(dest: &mut [usize]) -> &[usize] {
for (index, v) in dest.iter_mut().enumerate() {
*v = index;
}

shuffle(dest);

dest
}

fn map_poll<F, W, V>(cx: &mut Context, fut: &mut F, wrap_func: W) -> Poll<V>
where
F: Future + Unpin,
W: FnOnce(F::Output) -> V,
{
match Pin::new(fut).poll(cx) {
Poll::Ready(v) => Poll::Ready(wrap_func(v)),
Poll::Pending => Poll::Pending,
}
}

macro_rules! declare_select {
($count: literal, ( $($num:literal),* )) => {
paste! {
pub enum [<Select $count>]<$([<O $num>], )*> {
$(
[<R $num>]([<O $num>]),
)*
}

pub struct [<Select $count Future>]<$([<F $num>], )*> {
$(
[<f $num>]: [<F $num>],
)*
}

impl<$([<F $num>], )*> Future for [<Select $count Future>]<$([<F $num>], )*>
where
$(
[<F $num>]: Future + Unpin,
)*
{
type Output = [<Select $count>]<$([<F $num>]::Output, )*>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let mut indexes = [0; $count];

for i in range_unordered(&mut indexes) {
let s = &mut *self;

let p = match i + 1 {
$(
$num => map_poll(cx, &mut s.[<f $num>], |v| [<Select $count>]::[<R $num>](v)),
)*
_ => unreachable!(),
};

if p.is_ready() {
return p;
}
}

Poll::Pending
}
}

#[allow(clippy::too_many_arguments)]
pub fn [<select_ $count>]<$([<F $num>], )*>(
$(
[<f $num>]: [<F $num>],
)*
) -> [<Select $count Future>]<$([<F $num>], )*>
where
$(
[<F $num>]: Future + Unpin,
)*
{
[<Select $count Future>] {
$(
[<f $num>],
)*
}
}
}
}
}

declare_select!(2, (1, 2));
declare_select!(3, (1, 2, 3));
declare_select!(4, (1, 2, 3, 4));
declare_select!(5, (1, 2, 3, 4, 5));
declare_select!(6, (1, 2, 3, 4, 5, 6));
declare_select!(8, (1, 2, 3, 4, 5, 6, 7, 8));
declare_select!(9, (1, 2, 3, 4, 5, 6, 7, 8, 9));
declare_select!(10, (1, 2, 3, 4, 5, 6, 7, 8, 9, 10));

pub struct SelectSliceFuture<'a, F> {
futures: &'a mut [F],
scratch: &'a mut Vec<usize>,
}

impl<F, O> Future for SelectSliceFuture<'_, F>
where
F: Future<Output = O> + Unpin,
{
type Output = (usize, F::Output);

fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let s = &mut *self;

let indexes = &mut s.scratch;
indexes.resize(s.futures.len(), 0);

for i in range_unordered(&mut indexes[..s.futures.len()]) {
if let Poll::Ready(v) = Pin::new(&mut s.futures[*i]).poll(cx) {
return Poll::Ready((*i, v));
}
}

Poll::Pending
}
}

pub fn select_slice<'a, F, O>(
futures: &'a mut [F],
scratch: &'a mut Vec<usize>,
) -> SelectSliceFuture<'a, F>
where
F: Future<Output = O> + Unpin,
{
if futures.len() > scratch.capacity() {
panic!(
"select_slice scratch is not large enough: {}, need {}",
scratch.capacity(),
futures.len()
);
}

SelectSliceFuture { futures, scratch }
}

pub struct SelectOptionFuture<F> {
fut: Option<F>,
}

impl<F, O> Future for SelectOptionFuture<F>
where
F: Future<Output = O> + Unpin,
{
type Output = O;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let s = &mut *self;

match Pin::new(&mut s.fut).as_pin_mut() {
Some(f) => f.poll(cx),
None => Poll::Pending,
}
}
}

pub fn select_option<F, O>(fut: Option<F>) -> SelectOptionFuture<F>
where
F: Future<Output = O>,
{
SelectOptionFuture { fut }
}
Loading

0 comments on commit a6b8621

Please sign in to comment.