Skip to content

Commit

Permalink
command line improvement (#115)
Browse files Browse the repository at this point in the history
make -l easy to use:
-l wg wss
-l wg:12345
-l 12345

make -r use random port
  • Loading branch information
KKRainbow committed May 16, 2024
1 parent f665de9 commit 7532a7c
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 40 deletions.
123 changes: 100 additions & 23 deletions easytier/src/easytier-core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use common::config::{
ConsoleLoggerConfig, FileLoggerConfig, NetworkIdentity, PeerConfig, VpnPortalConfig,
};
use instance::instance::Instance;
use tokio::net::TcpSocket;

use crate::{
common::{
Expand Down Expand Up @@ -70,7 +71,7 @@ struct Cli {
)]
ipv4: Option<String>,

#[arg(short, long, help = "peers to connect initially")]
#[arg(short, long, help = "peers to connect initially", num_args = 0..)]
peers: Vec<String>,

#[arg(short, long, help = "use a public shared node to discover peers")]
Expand All @@ -86,17 +87,29 @@ struct Cli {
#[arg(
short,
long,
default_value = "127.0.0.1:15888",
help = "rpc portal address to listen for management"
default_value = "0",
help = "rpc portal address to listen for management. 0 means random
port, 12345 means listen on 12345 of localhost, 0.0.0.0:12345 means
listen on 12345 of all interfaces. default is 0 and will try 15888 first"
)]
rpc_portal: SocketAddr,

#[arg(short, long, help = "listeners to accept connections, pass '' to avoid listening.",
default_values_t = ["tcp://0.0.0.0:11010".to_string(),
"udp://0.0.0.0:11010".to_string(),
"wg://0.0.0.0:11011".to_string()])]
rpc_portal: String,

#[arg(short, long, help = "listeners to accept connections, allow format:
a port number: 11010, means tcp/udp will listen on 11010, ws/wss will listen on 11010 and 11011, wg will listen on 11011
url: tcp://0.0.0.0:11010, tcp can be tcp, udp, ring, wg, ws, wss,
proto:port: wg:11011, means listen on 11011 with wireguard protocol
url and proto:port can occur multiple times.
", default_values_t = ["11010".to_string()],
num_args = 0..)]
listeners: Vec<String>,

#[arg(
long,
help = "do not listen on any port, only connect to peers",
default_value = "false"
)]
no_listener: bool,

#[arg(long, help = "console log level",
value_parser = clap::builder::PossibleValuesParser::new(["trace", "debug", "info", "warn", "error", "off"]))]
console_log_level: Option<String>,
Expand Down Expand Up @@ -161,6 +174,80 @@ and the vpn client is in network of 10.14.14.0/24"
latency_first: bool,
}

impl Cli {
fn parse_listeners(&self) -> Vec<String> {
println!("parsing listeners: {:?}", self.listeners);
let proto_port_offset = vec![("tcp", 0), ("udp", 0), ("wg", 1), ("ws", 1), ("wss", 2)];

if self.no_listener || self.listeners.is_empty() {
return vec![];
}

let origin_listners = self.listeners.clone();
let mut listeners: Vec<String> = Vec::new();
if origin_listners.len() == 1 {
if let Ok(port) = origin_listners[0].parse::<u16>() {
for (proto, offset) in proto_port_offset {
listeners.push(format!("{}://0.0.0.0:{}", proto, port + offset));
}
return listeners;
}
}

for l in &origin_listners {
let proto_port: Vec<&str> = l.split(':').collect();
if proto_port.len() > 2 {
if let Ok(url) = l.parse::<url::Url>() {
listeners.push(url.to_string());
} else {
panic!("failed to parse listener: {}", l);
}
} else {
let Some((proto, offset)) = proto_port_offset
.iter()
.find(|(proto, _)| *proto == proto_port[0])
else {
panic!("unknown protocol: {}", proto_port[0]);
};

let port = if proto_port.len() == 2 {
proto_port[1].parse::<u16>().unwrap()
} else {
11010 + offset
};

listeners.push(format!("{}://0.0.0.0:{}", proto, port));
}
}

println!("parsed listeners: {:?}", listeners);

listeners
}

fn check_tcp_available(port: u16) -> Option<SocketAddr> {
let s = format!("127.0.0.1:{}", port).parse::<SocketAddr>().unwrap();
TcpSocket::new_v4().unwrap().bind(s).map(|_| s).ok()
}

fn parse_rpc_portal(&self) -> SocketAddr {
if let Ok(port) = self.rpc_portal.parse::<u16>() {
if port == 0 {
// check tcp 15888 first
for i in 15888..15900 {
if let Some(s) = Cli::check_tcp_available(i) {
return s;
}
}
return "127.0.0.1:0".parse().unwrap();
}
return format!("127.0.0.1:{}", port).parse().unwrap();
}

self.rpc_portal.parse().unwrap()
}
}

