diff --git a/admin-web/src/view/network/NetworkDetailPage.tsx b/admin-web/src/view/network/NetworkDetailPage.tsx index bd099cc..b1887eb 100644 --- a/admin-web/src/view/network/NetworkDetailPage.tsx +++ b/admin-web/src/view/network/NetworkDetailPage.tsx @@ -67,7 +67,7 @@ export default function NetworkDetailPage() { - @@ -84,4 +84,4 @@ export default function NetworkDetailPage() { // add Nodes Navigator, Invite Code // // -// \ No newline at end of file +// diff --git a/backend/src/main/scala/com/timzaak/fornet/pubsub/NodeChangeNotifyService.scala b/backend/src/main/scala/com/timzaak/fornet/pubsub/NodeChangeNotifyService.scala index dff44a5..d0a9689 100644 --- a/backend/src/main/scala/com/timzaak/fornet/pubsub/NodeChangeNotifyService.scala +++ b/backend/src/main/scala/com/timzaak/fornet/pubsub/NodeChangeNotifyService.scala @@ -119,16 +119,6 @@ class NodeChangeNotifyService( val network = networkDao.findById(node.networkId).get val peer = EntityConvert.toPeer(node, network) - connectionManager.sendNetworkMessage( - node.networkId, - NetworkMessage( - networkId = networkId, - NetworkMessage.Info.Peer( - PeerChange(addPeer = Some(peer)) - ) - ) - ) - val notifyNodes = nodeService.getAllRelativeNodes(node) connectionManager.sendClientMessage( @@ -142,6 +132,16 @@ class NodeChangeNotifyService( ), ) ) + + connectionManager.sendNetworkMessage( + node.networkId, + NetworkMessage( + networkId = networkId, + NetworkMessage.Info.Peer( + PeerChange(addPeer = Some(peer)) + ) + ) + ) case _ => // do nothing. } diff --git a/client/.cargo/config.toml b/client/.cargo/config.toml new file mode 100644 index 0000000..2d3ed61 --- /dev/null +++ b/client/.cargo/config.toml @@ -0,0 +1,3 @@ +[build] +# This is for tokio-console +#rustflags = ["--cfg", "tokio_unstable"] \ No newline at end of file diff --git a/client/Cargo.lock b/client/Cargo.lock index e4fbd3d..f29e898 100644 --- a/client/Cargo.lock +++ b/client/Cargo.lock @@ -2404,6 +2404,7 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", + "tracing", "windows-sys 0.45.0", ] diff --git a/client/bin/Cargo.toml b/client/bin/Cargo.toml index 4a76e3a..589abe6 100644 --- a/client/bin/Cargo.toml +++ b/client/bin/Cargo.toml @@ -18,9 +18,10 @@ fornet-lib = {path = "../lib"} clap = { version = "3.2.20", features = ["env"] } anyhow = "1.0" -tokio = { version = "1.20.1", features = ["macros", "rt-multi-thread", "net", "signal"] } +tokio = { version = "1.20.1", features = ["macros", "rt-multi-thread", "net", "signal", "tracing"] } tracing = "0.1.36" tracing-subscriber = {version = "0.3.15", features = ["env-filter"]} +#console-subscriber = "0.1.9" serde = { version = "1.0.144" } serde_json = "1.0.85" diff --git a/client/bin/src/fornet.rs b/client/bin/src/fornet.rs index 275881c..153c63a 100644 --- a/client/bin/src/fornet.rs +++ b/client/bin/src/fornet.rs @@ -16,48 +16,43 @@ async fn main() -> anyhow::Result<()> { let matches = Command::new(APP_NAME) .version(env!("CARGO_PKG_VERSION")) - .author("timzaak ") + .author("ForNetCode ") .args(&[ Arg::new("config") .long("config") .short('c') .env("FORNET_CONFIG") .help("config directory path") - .default_value(&default_config_path) + .default_value(&default_config_path), ]) .get_matches(); + let config_dir = matches.value_of("config").unwrap().to_owned(); - match matches.subcommand() { - None => { - let log_level = if cfg!(debug_assertions) { - "debug" - } else { - "info" - }; - let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::from_str(log_level).unwrap()); - if cfg!(debug_assertions) { - tracing_subscriber::fmt() - .with_env_filter(env_filter) - .init(); - } else { - tracing_subscriber::fmt() - .with_env_filter(env_filter) - .with_target(false) - .with_ansi(false) - .init(); - } - //tracing::info!("log level: {log_level}"); - ServerManager::start_server(config_dir, StartMethod::CommandLine).await?; - tokio::signal::ctrl_c().await.unwrap(); - //TODO: add signal - } - _ => { - println!("please type: fornet --help for more information.") - } + //console_subscriber::init(); + + let log_level = if cfg!(debug_assertions) { + "debug" + } else { + "info" }; + let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::from_str(log_level).unwrap()); + if cfg!(debug_assertions) { + tracing_subscriber::fmt() + .with_env_filter(env_filter) + .init(); + } else { + tracing_subscriber::fmt() + .with_env_filter(env_filter) + .with_target(false) + .with_ansi(false) + .init(); + } + + ServerManager::start_server(config_dir, StartMethod::CommandLine).await?; + tokio::signal::ctrl_c().await.unwrap(); Ok(()) } diff --git a/client/bin/src/fornet_cli.rs b/client/bin/src/fornet_cli.rs index c1b56d3..bf0dc15 100644 --- a/client/bin/src/fornet_cli.rs +++ b/client/bin/src/fornet_cli.rs @@ -8,7 +8,7 @@ use fornet_lib::wr_manager::DeviceInfoResp; pub async fn main() -> anyhow::Result<()> { let mut command = Command::new("fornet-cli") .version(env!("CARGO_PKG_VERSION")) - .author("timzaak ") + .author("ForNetCode ") .subcommand( Command::new("join").arg(Arg::new("invite_token").help("base64 token").required(true)), ).subcommand( diff --git a/client/lib/Cargo.toml b/client/lib/Cargo.toml index d954dca..5a7a652 100644 --- a/client/lib/Cargo.toml +++ b/client/lib/Cargo.toml @@ -61,8 +61,6 @@ paho-mqtt = "0.12" - - [dependencies.boringtun] version = "0.5.2" path = "../../third/boringtun/boringtun" @@ -76,5 +74,9 @@ windows = {version = "0.44", features=["Win32_Networking_WinSock"]} cidr-utils = "0.5.10" #winapi = {version = "0.3", features=["ws2def"]} + + + + [build-dependencies] tonic-build = "0.8" \ No newline at end of file diff --git a/client/lib/examples/tokio_select_example.rs b/client/lib/examples/tokio_select_example.rs new file mode 100644 index 0000000..8eaa047 --- /dev/null +++ b/client/lib/examples/tokio_select_example.rs @@ -0,0 +1,28 @@ +use std::time::Duration; + +async fn interval() { + let mut interval = tokio::time::interval(Duration::from_secs(1)); + loop { + interval.tick().await; + println!("interval...") + } +} +#[tokio::main] +async fn main() -> anyhow::Result<()>{ + let task = tokio::spawn(async move { + loop { + tokio::select! { + _ = interval() => { + println!("interval finish"); + } + } + } + }); + tokio::time::sleep(Duration::from_secs(5)).await; + task.abort(); + println!("task is finish: {}", task.is_finished()); + tokio::time::sleep(Duration::from_secs(5)).await; + println!("task is finish: {}", task.is_finished()); + tokio::signal::ctrl_c().await.unwrap(); + Ok(()) +} \ No newline at end of file diff --git a/client/lib/src/device/mod.rs b/client/lib/src/device/mod.rs index 8e6fe32..d9afff8 100644 --- a/client/lib/src/device/mod.rs +++ b/client/lib/src/device/mod.rs @@ -32,7 +32,8 @@ use prost::bytes::BufMut; use tokio::net::{TcpListener, TcpStream, UdpSocket}; use tokio::sync::{Mutex, RwLock}; use tokio::time; -use tokio::io::{AsyncReadExt, AsyncWriteExt};//keep +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +//keep use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; use allowed_ips::AllowedIps; @@ -46,7 +47,7 @@ use self::tun::WritePart; const HANDSHAKE_RATE_LIMIT: u64 = 100; // The number of handshakes per second we can tolerate before using cookies const MAX_UDP_SIZE: usize = (1 << 16) - 1; -const MAX_TCP_SIZE: usize = (1 << 16) -1; +const MAX_TCP_SIZE: usize = (1 << 16) - 1; // const MAX_ITR: usize = 100; // Number of packets to handle per handler call #[derive(Debug)] @@ -140,6 +141,7 @@ pub struct DeviceData { pub key_pair: (x25519_dalek::StaticSecret, x25519_dalek::PublicKey), pub listen_port: u16, pub scripts: Scripts, + pub node_type: NodeType, } impl DeviceData { @@ -147,7 +149,8 @@ impl DeviceData { peers: Arc>, key_pair: (x25519_dalek::StaticSecret, x25519_dalek::PublicKey), listen_port: u16, - scripts:Scripts, + scripts: Scripts, + node_type:NodeType, ) -> Self { Self { name, @@ -156,6 +159,7 @@ impl DeviceData { key_pair, listen_port, scripts, + node_type, } } pub fn next_index(&mut self) -> u32 { @@ -207,14 +211,13 @@ impl DeviceData { let peer = Peer::new(tunn, next_index, endpoint, allowed_ips, ip, preshared_key); let peer = Arc::new(Mutex::new(peer)); let mut peers = self.peers.write().await; - - peers.by_key.insert(pub_key, Arc::clone(&peer)); peers.by_idx.insert(next_index, Arc::clone(&peer)); for AllowedIP { addr, cidr } in allowed_ips { peers.by_ip .insert(*addr, *cidr as _, Arc::clone(&peer)); } + peers.by_key.insert(pub_key, peer.clone()); tracing::info!("Peer added"); } @@ -299,7 +302,7 @@ pub async fn rate_limiter_timer(rate_limiter: &Arc) { pub async fn peers_timer(peers: &Arc>, udp4: &UdpSocket, udp6: &UdpSocket) { let mut interval = time::interval(Duration::from_millis(250)); - let mut dst_buf: Vec= vec![0; MAX_UDP_SIZE]; + let mut dst_buf: Vec = vec![0; MAX_UDP_SIZE]; loop { interval.tick().await; @@ -318,7 +321,6 @@ pub async fn peers_timer(peers: &Arc>, udp4: &UdpSocket, udp6: &Ud } TunnResult::Err(e) => tracing::error!(message = "Timer error", error = ?e), TunnResult::WriteToNetwork(packet) => { - let _ = match endpoint_addr { SocketAddr::V4(_) => udp4.send_to(packet, endpoint_addr).await, SocketAddr::V6(_) => udp6.send_to(packet, endpoint_addr).await, @@ -340,7 +342,8 @@ pub async fn tcp_peers_timer( node_type: NodeType, ) { let mut interval = time::interval(Duration::from_millis(250)); - let mut dst_buf: Vec= vec![0; MAX_UDP_SIZE]; + + let mut dst_buf: Vec = vec![0; MAX_UDP_SIZE]; let error_recovery_duration = Duration::from_secs(10); loop { @@ -364,7 +367,7 @@ pub async fn tcp_peers_timer( let (reader, writer) = conn.into_split(); p.endpoint.tcp_conn = TcpConnection::Connected(writer); tcp_handler(reader, WriterState::PeerWriter(peer.clone()), endpoint_addr, key_pair.clone(), rate_limiter.clone(), peers.clone(), iface.clone(), pi); - }, + } Err(error) => { tracing::debug!("connect {endpoint_addr:?} failure, error: {error:?}"); p.endpoint.tcp_conn = TcpConnection::ConnectedFailure(error, SystemTime::now()); @@ -373,11 +376,13 @@ pub async fn tcp_peers_timer( } continue; } - TcpConnection::Connecting(_) => { + TcpConnection::Connecting(_)| TcpConnection::End => { //TODO: add check of time, and reconnect continue; } - _ => {} + _ => { + + } }; match p.update_timers(&mut dst_buf) { TunnResult::Done => {} @@ -451,7 +456,7 @@ pub async fn udp_handler(udp: &UdpSocket, } TunnResult::WriteToTunnelV4(packet, addr) => { // tracing::debug!("{addr:?}"); - if p.is_allowed_ip(addr) { + if p.is_allowed_ip(addr) { if pi { let mut buf: Vec = Vec::new(); buf.put_slice(&IP4_HEADER); @@ -496,7 +501,6 @@ pub async fn udp_handler(udp: &UdpSocket, } } }; - } } }; @@ -506,11 +510,9 @@ pub async fn udp_handler(udp: &UdpSocket, while let TunnResult::WriteToNetwork(packet) = p.tunnel.decapsulate(None, &[], &mut dst_buf[..]) - { + { let _ = udp.send_to(packet, addr).await; } - - } p.set_endpoint(addr); } @@ -528,18 +530,19 @@ pub async fn tcp_listener_handler( peers: Arc>, iface: Arc>, pi: bool, -) ->anyhow::Result<()> { +) -> anyhow::Result<()> { loop { let (socket, addr) = listener.accept().await?; let key_pair = key_pair.clone(); let rate_limiter = rate_limiter.clone(); let peers = peers.clone(); let iface = iface.clone(); - let (reader, writer ) = socket.into_split(); - tcp_handler(reader, WriterState::PureWriter(writer), addr,key_pair, rate_limiter, peers, iface, pi); + let (reader, writer) = socket.into_split(); + tcp_handler(reader, WriterState::PureWriter(writer), addr, key_pair, rate_limiter, peers, iface, pi); } //Ok(()) } + pub fn tcp_handler( //socket: TcpStream, reader: OwnedReadHalf, @@ -555,12 +558,11 @@ pub fn tcp_handler( let (private_key, public_key) = key_pair.as_ref(); let mut writer = writer; let mut reader = reader; - //let (mut reader, writer ) = socket.into_split(); - //let mut writer = WriterState::PureWriter(writer); let mut src_buf: Vec = vec![0; MAX_UDP_SIZE]; let mut dst_buf: Vec = vec![0; MAX_UDP_SIZE]; while let Ok(size) = reader.read(&mut src_buf).await { if size > 0 { + //tracing::debug!("tcp receive message"); let parsed_packet = match rate_limiter.as_ref().verify_packet(Some(addr.ip()), &src_buf[..size], &mut dst_buf) { Ok(packet) => packet, @@ -568,8 +570,8 @@ pub fn tcp_handler( match &mut writer { WriterState::PureWriter(writer) => { let _ = writer.write_all(cookie).await; - }, - WriterState::PeerWriter(peer)=> { + } + WriterState::PeerWriter(peer) => { let mut p = peer.lock().await; p.endpoint.tcp_write(cookie).await; } @@ -596,15 +598,23 @@ pub fn tcp_handler( Some(peer) => peer, }; + let mut p = peer.lock().await; - if let TcpConnection::Nothing | TcpConnection::ConnectedFailure(..) = p.endpoint.tcp_conn { - if let WriterState::PureWriter(_) = &mut writer { - let pure_writer = mem::replace(&mut writer,WriterState::PeerWriter(peer.clone())); - if let WriterState::PureWriter(_writer) = pure_writer { - p.endpoint.tcp_conn = TcpConnection::Connected(_writer); + match p.endpoint.tcp_conn { + TcpConnection::Nothing | TcpConnection::ConnectedFailure(..) => { + if let WriterState::PureWriter(_) = &mut writer { + let pure_writer = mem::replace(&mut writer, WriterState::PeerWriter(peer.clone())); + if let WriterState::PureWriter(_writer) = pure_writer { + p.endpoint.tcp_conn = TcpConnection::Connected(_writer); + } } } + TcpConnection::End => { + break; + } + _ => {} } + // We found a peer, use it to decapsulate the message+ let mut flush = false; // Are there packets to send from the queue? match p @@ -619,7 +629,7 @@ pub fn tcp_handler( } TunnResult::WriteToTunnelV4(packet, addr) => { // tracing::debug!("{addr:?}"); - if p.is_allowed_ip(addr) { + if p.is_allowed_ip(addr) { if pi { let mut buf: Vec = Vec::new(); buf.put_slice(&IP4_HEADER); @@ -631,61 +641,60 @@ pub fn tcp_handler( let _ = iface.lock().await.write(&buf).await; } } - } else { - cfg_if! { + } else { + cfg_if! { if #[cfg(target_os="windows")] { let _ = iface.lock().await.write(&packet); } else { let _ = iface.lock().await.write(&packet).await; } } - } - } else {} - } - TunnResult::WriteToTunnelV6(packet, addr) => { - if p.is_allowed_ip(addr) { - if pi { - let mut buf: Vec = Vec::new(); - buf.put_slice(&IP6_HEADER); - buf.put_slice(&packet); - cfg_if! { + } + } else {} + } + TunnResult::WriteToTunnelV6(packet, addr) => { + if p.is_allowed_ip(addr) { + if pi { + let mut buf: Vec = Vec::new(); + buf.put_slice(&IP6_HEADER); + buf.put_slice(&packet); + cfg_if! { if #[cfg(target_os="windows")] { let _ = iface.lock().await.write(&buf); } else { let _ = iface.lock().await.write(&buf).await; } } - } else { - cfg_if! { + } else { + cfg_if! { if #[cfg(target_os="windows")] { let _ = iface.lock().await.write(packet); } else { let _ = iface.lock().await.write(packet).await; } } - }; - } + }; } - }; + } + }; - if flush { + if flush { // Flush pending queue - while let TunnResult::WriteToNetwork(packet) = - p.tunnel.decapsulate(None, &[], &mut dst_buf[..]) - { - p.endpoint.tcp_write(packet).await; - } + while let TunnResult::WriteToNetwork(packet) = + p.tunnel.decapsulate(None, &[], &mut dst_buf[..]) { + p.endpoint.tcp_write(packet).await; } } + } else { + // if writer drop ,size would be zero, this may change in future tokio!! + break; } - tracing::info!("tcp: {addr:?} close"); - }); - - + } + tracing::info!("tcp read: {addr:?} close"); + }); } - pub struct Peers { pub by_key: HashMap>>, pub by_ip: AllowedIps>>, @@ -701,3 +710,59 @@ impl Default for Peers { } } } + + +#[cfg(test)] +mod test { + use std::time::Duration; + use crate::device::Device; + //use tracing_test::traced_test; + use crate::config::Identity; + use std::str::FromStr; + use crate::device::peer::AllowedIP; + use crate::device::script_run::Scripts; + use crate::protobuf::config::{NodeType, Protocol}; + + + fn new_client(protocol: Protocol, node_type:NodeType, allowed_ip:&str) ->Device { + let allowed_addr = vec![AllowedIP::from_str(allowed_ip).unwrap()]; + let identity = Identity::new(); + let key_pair = (identity.x25519_sk, identity.x25519_pk); + let port = Some(25516); + let mtu = 1000; + let scripts = Scripts::default(); + + let mut device = Device::new( + "for0", + &allowed_addr, + key_pair, + port, mtu, scripts, protocol, node_type, + ).unwrap(); + device + } + + + //sudo cargo test --lib device::test::udp_client_device_new_and_close + #[tokio::test] + pub async fn udp_client_device_new_and_close() { + let mut device = new_client(Protocol::Udp, NodeType::NodeClient, "10.0.0.1:32"); + tokio::time::sleep(Duration::from_secs(5)).await; + device.close().await; + tokio::time::sleep(Duration::from_secs(5)).await; + let mut device = new_client(Protocol::Udp, NodeType::NodeClient, "10.0.0.1:32"); + tokio::time::sleep(Duration::from_secs(5)).await; + device.close().await; + } + + //sudo cargo test --lib device::test::tcp_client_device_new_and_close + #[tokio::test] + pub async fn tcp_client_device_new_and_close() { + let mut device = new_client(Protocol::Tcp, NodeType::NodeClient, "10.0.0.1/32"); + tokio::time::sleep(Duration::from_secs(5)).await; + device.close().await; + tokio::time::sleep(Duration::from_secs(5)).await; + let mut device = new_client(Protocol::Tcp, NodeType::NodeClient, "10.0.0.1/32"); + tokio::time::sleep(Duration::from_secs(5)).await; + device.close().await; + } +} \ No newline at end of file diff --git a/client/lib/src/device/peer.rs b/client/lib/src/device/peer.rs index 7e7753e..6b0fdf4 100644 --- a/client/lib/src/device/peer.rs +++ b/client/lib/src/device/peer.rs @@ -22,6 +22,7 @@ pub enum TcpConnection { Connecting(SystemTime), Connected(OwnedWriteHalf), ConnectedFailure(std::io::Error, SystemTime), + End, } #[derive(Debug)] pub struct Endpoint { @@ -42,6 +43,8 @@ impl Endpoint { self.tcp_conn = TcpConnection::ConnectedFailure(e, SystemTime::now()); } }; + } else if let TcpConnection::End = self.tcp_conn { + tracing::debug!("the tcp conn has ended"); } } } @@ -56,6 +59,11 @@ pub struct Peer { pub ip: IpAddr, preshared_key: Option<[u8; 32]>, } +impl Drop for Peer { + fn drop(&mut self) { + tracing::debug!("peer: {} has been dropped", self.index); + } +} #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)] pub struct AllowedIP { @@ -122,9 +130,9 @@ impl Peer { if let Some(_) = &mut self.endpoint.udp_conn.take() { tracing::info!("disconnecting from endpoint"); } else if let TcpConnection::Connected(_) = &mut self.endpoint.tcp_conn { + self.endpoint.tcp_conn = TcpConnection::End; tracing::info!("disconnecting tcp connection"); } - self.endpoint.tcp_conn = TcpConnection::Nothing; } pub fn set_endpoint(&mut self, addr: SocketAddr) { diff --git a/client/lib/src/device/tun/sys/linux.rs b/client/lib/src/device/tun/sys/linux.rs index df7a248..eb28e5b 100644 --- a/client/lib/src/device/tun/sys/linux.rs +++ b/client/lib/src/device/tun/sys/linux.rs @@ -1,13 +1,13 @@ use std::process::Command; use crate::device::peer::AllowedIP; -pub fn add_addr(iface_name:&str, address: &AllowedIP) -> anyhow::Result<()> { +pub fn set_address(iface_name:&str, address: &AllowedIP) -> anyhow::Result<()> { let inet = if address.addr.is_ipv4() { "-4" } else { "-6" }; Command::new("ip").args(&[inet, "address", "add", &address.to_string(), "dev", iface_name]).status()?; Ok(()) } - +/* pub fn set_route(iface_name:&str, allowed_ip: &AllowedIP) -> anyhow::Result<()> { //TODO: support allowed_ip is 0.0.0.0/0 // ip -4 route add 10.0.0.1/24 dev ForT @@ -17,7 +17,8 @@ pub fn set_route(iface_name:&str, allowed_ip: &AllowedIP) -> anyhow::Result<()> } pub fn remove_route(iface_name:&str, allowed_ip:&AllowedIP) -> anyhow::Result<()> { - let inet = if allowed_ip.addr.is_ipv4() { "-net" } else { "-6net" }; - Command::new("route").args(&["del", inet,&allowed_ip.to_string(),"dev", iface_name]).status()?; + //let inet = if allowed_ip.addr.is_ipv4() { "-net" } else { "-6net" }; + //Command::new("route").args(&["del", inet,&allowed_ip.to_string(),"dev", iface_name]).status()?; Ok(()) -} \ No newline at end of file +} +*/ \ No newline at end of file diff --git a/client/lib/src/device/tun/sys/macos.rs b/client/lib/src/device/tun/sys/macos.rs index 7a35f7f..4eaab25 100644 --- a/client/lib/src/device/tun/sys/macos.rs +++ b/client/lib/src/device/tun/sys/macos.rs @@ -1,5 +1,6 @@ use std::process::Command; use crate::device::peer::AllowedIP; +use cmd_lib::run_cmd; pub fn set_alias(iface_name: &str, address: &AllowedIP) -> anyhow::Result<()> { @@ -20,6 +21,6 @@ pub fn set_route(iface_name: &str, allowed_ip: &AllowedIP) -> anyhow::Result<()> pub fn remove_route(iface_name:&str, allowed_ip:&AllowedIP) -> anyhow::Result<()> { let inet = if allowed_ip.addr.is_ipv4() { "-inet" } else { "-inet6" }; - Command::new("route").args(&["-q", "-n", "delete", inet, &allowed_ip.to_string()]).status()?; + //Command::new("route").args(&["-q", "-n", "delete", inet, &allowed_ip.to_string(), "-interface", iface_name]).status()?; Ok(()) -} +} \ No newline at end of file diff --git a/client/lib/src/device/tun/sys/mod.rs b/client/lib/src/device/tun/sys/mod.rs index 1bf319b..a1c6c98 100644 --- a/client/lib/src/device/tun/sys/mod.rs +++ b/client/lib/src/device/tun/sys/mod.rs @@ -1,36 +1,11 @@ -use crate::device::peer::AllowedIP; - -#[cfg(target_os = "macos")] -mod macos; -#[cfg(target_os = "linux")] -mod linux; - - -pub fn set_alias(iface_name:&str, address: &AllowedIP) -> anyhow::Result<()> { - #[cfg(target_os = "macos")] - macos::set_alias(iface_name, address)?; - - #[cfg(target_os = "linux")] - linux::add_addr(iface_name, address)?; - - Ok(()) +use cfg_if::cfg_if; + +cfg_if! { + if #[cfg(target_os = "macos")] { + mod macos; + pub use macos::{set_route,set_alias, remove_route}; + } else if #[cfg(target_os = "linux")]{ + mod linux; + pub use linux::{set_address}; + } } - -pub fn set_route(iface_name:&str, allowed_ip: &AllowedIP) -> anyhow::Result<()> { - #[cfg(target_os = "macos")] - macos::set_route(iface_name, allowed_ip)?; - - #[cfg(target_os = "linux")] - linux::set_route(iface_name, allowed_ip)?; - Ok(()) -} - -pub fn remove_route(iface_name:&str, allowed_ip:&AllowedIP) -> anyhow::Result<()> { - #[cfg(target_os = "macos")] - macos::remove_route(iface_name, allowed_ip)?; - - #[cfg(target_os ="linux")] - linux::remove_route(iface_name, allowed_ip)?; - - Ok(()) -} \ No newline at end of file diff --git a/client/lib/src/device/tun/unix.rs b/client/lib/src/device/tun/unix.rs index 65e1156..b6c9c8f 100644 --- a/client/lib/src/device/tun/unix.rs +++ b/client/lib/src/device/tun/unix.rs @@ -36,14 +36,24 @@ pub fn create_async_tun(name: &str, mtu: u32, address:&[AllowedIP], // IFF_NO_PI preventing excessive buffer reallocating config.packet_information(false); }); + let mut device = tun::create_as_async(&config).context(format!("create tun/tap fail"))?; let pi = device.get_mut().has_packet_information(); let name = device.get_ref().name().to_string(); for add in address { - sys::set_alias(&name, add)?; - sys::set_route(&name, add)?; - tracing::info!("set alias and route:{}", &add.to_string()); + #[cfg(target_os = "macos")] + { + sys::set_alias(&name, add)?; + sys::set_route(&name, add)?; + tracing::info!("set alias and route:{}", &add.to_string()); + } + #[cfg(target_os = "linux")] + { + sys::set_address(&name, add)?; + tracing::info!("set address:{}", &add.to_string()); + } + } let (tun_read,tun_write) = tokio::io::split(device); diff --git a/client/lib/src/device/unix_device.rs b/client/lib/src/device/unix_device.rs index b5f0a54..7a0aa42 100644 --- a/client/lib/src/device/unix_device.rs +++ b/client/lib/src/device/unix_device.rs @@ -85,6 +85,7 @@ impl Device { let task:JoinHandle<()> = tokio::spawn(async move { loop { + tokio::select! { _ = device::rate_limiter_timer(&rate_limiter) => {} _ = device::tcp_peers_timer( @@ -98,24 +99,29 @@ impl Device { ) => {} // iface listen Ok(len) = iface_reader.read(&mut tun_src_buf) => { - let src_buf = if pi { - &tun_src_buf[4..(len+4)] - } else { - &tun_src_buf[0..len] - }; - device::tun_read_tcp_handle(&peers, src_buf, &mut tun_dst_buf).await; + if len > 0 { + let src_buf = if pi { + &tun_src_buf[4..(len+4)] + } else { + &tun_src_buf[0..len] + }; + device::tun_read_tcp_handle(&peers, src_buf, &mut tun_dst_buf).await; + } } //_ = device::tcp_listener_handler(&tcp4, key_pair.clone(), rate_limiter.clone(), Arc::clone(&peers), Arc::clone(&iface_writer), pi) => {break} - _ = device::tcp_listener_handler(&tcp6, key_pair.clone(), rate_limiter.clone(), Arc::clone(&peers), Arc::clone(&iface_writer), pi) => {break} + _ = device::tcp_listener_handler(&tcp6, key_pair.clone(), rate_limiter.clone(), Arc::clone(&peers), Arc::clone(&iface_writer), pi) => { + break; + } + } } }); (port, task) } }; - let device = Device { - device_data: DeviceData::new(name,peers1, key_pair1, port, scripts), + device_data: DeviceData::new(name,peers1, key_pair1, port, scripts, node_type, + ), task, protocol, }; @@ -128,6 +134,7 @@ impl Device { pub async fn close(&mut self) { self.task.abort();// close all connections. + //tracing::debug!("tun/rpc task is finish: {}", self.task.is_finished()); self.device_data.close().await; // task abort would cost some time. tokio::time::sleep(Duration::from_secs(2)).await; @@ -148,6 +155,13 @@ impl DerefMut for Device { } } +impl Drop for Device { + fn drop(&mut self) { + self.task.abort(); + tracing::debug!("device has been dropped"); + } +} + cfg_if!{ //#[cfg(any(target_os = "macos", target_os = "ios"))] if #[cfg(target_os = "macos")] { diff --git a/client/lib/src/sc_manager.rs b/client/lib/src/sc_manager.rs index 60b9ffc..4fa63a2 100644 --- a/client/lib/src/sc_manager.rs +++ b/client/lib/src/sc_manager.rs @@ -87,6 +87,7 @@ impl SCManager { let _ = sender.send( ServerMessage::StopWR("node has been forbid or delete".to_owned()) ).await; + deduplication.wr_config = None; } _ => { // this would conflict with Info::Config message, so ignore this. diff --git a/client/lib/src/wr_manager.rs b/client/lib/src/wr_manager.rs index 50e78fb..b8b57fc 100644 --- a/client/lib/src/wr_manager.rs +++ b/client/lib/src/wr_manager.rs @@ -1,5 +1,6 @@ use std::net::{IpAddr, SocketAddr}; use std::str::FromStr; +use std::time::Duration; use anyhow::anyhow; use serde_derive::{Deserialize, Serialize}; use crate::config::{Config, Identity}; @@ -63,14 +64,22 @@ impl WRManager { //TODO: check if need restart // if interface not equal, restart // check peers, remove or add new ones. - self.close().await; - tracing::info!("close device before restart"); + let has_alive = self.is_alive(); + if has_alive { + let node_type = self.device.as_ref().map(|x|x.node_type).unwrap_or(NodeType::NodeClient); + tracing::info!("close device"); + self.close().await; + let sleep_time = if node_type == NodeType::NodeRelay {10} else {20}; + tokio::time::sleep(Duration::from_secs(sleep_time)).await; + } + let tun_name = config.get_tun_name(); let protocol = Protocol::from_i32(interface.protocol).unwrap_or(Protocol::Udp); let node_type = NodeType::from_i32(wr_config.r#type).unwrap(); let scripts = Scripts::load_from_interface(&interface); let key_pair = (config.identity.x25519_sk.clone(), config.identity.x25519_pk.clone()); + tracing::debug!("begin to start device"); let wr_interface = Device::new( &tun_name, &address,