Skip to content
Merged

Bb8 #30

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
[workspace]
members = [
#
"async-ssh2-lite",
"async-ssh2-lite/demos/*",
#
"bb8-async-ssh2-lite",
"bb8-async-ssh2-lite/demo"
]
15 changes: 15 additions & 0 deletions bb8-async-ssh2-lite/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "bb8-async-ssh2-lite"
version = "0.1.0"
edition = "2021"

[features]
default = ["tokio"]

tokio = ["async-ssh2-lite/tokio"]

[dependencies]
async-ssh2-lite = { version = "0.4", default-features = false, path = "../async-ssh2-lite" }

bb8 = { version = "0.8", default-features = false }
async-trait = { version = "0.1", default-features = false }
14 changes: 14 additions & 0 deletions bb8-async-ssh2-lite/demo/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "bb8-async-ssh2-lite-demo"
version = "0.1.0"
edition = "2021"

[[bin]]
name = "bb8_asl_demo_tokio_tcp_stream"
path = "src/tokio_tcp_stream.rs"

[dependencies]
bb8-async-ssh2-lite = { path = "..", features = ["tokio"] }

tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
futures-util = { version = "0.3" }
53 changes: 53 additions & 0 deletions bb8-async-ssh2-lite/demo/src/tokio_tcp_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
RUST_BACKTRACE=1 RUST_LOG=trace cargo run -p bb8-async-ssh2-lite-demo --bin bb8_asl_demo_tokio_tcp_stream -- 127.0.0.1:22 root '~/.ssh/id_rsa'
*/

use std::env;

use bb8_async_ssh2_lite::{bb8, AsyncSessionManagerWithTokioTcpStream, AsyncSessionUserauthType};
use futures_util::{future::join_all, AsyncReadExt as _};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let socket_addr = env::args().nth(1).ok_or("socket_addr missing")?.parse()?;
let username = env::args().nth(2).ok_or("username missing")?;
let privatekey = env::args().nth(3).ok_or("privatekey missing")?.parse()?;

let mgr = AsyncSessionManagerWithTokioTcpStream::new(
socket_addr,
None,
username,
AsyncSessionUserauthType::PubkeyFile {
pubkey: None,
privatekey,
passphrase: None,
},
);

let pool = bb8::Pool::builder().build(mgr).await?;

let mut handles = vec![];
for i in 0..10 {
let pool = pool.clone();
let handle = tokio::spawn(async move {
let session = pool.get().await?;

let mut channel = session.channel_session().await?;
channel.exec("hostname").await?;
let mut s = String::new();
channel.read_to_string(&mut s).await?;
println!("exec hostname output:{s} i:{i}");
channel.close().await?;
println!("exec hostname exit_status:{} i:{i}", channel.exit_status()?);

Result::<(), Box<dyn std::error::Error + Send + Sync>>::Ok(())
});
handles.push(handle);
}

let rets = join_all(handles).await;
println!("rets:{rets:?}");
assert!(rets.iter().all(|x| x.is_ok()));

Ok(())
}
96 changes: 96 additions & 0 deletions bb8-async-ssh2-lite/src/impl_tokio.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use std::net::SocketAddr;

use async_ssh2_lite::{AsyncSession, SessionConfiguration, TokioTcpStream};
use async_trait::async_trait;

use crate::{AsyncSessionManagerError, AsyncSessionUserauthType};

//
#[derive(Debug, Clone)]
pub struct AsyncSessionManagerWithTokioTcpStream {
socket_addr: SocketAddr,
configuration: Option<SessionConfiguration>,
username: String,
userauth_type: AsyncSessionUserauthType,
}

impl AsyncSessionManagerWithTokioTcpStream {
pub fn new(
socket_addr: SocketAddr,
configuration: impl Into<Option<SessionConfiguration>>,
username: impl AsRef<str>,
userauth_type: AsyncSessionUserauthType,
) -> Self {
Self {
socket_addr,
configuration: configuration.into(),
username: username.as_ref().into(),
userauth_type,
}
}
}

#[async_trait]
impl bb8::ManageConnection for AsyncSessionManagerWithTokioTcpStream {
type Connection = AsyncSession<TokioTcpStream>;

type Error = AsyncSessionManagerError;

async fn connect(&self) -> Result<Self::Connection, Self::Error> {
let mut session = AsyncSession::<TokioTcpStream>::connect(
self.socket_addr,
self.configuration.to_owned(),
)
.await
.map_err(AsyncSessionManagerError::ConnectError)?;

session
.handshake()
.await
.map_err(AsyncSessionManagerError::HandshakeError)?;

match &self.userauth_type {
AsyncSessionUserauthType::Password { password } => {
session
.userauth_password(&self.username, password)
.await
.map_err(AsyncSessionManagerError::UserauthError)?;
}
AsyncSessionUserauthType::Agent => {
session
.userauth_agent(&self.username)
.await
.map_err(AsyncSessionManagerError::UserauthError)?;
}
AsyncSessionUserauthType::PubkeyFile {
pubkey,
privatekey,
passphrase,
} => {
session
.userauth_pubkey_file(
&self.username,
pubkey.as_deref(),
privatekey,
passphrase.as_deref(),
)
.await
.map_err(AsyncSessionManagerError::UserauthError)?;
}
}

if !session.authenticated() {
return Err(AsyncSessionManagerError::AssertAuthenticated);
}

Ok(session)
}

async fn is_valid(&self, _conn: &mut Self::Connection) -> Result<(), Self::Error> {
Ok(())
}

fn has_broken(&self, _conn: &mut Self::Connection) -> bool {
false
}
}
37 changes: 37 additions & 0 deletions bb8-async-ssh2-lite/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
pub use async_ssh2_lite;
pub use bb8;

#[cfg(feature = "tokio")]
mod impl_tokio;
#[cfg(feature = "tokio")]
pub use impl_tokio::AsyncSessionManagerWithTokioTcpStream;

use std::path::PathBuf;

//
#[derive(Debug, Clone)]
pub enum AsyncSessionUserauthType {
Password {
password: String,
},
Agent,
PubkeyFile {
pubkey: Option<PathBuf>,
privatekey: PathBuf,
passphrase: Option<String>,
},
}

#[derive(Debug)]
pub enum AsyncSessionManagerError {
ConnectError(async_ssh2_lite::Error),
HandshakeError(async_ssh2_lite::Error),
UserauthError(async_ssh2_lite::Error),
AssertAuthenticated,
}
impl core::fmt::Display for AsyncSessionManagerError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "{self:?}")
}
}
impl std::error::Error for AsyncSessionManagerError {}