impl From<Cli> for TomlConfigLoader {
fn from(cli: Cli) -> Self {
if let Some(config_file) = &cli.config_file {
Expand Down Expand Up @@ -205,19 +292,9 @@ impl From<Cli> for TomlConfigLoader {
);

cfg.set_listeners(
cli.listeners
.iter()
.filter_map(|s| {
if s.is_empty() {
return None;
}

Some(
s.parse()
.with_context(|| format!("failed to parse listener uri: {}", s))
.unwrap(),
)
})
cli.parse_listeners()
.into_iter()
.map(|s| s.parse().unwrap())
.collect(),
);

Expand All @@ -229,7 +306,7 @@ impl From<Cli> for TomlConfigLoader {
);
}

cfg.set_rpc_portal(cli.rpc_portal);
cfg.set_rpc_portal(cli.parse_rpc_portal());

if cli.external_node.is_some() {
let mut old_peers = cfg.get_peers();
Expand Down
32 changes: 16 additions & 16 deletions easytier/src/gateway/icmp_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub struct IcmpProxy {
peer_manager: Arc<PeerManager>,

cidr_set: CidrSet,
socket: socket2::Socket,
socket: std::sync::Mutex<Option<socket2::Socket>>,

nat_table: IcmpNatTable,

Expand Down Expand Up @@ -158,23 +158,11 @@ impl IcmpProxy {
peer_manager: Arc<PeerManager>,
) -> Result<Arc<Self>, Error> {
let cidr_set = CidrSet::new(global_ctx.clone());

let _g = global_ctx.net_ns.guard();
let socket = socket2::Socket::new(
socket2::Domain::IPV4,
socket2::Type::RAW,
Some(socket2::Protocol::ICMPV4),
)?;
socket.bind(&socket2::SockAddr::from(SocketAddrV4::new(
std::net::Ipv4Addr::UNSPECIFIED,
0,
)))?;

let ret = Self {
global_ctx,
peer_manager,
cidr_set,
socket,
socket: std::sync::Mutex::new(None),

nat_table: Arc::new(dashmap::DashMap::new()),
tasks: Mutex::new(JoinSet::new()),
Expand All @@ -184,6 +172,18 @@ impl IcmpProxy {
}

pub async fn start(self: &Arc<Self>) -> Result<(), Error> {
let _g = self.global_ctx.net_ns.guard();
let socket = socket2::Socket::new(
socket2::Domain::IPV4,
socket2::Type::RAW,
Some(socket2::Protocol::ICMPV4),
)?;
socket.bind(&socket2::SockAddr::from(SocketAddrV4::new(
std::net::Ipv4Addr::UNSPECIFIED,
0,
)))?;
self.socket.lock().unwrap().replace(socket);

self.start_icmp_proxy().await?;
self.start_nat_table_cleaner().await?;
Ok(())
Expand All @@ -204,7 +204,7 @@ impl IcmpProxy {
}

async fn start_icmp_proxy(self: &Arc<Self>) -> Result<(), Error> {
let socket = self.socket.try_clone()?;
let socket = self.socket.lock().unwrap().as_ref().unwrap().try_clone()?;
let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
let nat_table = self.nat_table.clone();
thread::spawn(|| {
Expand Down Expand Up @@ -237,7 +237,7 @@ impl IcmpProxy {
dst_ip: Ipv4Addr,
icmp_packet: &icmp::echo_request::EchoRequestPacket,
) -> Result<(), Error> {
self.socket.send_to(
self.socket.lock().unwrap().as_ref().unwrap().send_to(
icmp_packet.packet(),
&SocketAddrV4::new(dst_ip.into(), 0).into(),
)?;
Expand Down
2 changes: 1 addition & 1 deletion easytier/src/instance/virtual_nic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl Stream for TunStream {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<StreamItem>> {
let mut self_mut = self.project();
let mut g = ready!(self_mut.l.poll_lock(cx));
reserve_buf(&mut self_mut.cur_buf, 2500, 128 * 1024);
reserve_buf(&mut self_mut.cur_buf, 2500, 32 * 1024);
if self_mut.cur_buf.len() == 0 {
unsafe {
self_mut.cur_buf.set_len(*self_mut.payload_offset);
Expand Down

0 comments on commit 7532a7c

Please sign in to comment.