From 9ad2ec167d003409b8ba0aa8506b269d7e568e7e Mon Sep 17 00:00:00 2001 From: timzaak Date: Mon, 19 Jun 2023 15:12:07 +0800 Subject: [PATCH 01/11] add test case --- client/lib/Cargo.toml | 6 +- client/lib/src/device/mod.rs | 169 +++++++++++++++++++++++++---------- 2 files changed, 124 insertions(+), 51 deletions(-) diff --git a/client/lib/Cargo.toml b/client/lib/Cargo.toml index 55b9227..3c9fd7b 100644 --- a/client/lib/Cargo.toml +++ b/client/lib/Cargo.toml @@ -61,8 +61,6 @@ paho-mqtt = "0.12" - - [dependencies.boringtun] version = "0.5.2" path = "../../third/boringtun/boringtun" @@ -76,5 +74,9 @@ windows = {version = "0.44", features=["Win32_Networking_WinSock"]} cidr-utils = "0.5.10" #winapi = {version = "0.3", features=["ws2def"]} + + + + [build-dependencies] tonic-build = "0.8" \ No newline at end of file diff --git a/client/lib/src/device/mod.rs b/client/lib/src/device/mod.rs index e7dc2db..9f2a3f0 100644 --- a/client/lib/src/device/mod.rs +++ b/client/lib/src/device/mod.rs @@ -32,7 +32,8 @@ use prost::bytes::BufMut; use tokio::net::{TcpListener, TcpStream, UdpSocket}; use tokio::sync::{Mutex, RwLock}; use tokio::time; -use tokio::io::{AsyncReadExt, AsyncWriteExt};//keep +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +//keep use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; use allowed_ips::AllowedIps; @@ -46,7 +47,7 @@ use self::tun::WritePart; const HANDSHAKE_RATE_LIMIT: u64 = 100; // The number of handshakes per second we can tolerate before using cookies const MAX_UDP_SIZE: usize = (1 << 16) - 1; -const MAX_TCP_SIZE: usize = (1 << 16) -1; +const MAX_TCP_SIZE: usize = (1 << 16) - 1; // const MAX_ITR: usize = 100; // Number of packets to handle per handler call #[derive(Debug)] @@ -147,7 +148,7 @@ impl DeviceData { peers: Arc>, key_pair: (x25519_dalek::StaticSecret, x25519_dalek::PublicKey), listen_port: u16, - scripts:Scripts, + scripts: Scripts, ) -> Self { Self { name, @@ -300,7 +301,7 @@ pub async fn rate_limiter_timer(rate_limiter: &Arc) { pub async fn peers_timer(peers: &Arc>, udp4: &UdpSocket, udp6: &UdpSocket) { let mut interval = time::interval(Duration::from_millis(250)); - let mut dst_buf: Vec= vec![0; MAX_UDP_SIZE]; + let mut dst_buf: Vec = vec![0; MAX_UDP_SIZE]; loop { interval.tick().await; @@ -319,7 +320,6 @@ pub async fn peers_timer(peers: &Arc>, udp4: &UdpSocket, udp6: &Ud } TunnResult::Err(e) => tracing::error!(message = "Timer error", error = ?e), TunnResult::WriteToNetwork(packet) => { - let _ = match endpoint_addr { SocketAddr::V4(_) => udp4.send_to(packet, endpoint_addr).await, SocketAddr::V6(_) => udp6.send_to(packet, endpoint_addr).await, @@ -341,7 +341,7 @@ pub async fn tcp_peers_timer( node_type: NodeType, ) { let mut interval = time::interval(Duration::from_millis(250)); - let mut dst_buf: Vec= vec![0; MAX_UDP_SIZE]; + let mut dst_buf: Vec = vec![0; MAX_UDP_SIZE]; loop { interval.tick().await; @@ -361,7 +361,7 @@ pub async fn tcp_peers_timer( let (reader, writer) = conn.into_split(); p.endpoint.tcp_conn = TcpConnection::Connected(writer); tcp_handler(reader, WriterState::PeerWriter(peer.clone()), endpoint_addr, key_pair.clone(), rate_limiter.clone(), peers.clone(), iface.clone(), pi); - }, + } Err(error) => { tracing::debug!("connect {endpoint_addr:?} failure, error: {error:?}"); p.endpoint.tcp_conn = TcpConnection::ConnectedFailure(error) @@ -387,7 +387,6 @@ pub async fn tcp_peers_timer( if let TcpConnection::Connected(connection) = &mut p.endpoint.tcp_conn { let _ = connection.write_all(packet).await; } - } _ => tracing::warn!("Unexpected result from update_timers"), }; @@ -451,7 +450,7 @@ pub async fn udp_handler(udp: &UdpSocket, } TunnResult::WriteToTunnelV4(packet, addr) => { // tracing::debug!("{addr:?}"); - if p.is_allowed_ip(addr) { + if p.is_allowed_ip(addr) { if pi { let mut buf: Vec = Vec::new(); buf.put_slice(&IP4_HEADER); @@ -496,7 +495,6 @@ pub async fn udp_handler(udp: &UdpSocket, } } }; - } } }; @@ -506,11 +504,9 @@ pub async fn udp_handler(udp: &UdpSocket, while let TunnResult::WriteToNetwork(packet) = p.tunnel.decapsulate(None, &[], &mut dst_buf[..]) - { + { let _ = udp.send_to(packet, addr).await; } - - } p.set_endpoint(addr); } @@ -528,18 +524,19 @@ pub async fn tcp_listener_handler( peers: Arc>, iface: Arc>, pi: bool, -) ->anyhow::Result<()> { +) -> anyhow::Result<()> { loop { let (socket, addr) = listener.accept().await?; let key_pair = key_pair.clone(); let rate_limiter = rate_limiter.clone(); let peers = peers.clone(); let iface = iface.clone(); - let (reader, writer ) = socket.into_split(); - tcp_handler(reader, WriterState::PureWriter(writer), addr,key_pair, rate_limiter, peers, iface, pi); + let (reader, writer) = socket.into_split(); + tcp_handler(reader, WriterState::PureWriter(writer), addr, key_pair, rate_limiter, peers, iface, pi); } //Ok(()) } + pub fn tcp_handler( //socket: TcpStream, reader: OwnedReadHalf, @@ -568,12 +565,12 @@ pub fn tcp_handler( match &mut writer { WriterState::PureWriter(writer) => { let _ = writer.write_all(cookie).await; - }, - WriterState::PeerWriter(peer)=> { + } + WriterState::PeerWriter(peer) => { let mut p = peer.lock().await; if let TcpConnection::Connected(w) = &mut p.endpoint.tcp_conn { let _ = w.write_all(cookie).await; - }else { + } else { tracing::warn!("should not come here"); } } @@ -603,7 +600,7 @@ pub fn tcp_handler( let mut p = peer.lock().await; if let TcpConnection::Nothing | TcpConnection::ConnectedFailure(_) = p.endpoint.tcp_conn { if let WriterState::PureWriter(_) = &mut writer { - let pure_writer = mem::replace(&mut writer,WriterState::PeerWriter(peer.clone())); + let pure_writer = mem::replace(&mut writer, WriterState::PeerWriter(peer.clone())); if let WriterState::PureWriter(_writer) = pure_writer { p.endpoint.tcp_conn = TcpConnection::Connected(_writer); } @@ -626,7 +623,7 @@ pub fn tcp_handler( } TunnResult::WriteToTunnelV4(packet, addr) => { // tracing::debug!("{addr:?}"); - if p.is_allowed_ip(addr) { + if p.is_allowed_ip(addr) { if pi { let mut buf: Vec = Vec::new(); buf.put_slice(&IP4_HEADER); @@ -638,61 +635,60 @@ pub fn tcp_handler( let _ = iface.lock().await.write(&buf).await; } } - } else { - cfg_if! { + } else { + cfg_if! { if #[cfg(target_os="windows")] { let _ = iface.lock().await.write(&packet); } else { let _ = iface.lock().await.write(&packet).await; } } - } - } else {} - } - TunnResult::WriteToTunnelV6(packet, addr) => { - if p.is_allowed_ip(addr) { - if pi { - let mut buf: Vec = Vec::new(); - buf.put_slice(&IP6_HEADER); - buf.put_slice(&packet); - cfg_if! { + } + } else {} + } + TunnResult::WriteToTunnelV6(packet, addr) => { + if p.is_allowed_ip(addr) { + if pi { + let mut buf: Vec = Vec::new(); + buf.put_slice(&IP6_HEADER); + buf.put_slice(&packet); + cfg_if! { if #[cfg(target_os="windows")] { let _ = iface.lock().await.write(&buf); } else { let _ = iface.lock().await.write(&buf).await; } } - } else { - cfg_if! { + } else { + cfg_if! { if #[cfg(target_os="windows")] { let _ = iface.lock().await.write(packet); } else { let _ = iface.lock().await.write(packet).await; } } - }; - } + }; } - }; + } + }; - if flush { - // Flush pending queue - while let TunnResult::WriteToNetwork(packet) = - p.tunnel.decapsulate(None, &[], &mut dst_buf[..]) - { - if let TcpConnection::Connected(conn) = &mut p.endpoint.tcp_conn { - let _ = conn.write_all(packet).await; - } + if flush { + // Flush pending queue + while let TunnResult::WriteToNetwork(packet) = + p.tunnel.decapsulate(None, &[], &mut dst_buf[..]) + { + if let TcpConnection::Connected(conn) = &mut p.endpoint.tcp_conn { + let _ = conn.write_all(packet).await; } } } } - tracing::info!("tcp: {addr:?} close"); - }); + } + tracing::info!("tcp: {addr:?} close"); + }); } - pub struct Peers { pub by_key: HashMap>>, pub by_ip: AllowedIps>>, @@ -708,3 +704,78 @@ impl Default for Peers { } } } + + +#[cfg(test)] +mod test { + use std::time::Duration; + use ed25519_compact::KeyPair; + use crate::device::Device; + //use tracing_test::traced_test; + use crate::config::Identity; + use std::str::FromStr; + use crate::device::peer::AllowedIP; + use crate::device::script_run::Scripts; + use crate::protobuf::config::{NodeType, Protocol}; + + fn new_udp_client_device() -> Device{ + let allowed_addr = vec![AllowedIP::from_str("10.0.0.1/32").unwrap()]; + let identity = Identity::new(); + let key_pair = (identity.x25519_sk, identity.x25519_pk); + let port = Some(25516); + let mtu = 1000; + let scripts = Scripts::default(); + let protocol = Protocol::Tcp; + let node_type = NodeType::NodeClient; + + let mut device = Device::new( + "for0", + &allowed_addr, + key_pair, + port, mtu, scripts, protocol, node_type, + ).unwrap(); + device + } + + fn new_client(protocol: Protocol, node_type:NodeType, allowed_ip:&str) ->Device { + let allowed_addr = vec![AllowedIP::from_str(allowed_ip).unwrap()]; + let identity = Identity::new(); + let key_pair = (identity.x25519_sk, identity.x25519_pk); + let port = Some(25516); + let mtu = 1000; + let scripts = Scripts::default(); + + let mut device = Device::new( + "for0", + &allowed_addr, + key_pair, + port, mtu, scripts, protocol, node_type, + ).unwrap(); + device + } + + + //sudo cargo test --lib device::test::udp_client_device_new_and_close + #[tokio::test] + pub async fn udp_client_device_new_and_close() { + let mut device = new_udp_client_device(); + tokio::time::sleep(Duration::from_secs(5)).await; + device.close().await; + tokio::time::sleep(Duration::from_secs(5)).await; + let mut device = new_udp_client_device(); + tokio::time::sleep(Duration::from_secs(5)).await; + device.close().await; + } + + //sudo cargo test --lib device::test::tcp_client_device_new_and_close + #[tokio::test] + pub async fn tcp_client_device_new_and_close() { + let mut device = new_client(Protocol::Tcp, NodeType::NodeClient, "10.0.0.1/32"); + tokio::time::sleep(Duration::from_secs(5)).await; + device.close().await; + tokio::time::sleep(Duration::from_secs(5)).await; + let mut device = new_client(Protocol::Tcp, NodeType::NodeClient, "10.0.0.1/32"); + tokio::time::sleep(Duration::from_secs(5)).await; + device.close().await; + } +} \ No newline at end of file From 376e456d3a98c91acdbbd05e6e75178b6b08e4ff Mon Sep 17 00:00:00 2001 From: timzaak Date: Mon, 19 Jun 2023 15:17:59 +0800 Subject: [PATCH 02/11] bak --- client/lib/src/device/mod.rs | 23 ++--------------------- 1 file changed, 2 insertions(+), 21 deletions(-) diff --git a/client/lib/src/device/mod.rs b/client/lib/src/device/mod.rs index 9f2a3f0..06de441 100644 --- a/client/lib/src/device/mod.rs +++ b/client/lib/src/device/mod.rs @@ -709,7 +709,6 @@ impl Default for Peers { #[cfg(test)] mod test { use std::time::Duration; - use ed25519_compact::KeyPair; use crate::device::Device; //use tracing_test::traced_test; use crate::config::Identity; @@ -718,24 +717,6 @@ mod test { use crate::device::script_run::Scripts; use crate::protobuf::config::{NodeType, Protocol}; - fn new_udp_client_device() -> Device{ - let allowed_addr = vec![AllowedIP::from_str("10.0.0.1/32").unwrap()]; - let identity = Identity::new(); - let key_pair = (identity.x25519_sk, identity.x25519_pk); - let port = Some(25516); - let mtu = 1000; - let scripts = Scripts::default(); - let protocol = Protocol::Tcp; - let node_type = NodeType::NodeClient; - - let mut device = Device::new( - "for0", - &allowed_addr, - key_pair, - port, mtu, scripts, protocol, node_type, - ).unwrap(); - device - } fn new_client(protocol: Protocol, node_type:NodeType, allowed_ip:&str) ->Device { let allowed_addr = vec![AllowedIP::from_str(allowed_ip).unwrap()]; @@ -758,11 +739,11 @@ mod test { //sudo cargo test --lib device::test::udp_client_device_new_and_close #[tokio::test] pub async fn udp_client_device_new_and_close() { - let mut device = new_udp_client_device(); + let mut device = new_client(Protocol::Udp, NodeType::NodeClient, "10.0.0.1:32"); tokio::time::sleep(Duration::from_secs(5)).await; device.close().await; tokio::time::sleep(Duration::from_secs(5)).await; - let mut device = new_udp_client_device(); + let mut device = new_client(Protocol::Udp, NodeType::NodeClient, "10.0.0.1:32"); tokio::time::sleep(Duration::from_secs(5)).await; device.close().await; } From d4673afbdc1f607c743bd6128a0a97cef0410854 Mon Sep 17 00:00:00 2001 From: timzaak Date: Mon, 19 Jun 2023 21:20:30 +0800 Subject: [PATCH 03/11] bak --- admin-web/src/view/network/NetworkDetailPage.tsx | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/admin-web/src/view/network/NetworkDetailPage.tsx b/admin-web/src/view/network/NetworkDetailPage.tsx index bd099cc..b1887eb 100644 --- a/admin-web/src/view/network/NetworkDetailPage.tsx +++ b/admin-web/src/view/network/NetworkDetailPage.tsx @@ -67,7 +67,7 @@ export default function NetworkDetailPage() { - @@ -84,4 +84,4 @@ export default function NetworkDetailPage() { // add Nodes Navigator, Invite Code // // -// \ No newline at end of file +// From 4242451e39fdafad42143c1ce207c27fa679a611 Mon Sep 17 00:00:00 2001 From: timzaak Date: Sat, 24 Jun 2023 12:29:45 +0800 Subject: [PATCH 04/11] bak it --- client/lib/src/device/mod.rs | 5 +++ client/lib/src/device/tun/sys/linux.rs | 4 +-- client/lib/src/device/tun/sys/macos.rs | 8 ++++- client/lib/src/device/tun/sys/mod.rs | 45 ++++++-------------------- client/lib/src/device/tun/unix.rs | 1 + client/lib/src/device/unix_device.rs | 9 +++++- client/lib/src/sc_manager.rs | 1 + 7 files changed, 34 insertions(+), 39 deletions(-) diff --git a/client/lib/src/device/mod.rs b/client/lib/src/device/mod.rs index ea08fd2..d0a236d 100644 --- a/client/lib/src/device/mod.rs +++ b/client/lib/src/device/mod.rs @@ -231,6 +231,11 @@ impl DeviceData { impl Drop for DeviceData { fn drop(&mut self) { let _ = run_opt_script(&self.scripts.post_down); + //TODO: iface should be destroy + #[cfg(target_os = "macos")] + if let Err(e) = tun::sys::destroy_iface(&self.name) { + tracing::error!("remove route error: {e}"); + } } } diff --git a/client/lib/src/device/tun/sys/linux.rs b/client/lib/src/device/tun/sys/linux.rs index df7a248..36f4fe8 100644 --- a/client/lib/src/device/tun/sys/linux.rs +++ b/client/lib/src/device/tun/sys/linux.rs @@ -17,7 +17,7 @@ pub fn set_route(iface_name:&str, allowed_ip: &AllowedIP) -> anyhow::Result<()> } pub fn remove_route(iface_name:&str, allowed_ip:&AllowedIP) -> anyhow::Result<()> { - let inet = if allowed_ip.addr.is_ipv4() { "-net" } else { "-6net" }; - Command::new("route").args(&["del", inet,&allowed_ip.to_string(),"dev", iface_name]).status()?; + //let inet = if allowed_ip.addr.is_ipv4() { "-net" } else { "-6net" }; + //Command::new("route").args(&["del", inet,&allowed_ip.to_string(),"dev", iface_name]).status()?; Ok(()) } \ No newline at end of file diff --git a/client/lib/src/device/tun/sys/macos.rs b/client/lib/src/device/tun/sys/macos.rs index 7a35f7f..bfb2f86 100644 --- a/client/lib/src/device/tun/sys/macos.rs +++ b/client/lib/src/device/tun/sys/macos.rs @@ -1,5 +1,6 @@ use std::process::Command; use crate::device::peer::AllowedIP; +use cmd_lib::run_cmd; pub fn set_alias(iface_name: &str, address: &AllowedIP) -> anyhow::Result<()> { @@ -20,6 +21,11 @@ pub fn set_route(iface_name: &str, allowed_ip: &AllowedIP) -> anyhow::Result<()> pub fn remove_route(iface_name:&str, allowed_ip:&AllowedIP) -> anyhow::Result<()> { let inet = if allowed_ip.addr.is_ipv4() { "-inet" } else { "-inet6" }; - Command::new("route").args(&["-q", "-n", "delete", inet, &allowed_ip.to_string()]).status()?; + //Command::new("route").args(&["-q", "-n", "delete", inet, &allowed_ip.to_string(), "-interface", iface_name]).status()?; + Ok(()) +} + +pub fn destroy_iface(iface_name:&str) -> anyhow::Result<()> { + let r = run_cmd!(ifconfig ${iface_name} delete)?; Ok(()) } diff --git a/client/lib/src/device/tun/sys/mod.rs b/client/lib/src/device/tun/sys/mod.rs index 1bf319b..5a45b9d 100644 --- a/client/lib/src/device/tun/sys/mod.rs +++ b/client/lib/src/device/tun/sys/mod.rs @@ -1,36 +1,11 @@ -use crate::device::peer::AllowedIP; - -#[cfg(target_os = "macos")] -mod macos; -#[cfg(target_os = "linux")] -mod linux; - - -pub fn set_alias(iface_name:&str, address: &AllowedIP) -> anyhow::Result<()> { - #[cfg(target_os = "macos")] - macos::set_alias(iface_name, address)?; - - #[cfg(target_os = "linux")] - linux::add_addr(iface_name, address)?; - - Ok(()) +use cfg_if::cfg_if; + +cfg_if! { + if #[cfg(target_os = "macos")] { + mod macos; + pub use macos::{set_route,set_alias, destroy_iface, remove_route}; + } else if #[cfg(target_os = "linux")]{ + mod linux; + pub use macos::{set_route,set_alias, remove_route}; + } } - -pub fn set_route(iface_name:&str, allowed_ip: &AllowedIP) -> anyhow::Result<()> { - #[cfg(target_os = "macos")] - macos::set_route(iface_name, allowed_ip)?; - - #[cfg(target_os = "linux")] - linux::set_route(iface_name, allowed_ip)?; - Ok(()) -} - -pub fn remove_route(iface_name:&str, allowed_ip:&AllowedIP) -> anyhow::Result<()> { - #[cfg(target_os = "macos")] - macos::remove_route(iface_name, allowed_ip)?; - - #[cfg(target_os ="linux")] - linux::remove_route(iface_name, allowed_ip)?; - - Ok(()) -} \ No newline at end of file diff --git a/client/lib/src/device/tun/unix.rs b/client/lib/src/device/tun/unix.rs index 65e1156..0e3e980 100644 --- a/client/lib/src/device/tun/unix.rs +++ b/client/lib/src/device/tun/unix.rs @@ -36,6 +36,7 @@ pub fn create_async_tun(name: &str, mtu: u32, address:&[AllowedIP], // IFF_NO_PI preventing excessive buffer reallocating config.packet_information(false); }); + let mut device = tun::create_as_async(&config).context(format!("create tun/tap fail"))?; let pi = device.get_mut().has_packet_information(); let name = device.get_ref().name().to_string(); diff --git a/client/lib/src/device/unix_device.rs b/client/lib/src/device/unix_device.rs index b5f0a54..a4aa1ea 100644 --- a/client/lib/src/device/unix_device.rs +++ b/client/lib/src/device/unix_device.rs @@ -115,7 +115,8 @@ impl Device { }; let device = Device { - device_data: DeviceData::new(name,peers1, key_pair1, port, scripts), + device_data: DeviceData::new(name,peers1, key_pair1, port, scripts, + ), task, protocol, }; @@ -148,6 +149,12 @@ impl DerefMut for Device { } } +impl Drop for Device { + fn drop(&mut self) { + tracing::debug!("device has been dropped"); + } +} + cfg_if!{ //#[cfg(any(target_os = "macos", target_os = "ios"))] if #[cfg(target_os = "macos")] { diff --git a/client/lib/src/sc_manager.rs b/client/lib/src/sc_manager.rs index 60b9ffc..4fa63a2 100644 --- a/client/lib/src/sc_manager.rs +++ b/client/lib/src/sc_manager.rs @@ -87,6 +87,7 @@ impl SCManager { let _ = sender.send( ServerMessage::StopWR("node has been forbid or delete".to_owned()) ).await; + deduplication.wr_config = None; } _ => { // this would conflict with Info::Config message, so ignore this. From 6e82bce25dac252e3ce10c9c16fc8ce3b29047cc Mon Sep 17 00:00:00 2001 From: Timzaak Date: Sat, 24 Jun 2023 12:38:59 +0800 Subject: [PATCH 05/11] bak --- .../pubsub/NodeChangeNotifyService.scala | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/backend/src/main/scala/com/timzaak/fornet/pubsub/NodeChangeNotifyService.scala b/backend/src/main/scala/com/timzaak/fornet/pubsub/NodeChangeNotifyService.scala index dff44a5..d0a9689 100644 --- a/backend/src/main/scala/com/timzaak/fornet/pubsub/NodeChangeNotifyService.scala +++ b/backend/src/main/scala/com/timzaak/fornet/pubsub/NodeChangeNotifyService.scala @@ -119,16 +119,6 @@ class NodeChangeNotifyService( val network = networkDao.findById(node.networkId).get val peer = EntityConvert.toPeer(node, network) - connectionManager.sendNetworkMessage( - node.networkId, - NetworkMessage( - networkId = networkId, - NetworkMessage.Info.Peer( - PeerChange(addPeer = Some(peer)) - ) - ) - ) - val notifyNodes = nodeService.getAllRelativeNodes(node) connectionManager.sendClientMessage( @@ -142,6 +132,16 @@ class NodeChangeNotifyService( ), ) ) + + connectionManager.sendNetworkMessage( + node.networkId, + NetworkMessage( + networkId = networkId, + NetworkMessage.Info.Peer( + PeerChange(addPeer = Some(peer)) + ) + ) + ) case _ => // do nothing. } From 53a7d019e43f826e29077cf67ea600c2246c4506 Mon Sep 17 00:00:00 2001 From: Timzaak Date: Sat, 24 Jun 2023 14:17:41 +0800 Subject: [PATCH 06/11] bak it --- client/lib/src/device/mod.rs | 32 ++++++++++++++++++-------- client/lib/src/device/peer.rs | 10 +++++++- client/lib/src/device/tun/sys/linux.rs | 7 +++--- client/lib/src/device/tun/sys/mod.rs | 2 +- client/lib/src/device/tun/unix.rs | 15 +++++++++--- client/lib/src/device/unix_device.rs | 1 + 6 files changed, 50 insertions(+), 17 deletions(-) diff --git a/client/lib/src/device/mod.rs b/client/lib/src/device/mod.rs index d0a236d..1fe976a 100644 --- a/client/lib/src/device/mod.rs +++ b/client/lib/src/device/mod.rs @@ -208,14 +208,13 @@ impl DeviceData { let peer = Peer::new(tunn, next_index, endpoint, allowed_ips, ip, preshared_key); let peer = Arc::new(Mutex::new(peer)); let mut peers = self.peers.write().await; - - peers.by_key.insert(pub_key, Arc::clone(&peer)); peers.by_idx.insert(next_index, Arc::clone(&peer)); for AllowedIP { addr, cidr } in allowed_ips { peers.by_ip .insert(*addr, *cidr as _, Arc::clone(&peer)); } + peers.by_key.insert(pub_key, peer.clone()); tracing::info!("Peer added"); } @@ -379,11 +378,13 @@ pub async fn tcp_peers_timer( } continue; } - TcpConnection::Connecting(_) => { + TcpConnection::Connecting(_)| TcpConnection::End => { //TODO: add check of time, and reconnect continue; } - _ => {} + _ => { + tracing::warn!("should not come here"); + } }; match p.update_timers(&mut dst_buf) { TunnResult::Done => {} @@ -600,15 +601,23 @@ pub fn tcp_handler( Some(peer) => peer, }; + let mut p = peer.lock().await; - if let TcpConnection::Nothing | TcpConnection::ConnectedFailure(..) = p.endpoint.tcp_conn { - if let WriterState::PureWriter(_) = &mut writer { - let pure_writer = mem::replace(&mut writer, WriterState::PeerWriter(peer.clone())); - if let WriterState::PureWriter(_writer) = pure_writer { - p.endpoint.tcp_conn = TcpConnection::Connected(_writer); + match p.endpoint.tcp_conn { + TcpConnection::Nothing | TcpConnection::ConnectedFailure(..) => { + if let WriterState::PureWriter(_) = &mut writer { + let pure_writer = mem::replace(&mut writer, WriterState::PeerWriter(peer.clone())); + if let WriterState::PureWriter(_writer) = pure_writer { + p.endpoint.tcp_conn = TcpConnection::Connected(_writer); + } } } + TcpConnection::End => { + break; + } + _ => {} } + // We found a peer, use it to decapsulate the message+ let mut flush = false; // Are there packets to send from the queue? match p @@ -757,4 +766,9 @@ mod test { tokio::time::sleep(Duration::from_secs(5)).await; device.close().await; } + + #[tokio::test] + pub async fn tcp_split_can_close() { + + } } \ No newline at end of file diff --git a/client/lib/src/device/peer.rs b/client/lib/src/device/peer.rs index 7e7753e..6b0fdf4 100644 --- a/client/lib/src/device/peer.rs +++ b/client/lib/src/device/peer.rs @@ -22,6 +22,7 @@ pub enum TcpConnection { Connecting(SystemTime), Connected(OwnedWriteHalf), ConnectedFailure(std::io::Error, SystemTime), + End, } #[derive(Debug)] pub struct Endpoint { @@ -42,6 +43,8 @@ impl Endpoint { self.tcp_conn = TcpConnection::ConnectedFailure(e, SystemTime::now()); } }; + } else if let TcpConnection::End = self.tcp_conn { + tracing::debug!("the tcp conn has ended"); } } } @@ -56,6 +59,11 @@ pub struct Peer { pub ip: IpAddr, preshared_key: Option<[u8; 32]>, } +impl Drop for Peer { + fn drop(&mut self) { + tracing::debug!("peer: {} has been dropped", self.index); + } +} #[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)] pub struct AllowedIP { @@ -122,9 +130,9 @@ impl Peer { if let Some(_) = &mut self.endpoint.udp_conn.take() { tracing::info!("disconnecting from endpoint"); } else if let TcpConnection::Connected(_) = &mut self.endpoint.tcp_conn { + self.endpoint.tcp_conn = TcpConnection::End; tracing::info!("disconnecting tcp connection"); } - self.endpoint.tcp_conn = TcpConnection::Nothing; } pub fn set_endpoint(&mut self, addr: SocketAddr) { diff --git a/client/lib/src/device/tun/sys/linux.rs b/client/lib/src/device/tun/sys/linux.rs index 36f4fe8..eb28e5b 100644 --- a/client/lib/src/device/tun/sys/linux.rs +++ b/client/lib/src/device/tun/sys/linux.rs @@ -1,13 +1,13 @@ use std::process::Command; use crate::device::peer::AllowedIP; -pub fn add_addr(iface_name:&str, address: &AllowedIP) -> anyhow::Result<()> { +pub fn set_address(iface_name:&str, address: &AllowedIP) -> anyhow::Result<()> { let inet = if address.addr.is_ipv4() { "-4" } else { "-6" }; Command::new("ip").args(&[inet, "address", "add", &address.to_string(), "dev", iface_name]).status()?; Ok(()) } - +/* pub fn set_route(iface_name:&str, allowed_ip: &AllowedIP) -> anyhow::Result<()> { //TODO: support allowed_ip is 0.0.0.0/0 // ip -4 route add 10.0.0.1/24 dev ForT @@ -20,4 +20,5 @@ pub fn remove_route(iface_name:&str, allowed_ip:&AllowedIP) -> anyhow::Result<() //let inet = if allowed_ip.addr.is_ipv4() { "-net" } else { "-6net" }; //Command::new("route").args(&["del", inet,&allowed_ip.to_string(),"dev", iface_name]).status()?; Ok(()) -} \ No newline at end of file +} +*/ \ No newline at end of file diff --git a/client/lib/src/device/tun/sys/mod.rs b/client/lib/src/device/tun/sys/mod.rs index 5a45b9d..00f5c9f 100644 --- a/client/lib/src/device/tun/sys/mod.rs +++ b/client/lib/src/device/tun/sys/mod.rs @@ -6,6 +6,6 @@ cfg_if! { pub use macos::{set_route,set_alias, destroy_iface, remove_route}; } else if #[cfg(target_os = "linux")]{ mod linux; - pub use macos::{set_route,set_alias, remove_route}; + pub use linux::{set_address}; } } diff --git a/client/lib/src/device/tun/unix.rs b/client/lib/src/device/tun/unix.rs index 0e3e980..b6c9c8f 100644 --- a/client/lib/src/device/tun/unix.rs +++ b/client/lib/src/device/tun/unix.rs @@ -42,9 +42,18 @@ pub fn create_async_tun(name: &str, mtu: u32, address:&[AllowedIP], let name = device.get_ref().name().to_string(); for add in address { - sys::set_alias(&name, add)?; - sys::set_route(&name, add)?; - tracing::info!("set alias and route:{}", &add.to_string()); + #[cfg(target_os = "macos")] + { + sys::set_alias(&name, add)?; + sys::set_route(&name, add)?; + tracing::info!("set alias and route:{}", &add.to_string()); + } + #[cfg(target_os = "linux")] + { + sys::set_address(&name, add)?; + tracing::info!("set address:{}", &add.to_string()); + } + } let (tun_read,tun_write) = tokio::io::split(device); diff --git a/client/lib/src/device/unix_device.rs b/client/lib/src/device/unix_device.rs index a4aa1ea..a0f06d1 100644 --- a/client/lib/src/device/unix_device.rs +++ b/client/lib/src/device/unix_device.rs @@ -151,6 +151,7 @@ impl DerefMut for Device { impl Drop for Device { fn drop(&mut self) { + self.task.abort(); tracing::debug!("device has been dropped"); } } From 8a764f9ad0651a9025e9073d94085509c7b98d99 Mon Sep 17 00:00:00 2001 From: timzaak Date: Sat, 24 Jun 2023 21:29:54 +0800 Subject: [PATCH 07/11] bak it --- client/.cargo/config.toml | 3 + client/Cargo.lock | 137 +++++++++++++++++++- client/bin/Cargo.toml | 3 +- client/bin/src/fornet.rs | 53 ++++---- client/bin/src/fornet_cli.rs | 2 +- client/lib/examples/tokio_select_example.rs | 28 ++++ client/lib/src/device/mod.rs | 22 +--- client/lib/src/device/tun/sys/macos.rs | 7 +- client/lib/src/device/tun/sys/mod.rs | 2 +- client/lib/src/device/unix_device.rs | 28 ++-- 10 files changed, 221 insertions(+), 64 deletions(-) create mode 100644 client/.cargo/config.toml create mode 100644 client/lib/examples/tokio_select_example.rs diff --git a/client/.cargo/config.toml b/client/.cargo/config.toml new file mode 100644 index 0000000..2d3ed61 --- /dev/null +++ b/client/.cargo/config.toml @@ -0,0 +1,3 @@ +[build] +# This is for tokio-console +#rustflags = ["--cfg", "tokio_unstable"] \ No newline at end of file diff --git a/client/Cargo.lock b/client/Cargo.lock index e4fbd3d..05a0075 100644 --- a/client/Cargo.lock +++ b/client/Cargo.lock @@ -2,6 +2,12 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + [[package]] name = "aead" version = "0.5.1" @@ -419,6 +425,42 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "console-api" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2895653b4d9f1538a83970077cb01dfc77a4810524e51a110944688e916b18e" +dependencies = [ + "prost", + "prost-types", + "tonic 0.9.2", + "tracing-core", +] + +[[package]] +name = "console-subscriber" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57ab2224a0311582eb03adba4caaf18644f7b1f10a760803a803b9b605187fc7" +dependencies = [ + "console-api", + "crossbeam-channel", + "crossbeam-utils", + "futures", + "hdrhistogram", + "humantime", + "prost-types", + "serde", + "serde_json", + "thread_local", + "tokio", + "tokio-stream", + "tonic 0.9.2", + "tracing", + "tracing-core", + "tracing-subscriber", +] + [[package]] name = "console_error_panic_hook" version = "0.1.7" @@ -454,6 +496,15 @@ dependencies = [ "libc", ] +[[package]] +name = "crc32fast" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" +dependencies = [ + "cfg-if", +] + [[package]] name = "crossbeam-channel" version = "0.5.7" @@ -704,6 +755,16 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" +[[package]] +name = "flate2" +version = "1.0.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b9429470923de8e8cbd4d2dc513535400b4b3fef0319fb5c4e1f520a7bef743" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + [[package]] name = "flutter_rust_bridge" version = "1.72.0" @@ -771,6 +832,7 @@ version = "0.0.3" dependencies = [ "anyhow", "clap", + "console-subscriber", "fornet-lib", "serde", "serde_json", @@ -813,7 +875,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-util 0.6.10", - "tonic", + "tonic 0.8.3", "tonic-build", "tracing", "tracing-subscriber", @@ -1004,6 +1066,19 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +[[package]] +name = "hdrhistogram" +version = "7.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f19b9f54f7c7f55e31401bb647626ce0cf0f67b0004982ce815b3ee72a02aa8" +dependencies = [ + "base64 0.13.1", + "byteorder", + "flate2", + "nom", + "num-traits", +] + [[package]] name = "heck" version = "0.4.1" @@ -1083,6 +1158,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "hwaddr" version = "0.1.7" @@ -1384,6 +1465,21 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + +[[package]] +name = "miniz_oxide" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7" +dependencies = [ + "adler", +] + [[package]] name = "mio" version = "0.8.6" @@ -1434,6 +1530,16 @@ dependencies = [ "pin-utils", ] +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -2404,6 +2510,7 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", + "tracing", "windows-sys 0.45.0", ] @@ -2509,6 +2616,34 @@ dependencies = [ "tracing-futures", ] +[[package]] +name = "tonic" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a" +dependencies = [ + "async-trait", + "axum", + "base64 0.21.0", + "bytes", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tonic-build" version = "0.8.4" diff --git a/client/bin/Cargo.toml b/client/bin/Cargo.toml index 4a76e3a..3848fb0 100644 --- a/client/bin/Cargo.toml +++ b/client/bin/Cargo.toml @@ -18,9 +18,10 @@ fornet-lib = {path = "../lib"} clap = { version = "3.2.20", features = ["env"] } anyhow = "1.0" -tokio = { version = "1.20.1", features = ["macros", "rt-multi-thread", "net", "signal"] } +tokio = { version = "1.20.1", features = ["macros", "rt-multi-thread", "net", "signal", "tracing"] } tracing = "0.1.36" tracing-subscriber = {version = "0.3.15", features = ["env-filter"]} +console-subscriber = "0.1.9" serde = { version = "1.0.144" } serde_json = "1.0.85" diff --git a/client/bin/src/fornet.rs b/client/bin/src/fornet.rs index 275881c..153c63a 100644 --- a/client/bin/src/fornet.rs +++ b/client/bin/src/fornet.rs @@ -16,48 +16,43 @@ async fn main() -> anyhow::Result<()> { let matches = Command::new(APP_NAME) .version(env!("CARGO_PKG_VERSION")) - .author("timzaak ") + .author("ForNetCode ") .args(&[ Arg::new("config") .long("config") .short('c') .env("FORNET_CONFIG") .help("config directory path") - .default_value(&default_config_path) + .default_value(&default_config_path), ]) .get_matches(); + let config_dir = matches.value_of("config").unwrap().to_owned(); - match matches.subcommand() { - None => { - let log_level = if cfg!(debug_assertions) { - "debug" - } else { - "info" - }; - let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::from_str(log_level).unwrap()); - if cfg!(debug_assertions) { - tracing_subscriber::fmt() - .with_env_filter(env_filter) - .init(); - } else { - tracing_subscriber::fmt() - .with_env_filter(env_filter) - .with_target(false) - .with_ansi(false) - .init(); - } - //tracing::info!("log level: {log_level}"); - ServerManager::start_server(config_dir, StartMethod::CommandLine).await?; - tokio::signal::ctrl_c().await.unwrap(); - //TODO: add signal - } - _ => { - println!("please type: fornet --help for more information.") - } + //console_subscriber::init(); + + let log_level = if cfg!(debug_assertions) { + "debug" + } else { + "info" }; + let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::from_str(log_level).unwrap()); + if cfg!(debug_assertions) { + tracing_subscriber::fmt() + .with_env_filter(env_filter) + .init(); + } else { + tracing_subscriber::fmt() + .with_env_filter(env_filter) + .with_target(false) + .with_ansi(false) + .init(); + } + + ServerManager::start_server(config_dir, StartMethod::CommandLine).await?; + tokio::signal::ctrl_c().await.unwrap(); Ok(()) } diff --git a/client/bin/src/fornet_cli.rs b/client/bin/src/fornet_cli.rs index c1b56d3..bf0dc15 100644 --- a/client/bin/src/fornet_cli.rs +++ b/client/bin/src/fornet_cli.rs @@ -8,7 +8,7 @@ use fornet_lib::wr_manager::DeviceInfoResp; pub async fn main() -> anyhow::Result<()> { let mut command = Command::new("fornet-cli") .version(env!("CARGO_PKG_VERSION")) - .author("timzaak ") + .author("ForNetCode ") .subcommand( Command::new("join").arg(Arg::new("invite_token").help("base64 token").required(true)), ).subcommand( diff --git a/client/lib/examples/tokio_select_example.rs b/client/lib/examples/tokio_select_example.rs new file mode 100644 index 0000000..8eaa047 --- /dev/null +++ b/client/lib/examples/tokio_select_example.rs @@ -0,0 +1,28 @@ +use std::time::Duration; + +async fn interval() { + let mut interval = tokio::time::interval(Duration::from_secs(1)); + loop { + interval.tick().await; + println!("interval...") + } +} +#[tokio::main] +async fn main() -> anyhow::Result<()>{ + let task = tokio::spawn(async move { + loop { + tokio::select! { + _ = interval() => { + println!("interval finish"); + } + } + } + }); + tokio::time::sleep(Duration::from_secs(5)).await; + task.abort(); + println!("task is finish: {}", task.is_finished()); + tokio::time::sleep(Duration::from_secs(5)).await; + println!("task is finish: {}", task.is_finished()); + tokio::signal::ctrl_c().await.unwrap(); + Ok(()) +} \ No newline at end of file diff --git a/client/lib/src/device/mod.rs b/client/lib/src/device/mod.rs index 1fe976a..9bc06b2 100644 --- a/client/lib/src/device/mod.rs +++ b/client/lib/src/device/mod.rs @@ -230,11 +230,6 @@ impl DeviceData { impl Drop for DeviceData { fn drop(&mut self) { let _ = run_opt_script(&self.scripts.post_down); - //TODO: iface should be destroy - #[cfg(target_os = "macos")] - if let Err(e) = tun::sys::destroy_iface(&self.name) { - tracing::error!("remove route error: {e}"); - } } } @@ -383,7 +378,7 @@ pub async fn tcp_peers_timer( continue; } _ => { - tracing::warn!("should not come here"); + } }; match p.update_timers(&mut dst_buf) { @@ -560,12 +555,11 @@ pub fn tcp_handler( let (private_key, public_key) = key_pair.as_ref(); let mut writer = writer; let mut reader = reader; - //let (mut reader, writer ) = socket.into_split(); - //let mut writer = WriterState::PureWriter(writer); let mut src_buf: Vec = vec![0; MAX_UDP_SIZE]; let mut dst_buf: Vec = vec![0; MAX_UDP_SIZE]; while let Ok(size) = reader.read(&mut src_buf).await { if size > 0 { + tracing::debug!("tcp receive message"); let parsed_packet = match rate_limiter.as_ref().verify_packet(Some(addr.ip()), &src_buf[..size], &mut dst_buf) { Ok(packet) => packet, @@ -689,9 +683,12 @@ pub fn tcp_handler( p.endpoint.tcp_write(packet).await; } } - } + } else { + // if writer drop ,size would be zero, this may change in future tokio!! + break; + } } - tracing::info!("tcp: {addr:?} close"); + tracing::info!("tcp read: {addr:?} close"); }); } @@ -766,9 +763,4 @@ mod test { tokio::time::sleep(Duration::from_secs(5)).await; device.close().await; } - - #[tokio::test] - pub async fn tcp_split_can_close() { - - } } \ No newline at end of file diff --git a/client/lib/src/device/tun/sys/macos.rs b/client/lib/src/device/tun/sys/macos.rs index bfb2f86..4eaab25 100644 --- a/client/lib/src/device/tun/sys/macos.rs +++ b/client/lib/src/device/tun/sys/macos.rs @@ -23,9 +23,4 @@ pub fn remove_route(iface_name:&str, allowed_ip:&AllowedIP) -> anyhow::Result<() let inet = if allowed_ip.addr.is_ipv4() { "-inet" } else { "-inet6" }; //Command::new("route").args(&["-q", "-n", "delete", inet, &allowed_ip.to_string(), "-interface", iface_name]).status()?; Ok(()) -} - -pub fn destroy_iface(iface_name:&str) -> anyhow::Result<()> { - let r = run_cmd!(ifconfig ${iface_name} delete)?; - Ok(()) -} +} \ No newline at end of file diff --git a/client/lib/src/device/tun/sys/mod.rs b/client/lib/src/device/tun/sys/mod.rs index 00f5c9f..a1c6c98 100644 --- a/client/lib/src/device/tun/sys/mod.rs +++ b/client/lib/src/device/tun/sys/mod.rs @@ -3,7 +3,7 @@ use cfg_if::cfg_if; cfg_if! { if #[cfg(target_os = "macos")] { mod macos; - pub use macos::{set_route,set_alias, destroy_iface, remove_route}; + pub use macos::{set_route,set_alias, remove_route}; } else if #[cfg(target_os = "linux")]{ mod linux; pub use linux::{set_address}; diff --git a/client/lib/src/device/unix_device.rs b/client/lib/src/device/unix_device.rs index a0f06d1..4a3940f 100644 --- a/client/lib/src/device/unix_device.rs +++ b/client/lib/src/device/unix_device.rs @@ -85,8 +85,9 @@ impl Device { let task:JoinHandle<()> = tokio::spawn(async move { loop { + tokio::select! { - _ = device::rate_limiter_timer(&rate_limiter) => {} + _ = device::rate_limiter_timer(&rate_limiter) => {println!("rate_limiter_timer");} _ = device::tcp_peers_timer( &ip, &peers, @@ -95,25 +96,31 @@ impl Device { iface_writer.clone(), pi, node_type, - ) => {} + ) => {println!("tcp_peers_timer");} // iface listen Ok(len) = iface_reader.read(&mut tun_src_buf) => { - let src_buf = if pi { - &tun_src_buf[4..(len+4)] - } else { - &tun_src_buf[0..len] - }; - device::tun_read_tcp_handle(&peers, src_buf, &mut tun_dst_buf).await; + if len > 0 { + let src_buf = if pi { + &tun_src_buf[4..(len+4)] + } else { + &tun_src_buf[0..len] + }; + device::tun_read_tcp_handle(&peers, src_buf, &mut tun_dst_buf).await; + } + println!("tun_read_tcp_handle"); } //_ = device::tcp_listener_handler(&tcp4, key_pair.clone(), rate_limiter.clone(), Arc::clone(&peers), Arc::clone(&iface_writer), pi) => {break} - _ = device::tcp_listener_handler(&tcp6, key_pair.clone(), rate_limiter.clone(), Arc::clone(&peers), Arc::clone(&iface_writer), pi) => {break} + _ = device::tcp_listener_handler(&tcp6, key_pair.clone(), rate_limiter.clone(), Arc::clone(&peers), Arc::clone(&iface_writer), pi) => { + println!("tcp_listener_handler"); + break; + } + } } }); (port, task) } }; - let device = Device { device_data: DeviceData::new(name,peers1, key_pair1, port, scripts, ), @@ -129,6 +136,7 @@ impl Device { pub async fn close(&mut self) { self.task.abort();// close all connections. + //tracing::debug!("tun/rpc task is finish: {}", self.task.is_finished()); self.device_data.close().await; // task abort would cost some time. tokio::time::sleep(Duration::from_secs(2)).await; From d4ceccb1153442848bacc231ce7e9425ee8fc81f Mon Sep 17 00:00:00 2001 From: Timzaak Date: Sat, 24 Jun 2023 21:31:11 +0800 Subject: [PATCH 08/11] bak --- client/Cargo.lock | 136 +----------------------------------------- client/bin/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 136 deletions(-) diff --git a/client/Cargo.lock b/client/Cargo.lock index 05a0075..f29e898 100644 --- a/client/Cargo.lock +++ b/client/Cargo.lock @@ -2,12 +2,6 @@ # It is not intended for manual editing. version = 3 -[[package]] -name = "adler" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" - [[package]] name = "aead" version = "0.5.1" @@ -425,42 +419,6 @@ dependencies = [ "crossbeam-utils", ] -[[package]] -name = "console-api" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2895653b4d9f1538a83970077cb01dfc77a4810524e51a110944688e916b18e" -dependencies = [ - "prost", - "prost-types", - "tonic 0.9.2", - "tracing-core", -] - -[[package]] -name = "console-subscriber" -version = "0.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57ab2224a0311582eb03adba4caaf18644f7b1f10a760803a803b9b605187fc7" -dependencies = [ - "console-api", - "crossbeam-channel", - "crossbeam-utils", - "futures", - "hdrhistogram", - "humantime", - "prost-types", - "serde", - "serde_json", - "thread_local", - "tokio", - "tokio-stream", - "tonic 0.9.2", - "tracing", - "tracing-core", - "tracing-subscriber", -] - [[package]] name = "console_error_panic_hook" version = "0.1.7" @@ -496,15 +454,6 @@ dependencies = [ "libc", ] -[[package]] -name = "crc32fast" -version = "1.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" -dependencies = [ - "cfg-if", -] - [[package]] name = "crossbeam-channel" version = "0.5.7" @@ -755,16 +704,6 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" -[[package]] -name = "flate2" -version = "1.0.26" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b9429470923de8e8cbd4d2dc513535400b4b3fef0319fb5c4e1f520a7bef743" -dependencies = [ - "crc32fast", - "miniz_oxide", -] - [[package]] name = "flutter_rust_bridge" version = "1.72.0" @@ -832,7 +771,6 @@ version = "0.0.3" dependencies = [ "anyhow", "clap", - "console-subscriber", "fornet-lib", "serde", "serde_json", @@ -875,7 +813,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-util 0.6.10", - "tonic 0.8.3", + "tonic", "tonic-build", "tracing", "tracing-subscriber", @@ -1066,19 +1004,6 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" -[[package]] -name = "hdrhistogram" -version = "7.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f19b9f54f7c7f55e31401bb647626ce0cf0f67b0004982ce815b3ee72a02aa8" -dependencies = [ - "base64 0.13.1", - "byteorder", - "flate2", - "nom", - "num-traits", -] - [[package]] name = "heck" version = "0.4.1" @@ -1158,12 +1083,6 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" -[[package]] -name = "humantime" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" - [[package]] name = "hwaddr" version = "0.1.7" @@ -1465,21 +1384,6 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" -[[package]] -name = "minimal-lexical" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" - -[[package]] -name = "miniz_oxide" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7" -dependencies = [ - "adler", -] - [[package]] name = "mio" version = "0.8.6" @@ -1530,16 +1434,6 @@ dependencies = [ "pin-utils", ] -[[package]] -name = "nom" -version = "7.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" -dependencies = [ - "memchr", - "minimal-lexical", -] - [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -2616,34 +2510,6 @@ dependencies = [ "tracing-futures", ] -[[package]] -name = "tonic" -version = "0.9.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a" -dependencies = [ - "async-trait", - "axum", - "base64 0.21.0", - "bytes", - "futures-core", - "futures-util", - "h2", - "http", - "http-body", - "hyper", - "hyper-timeout", - "percent-encoding", - "pin-project", - "prost", - "tokio", - "tokio-stream", - "tower", - "tower-layer", - "tower-service", - "tracing", -] - [[package]] name = "tonic-build" version = "0.8.4" diff --git a/client/bin/Cargo.toml b/client/bin/Cargo.toml index 3848fb0..589abe6 100644 --- a/client/bin/Cargo.toml +++ b/client/bin/Cargo.toml @@ -21,7 +21,7 @@ anyhow = "1.0" tokio = { version = "1.20.1", features = ["macros", "rt-multi-thread", "net", "signal", "tracing"] } tracing = "0.1.36" tracing-subscriber = {version = "0.3.15", features = ["env-filter"]} -console-subscriber = "0.1.9" +#console-subscriber = "0.1.9" serde = { version = "1.0.144" } serde_json = "1.0.85" From 16031de203a870da0b6b30df2d81d6fb729fb5d4 Mon Sep 17 00:00:00 2001 From: timzaak Date: Sat, 24 Jun 2023 21:41:22 +0800 Subject: [PATCH 09/11] bak --- client/lib/src/device/mod.rs | 23 +++++++++++------------ client/lib/src/device/unix_device.rs | 6 ++---- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/client/lib/src/device/mod.rs b/client/lib/src/device/mod.rs index 9bc06b2..3b02e4d 100644 --- a/client/lib/src/device/mod.rs +++ b/client/lib/src/device/mod.rs @@ -559,7 +559,7 @@ pub fn tcp_handler( let mut dst_buf: Vec = vec![0; MAX_UDP_SIZE]; while let Ok(size) = reader.read(&mut src_buf).await { if size > 0 { - tracing::debug!("tcp receive message"); + //tracing::debug!("tcp receive message"); let parsed_packet = match rate_limiter.as_ref().verify_packet(Some(addr.ip()), &src_buf[..size], &mut dst_buf) { Ok(packet) => packet, @@ -675,21 +675,20 @@ pub fn tcp_handler( } }; - if flush { + if flush { // Flush pending queue - while let TunnResult::WriteToNetwork(packet) = - p.tunnel.decapsulate(None, &[], &mut dst_buf[..]) - { - p.endpoint.tcp_write(packet).await; - } + while let TunnResult::WriteToNetwork(packet) = + p.tunnel.decapsulate(None, &[], &mut dst_buf[..]) { + p.endpoint.tcp_write(packet).await; } - } else { - // if writer drop ,size would be zero, this may change in future tokio!! + } + } else { + // if writer drop ,size would be zero, this may change in future tokio!! break; } - } - tracing::info!("tcp read: {addr:?} close"); - }); + } + tracing::info!("tcp read: {addr:?} close"); + }); } diff --git a/client/lib/src/device/unix_device.rs b/client/lib/src/device/unix_device.rs index 4a3940f..8534712 100644 --- a/client/lib/src/device/unix_device.rs +++ b/client/lib/src/device/unix_device.rs @@ -87,7 +87,7 @@ impl Device { loop { tokio::select! { - _ = device::rate_limiter_timer(&rate_limiter) => {println!("rate_limiter_timer");} + _ = device::rate_limiter_timer(&rate_limiter) => {} _ = device::tcp_peers_timer( &ip, &peers, @@ -96,7 +96,7 @@ impl Device { iface_writer.clone(), pi, node_type, - ) => {println!("tcp_peers_timer");} + ) => {} // iface listen Ok(len) = iface_reader.read(&mut tun_src_buf) => { if len > 0 { @@ -107,11 +107,9 @@ impl Device { }; device::tun_read_tcp_handle(&peers, src_buf, &mut tun_dst_buf).await; } - println!("tun_read_tcp_handle"); } //_ = device::tcp_listener_handler(&tcp4, key_pair.clone(), rate_limiter.clone(), Arc::clone(&peers), Arc::clone(&iface_writer), pi) => {break} _ = device::tcp_listener_handler(&tcp6, key_pair.clone(), rate_limiter.clone(), Arc::clone(&peers), Arc::clone(&iface_writer), pi) => { - println!("tcp_listener_handler"); break; } From 34a947be0bd4675c876cdd6c97d319d9793bc355 Mon Sep 17 00:00:00 2001 From: timzaak Date: Sat, 24 Jun 2023 21:53:15 +0800 Subject: [PATCH 10/11] bak --- client/lib/src/wr_manager.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/client/lib/src/wr_manager.rs b/client/lib/src/wr_manager.rs index 50e78fb..678264f 100644 --- a/client/lib/src/wr_manager.rs +++ b/client/lib/src/wr_manager.rs @@ -1,5 +1,6 @@ use std::net::{IpAddr, SocketAddr}; use std::str::FromStr; +use std::time::Duration; use anyhow::anyhow; use serde_derive::{Deserialize, Serialize}; use crate::config::{Config, Identity}; @@ -63,14 +64,20 @@ impl WRManager { //TODO: check if need restart // if interface not equal, restart // check peers, remove or add new ones. - self.close().await; - tracing::info!("close device before restart"); + let has_alive = self.is_alive(); + if has_alive { + tracing::info!("close device"); + self.close().await; + tokio::time::sleep(Duration::from_secs(10)).await; + } + let tun_name = config.get_tun_name(); let protocol = Protocol::from_i32(interface.protocol).unwrap_or(Protocol::Udp); let node_type = NodeType::from_i32(wr_config.r#type).unwrap(); let scripts = Scripts::load_from_interface(&interface); let key_pair = (config.identity.x25519_sk.clone(), config.identity.x25519_pk.clone()); + tracing::debug!("begin to start device"); let wr_interface = Device::new( &tun_name, &address, From 3c01ccb4379fc191dd7e2c61744abed934d7a96a Mon Sep 17 00:00:00 2001 From: timzaak Date: Sat, 24 Jun 2023 22:03:08 +0800 Subject: [PATCH 11/11] bak it --- client/lib/src/device/mod.rs | 3 +++ client/lib/src/device/unix_device.rs | 2 +- client/lib/src/wr_manager.rs | 4 +++- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/client/lib/src/device/mod.rs b/client/lib/src/device/mod.rs index 3b02e4d..d9afff8 100644 --- a/client/lib/src/device/mod.rs +++ b/client/lib/src/device/mod.rs @@ -141,6 +141,7 @@ pub struct DeviceData { pub key_pair: (x25519_dalek::StaticSecret, x25519_dalek::PublicKey), pub listen_port: u16, pub scripts: Scripts, + pub node_type: NodeType, } impl DeviceData { @@ -149,6 +150,7 @@ impl DeviceData { key_pair: (x25519_dalek::StaticSecret, x25519_dalek::PublicKey), listen_port: u16, scripts: Scripts, + node_type:NodeType, ) -> Self { Self { name, @@ -157,6 +159,7 @@ impl DeviceData { key_pair, listen_port, scripts, + node_type, } } pub fn next_index(&mut self) -> u32 { diff --git a/client/lib/src/device/unix_device.rs b/client/lib/src/device/unix_device.rs index 8534712..7a0aa42 100644 --- a/client/lib/src/device/unix_device.rs +++ b/client/lib/src/device/unix_device.rs @@ -120,7 +120,7 @@ impl Device { } }; let device = Device { - device_data: DeviceData::new(name,peers1, key_pair1, port, scripts, + device_data: DeviceData::new(name,peers1, key_pair1, port, scripts, node_type, ), task, protocol, diff --git a/client/lib/src/wr_manager.rs b/client/lib/src/wr_manager.rs index 678264f..b8b57fc 100644 --- a/client/lib/src/wr_manager.rs +++ b/client/lib/src/wr_manager.rs @@ -66,9 +66,11 @@ impl WRManager { // check peers, remove or add new ones. let has_alive = self.is_alive(); if has_alive { + let node_type = self.device.as_ref().map(|x|x.node_type).unwrap_or(NodeType::NodeClient); tracing::info!("close device"); self.close().await; - tokio::time::sleep(Duration::from_secs(10)).await; + let sleep_time = if node_type == NodeType::NodeRelay {10} else {20}; + tokio::time::sleep(Duration::from_secs(sleep_time)).await; } let tun_name = config.get_tun_name();