Skip to content

Commit

Permalink
feat: support to wait for sockets (#1)
Browse files Browse the repository at this point in the history
Introduces

```bash
wait-on socket
```

Which allows to wait on a socket to be available.

<!--
Developer's Certificate of Origin 1.1

By making a contribution to this project, I certify that:

(a) The contribution was created in whole or in part by me and I
    have the right to submit it under the open source license
    indicated in the file; or

(b) The contribution is based upon previous work that, to the best
    of my knowledge, is covered under an appropriate open source
    license and I have the right under that license to submit that
    work with modifications, whether created in whole or in part
    by me, under the same open source license (unless I am
    permitted to submit under a different license), as indicated
    in the file; or

(c) The contribution was provided directly to me by some other
    person who certified (a), (b) or (c) and I have not modified
    it.

(d) I understand and agree that this project and the contribution
    are public and that a record of the contribution (including all
    personal information I submit with it, including my sign-off) is
    maintained indefinitely and may be redistributed consistent with
    this project or the open source license(s) involved.
-->
  • Loading branch information
EstebanBorai committed Apr 7, 2024
1 parent 1952a67 commit 83bd442
Show file tree
Hide file tree
Showing 8 changed files with 209 additions and 1 deletion.
42 changes: 42 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ path = "src/bin/main.rs"
[dependencies]
anyhow = "1.0.81"
clap = { version = "4.5.4", features = ["std", "derive", "env"] }
tokio = { version = "1.37.0", features = ["rt-multi-thread", "macros"] }
pin-project = "1.1.5"
tokio = { version = "1.37.0", features = ["io-util", "rt-multi-thread", "macros", "net"] }
notify = "6.1.1"
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ cargo install wait-on
wait-on file /path/to/file
```

### Wait for a Socket to be available using TCP Protocol

```bash
wait-on socket -i 127.0.0.1 -p 8080
```

## License

This project is licensed under the MIT license and the Apache License 2.0.
1 change: 1 addition & 0 deletions src/bin/command/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod file;
pub mod socket;
22 changes: 22 additions & 0 deletions src/bin/command/socket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use std::net::IpAddr;

use anyhow::Result;
use clap::Args;

use wait_on::resource::socket::SocketWaiter;
use wait_on::{WaitOptions, Waitable};

#[derive(Args, Debug)]
pub struct SocketOpt {
#[clap(short = 'p', long = "port")]
pub port: u16,
#[clap(short = 'i', long = "ip", default_value = "127.0.0.1")]
pub addr: IpAddr,
}

impl SocketOpt {
pub async fn exec(&self) -> Result<()> {
let waiter = SocketWaiter::new(self.addr, self.port);
waiter.wait(WaitOptions::default()).await
}
}
5 changes: 5 additions & 0 deletions src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use anyhow::Result;
use clap::Parser;

use self::command::file::FileOpt;
use self::command::socket::SocketOpt;

#[derive(Debug, Parser)]
#[command(
Expand All @@ -13,7 +14,10 @@ use self::command::file::FileOpt;
next_line_help = true
)]
pub enum Command {
/// Wait on a file to be available
File(FileOpt),
/// Wait on a socket to be available using the TCP Protocol
Socket(SocketOpt),
}

#[derive(Debug, Parser)]
Expand All @@ -28,5 +32,6 @@ async fn main() -> Result<()> {

match args.command {
Command::File(opt) => opt.exec().await,
Command::Socket(opt) => opt.exec().await,
}
}
4 changes: 4 additions & 0 deletions src/resource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,25 @@
//! own configuration based on the protocols used.

pub mod file;
pub mod socket;

use anyhow::Result;

use crate::{WaitOptions, Waitable};

use self::file::FileWaiter;
use self::socket::SocketWaiter;

pub enum Resource {
File(FileWaiter),
Socket(SocketWaiter),
}

impl Waitable for Resource {
async fn wait(self, options: WaitOptions) -> Result<()> {
match self {
Resource::File(file) => file.wait(options).await,
Resource::Socket(socket) => socket.wait(options).await,
}
}
}
127 changes: 127 additions & 0 deletions src/resource/socket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
use std::net::{IpAddr, SocketAddr};
use std::pin::Pin;
use std::task::{Context, Poll};

use anyhow::{Error, Result};
use pin_project::pin_project;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, ReadBuf};
use tokio::net::{TcpListener, TcpStream};

use crate::{WaitOptions, Waitable};

/// Listens on a specific IP Address and Port using TCP protocol
pub struct SocketWaiter {
pub addr: IpAddr,
pub port: u16,
}

impl SocketWaiter {
pub fn new(addr: IpAddr, port: u16) -> Self {
Self { addr, port }
}

pub fn socket(&self) -> SocketAddr {
SocketAddr::new(self.addr, self.port)
}
}

#[pin_project]
pub struct PacketExtractor<const B: usize> {
pub header: [u8; B],
pub forwarded: usize,
#[pin]
pub socket: TcpStream,
}

impl<const B: usize> PacketExtractor<B> {
pub async fn read(socket: TcpStream) -> Result<Self> {
let mut extractor = Self {
header: [0; B],
forwarded: 0,
socket,
};

extractor.socket.read_exact(&mut extractor.header).await?;

Ok(extractor)
}

pub fn get_header(&mut self) -> &[u8; B] {
&self.header
}
}

impl<const B: usize> AsyncRead for PacketExtractor<B> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buff: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
let extractor = self.project();

if *extractor.forwarded < extractor.header.len() {
let leftover = &extractor.header[*extractor.forwarded..];
let num_forward_now = leftover.len().min(buff.remaining());
let forward = &leftover[..num_forward_now];

buff.put_slice(forward);
*extractor.forwarded += num_forward_now;

return Poll::Ready(Ok(()));
}

extractor.socket.poll_read(cx, buff)
}
}

impl<const B: usize> AsyncWrite for PacketExtractor<B> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buff: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
let extractor = self.project();
extractor.socket.poll_write(cx, buff)
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
let extractor = self.project();
extractor.socket.poll_flush(cx)
}

fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
let extractor = self.project();
extractor.socket.poll_shutdown(cx)
}
}

impl Waitable for SocketWaiter {
async fn wait(self, _: WaitOptions) -> Result<()> {
let tcp_listener = TcpListener::bind(self.socket()).await?;
let (socket, _) = tcp_listener.accept().await?;
let mut socket = PacketExtractor::<8>::read(socket).await?;

tokio::spawn(async move {
let mut buf = vec![0; 1024];

loop {
let n = socket
.read(&mut buf)
.await
.expect("failed to read data from socket");

if n == 0 {
// socket closed
return;
}
}
})
.await
.map_err(|err| Error::msg(err.to_string()))?;

Ok(())
}
}

0 comments on commit 83bd442

Please sign in to comment.