Skip to content

Commit

Permalink
Add implementation for stop command
Browse files Browse the repository at this point in the history
This adds implementation for stopping the tunnel
via the `Stop` command.
  • Loading branch information
JettChenT committed Feb 24, 2024
1 parent 29d2bfa commit c4c342d
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 38 deletions.
11 changes: 11 additions & 0 deletions Apple/NetworkExtension/PacketTunnelProvider.swift
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,17 @@ class PacketTunnelProvider: NEPacketTunnelProvider {
}
}

override func stopTunnel(with reason: NEProviderStopReason) async {
do {
let client = try Client()
let command = BurrowRequest(id: 0, command: "Stop")
let data = try await client.request(command, type: Response<BurrowResult<String>>.self)
self.logger.log("Stopped client.")
} catch {
self.logger.error("Failed to stop tunnel: \(error)")
}
}

private func generateTunSettings(from: ServerConfigData) -> NETunnelNetworkSettings? {
let cfig = from.ServerConfig
let nst = NEPacketTunnelNetworkSettings(tunnelRemoteAddress: "1.1.1.1")
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ daemon:
start:
@$(cargo_norm) start

stop:
@$(cargo_norm) stop

test-dns:
@sudo route delete 8.8.8.8
@sudo route add 8.8.8.8 -interface $(tun)
Expand Down
26 changes: 11 additions & 15 deletions burrow/src/daemon/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ enum RunState {
pub struct DaemonInstance {
rx: async_channel::Receiver<DaemonCommand>,
sx: async_channel::Sender<DaemonResponse>,
tun_interface: Option<Arc<RwLock<TunInterface>>>,
tun_interface: Arc<RwLock<Option<TunInterface>>>,
wg_interface: Arc<RwLock<Interface>>,
wg_state: RunState,
}
Expand All @@ -36,7 +36,7 @@ impl DaemonInstance {
rx,
sx,
wg_interface,
tun_interface: None,
tun_interface: Arc::new(RwLock::new(None)),
wg_state: RunState::Idle,
}
}
Expand All @@ -50,15 +50,15 @@ impl DaemonInstance {
warn!("Got start, but tun interface already up.");
}
RunState::Idle => {
let tun_if = Arc::new(RwLock::new(st.tun.open()?));
let tun_if = st.tun.open()?;
debug!("Setting tun on wg_interface");
self.wg_interface.read().await.set_tun(tun_if).await;
debug!("tun set on wg_interface");

debug!("Setting tun_interface");
self.tun_interface = Some(tun_if.clone());
self.tun_interface = self.wg_interface.read().await.get_tun();
debug!("tun_interface set: {:?}", self.tun_interface);

debug!("Setting tun on wg_interface");
self.wg_interface.write().await.set_tun(tun_if);
debug!("tun set on wg_interface");

debug!("Cloning wg_interface");
let tmp_wg = self.wg_interface.clone();
Expand All @@ -82,22 +82,18 @@ impl DaemonInstance {
}
Ok(DaemonResponseData::None)
}
DaemonCommand::ServerInfo => match &self.tun_interface {
DaemonCommand::ServerInfo => match &self.tun_interface.read().await.as_ref() {
None => Ok(DaemonResponseData::None),
Some(ti) => {
info!("{:?}", ti);
Ok(DaemonResponseData::ServerInfo(ServerInfo::try_from(
ti.read().await.inner.get_ref(),
ti.inner.get_ref(),
)?))
}
},
DaemonCommand::Stop => {
if self.tun_interface.is_some() {
self.tun_interface = None;
info!("Daemon stopping tun interface.");
} else {
warn!("Got stop, but tun interface is not up.")
}
self.wg_interface.read().await.remove_tun().await;
self.wg_state = RunState::Idle;
Ok(DaemonResponseData::None)
}
DaemonCommand::ServerConfig => {
Expand Down
73 changes: 53 additions & 20 deletions burrow/src/wireguard/iface.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::{net::IpAddr, sync::Arc};
use std::ops::Deref;

use anyhow::Error;
use fehler::throws;
use futures::future::join_all;
use ip_network_table::IpNetworkTable;
use tokio::sync::RwLock;
use tokio::sync::{RwLock, Notify};
use tracing::{debug, error};
use tun::tokio::TunInterface;

Expand Down Expand Up @@ -46,9 +47,21 @@ impl FromIterator<PeerPcb> for IndexedPcbs {
}
}

enum IfaceStatus {
Running,
Idle
}

pub struct Interface {
tun: Option<Arc<RwLock<TunInterface>>>,
tun: Arc<RwLock<Option<TunInterface>>>,
pcbs: Arc<IndexedPcbs>,
status: Arc<RwLock<IfaceStatus>>,
stop_notifier: Arc<Notify>,
}

async fn is_running(status: Arc<RwLock<IfaceStatus>>) -> bool {
let st = status.read().await;
matches!(st.deref(), IfaceStatus::Running)
}

impl Interface {
Expand All @@ -60,35 +73,54 @@ impl Interface {
.collect::<Result<_, _>>()?;

let pcbs = Arc::new(pcbs);
Self { pcbs, tun: None }
Self { pcbs, tun: Arc::new(RwLock::new(None)), status: Arc::new(RwLock::new(IfaceStatus::Idle)), stop_notifier: Arc::new(Notify::new()) }
}

pub async fn set_tun(&self, tun: TunInterface) {
debug!("Setting tun interface");
self.tun.write().await.replace(tun);
let mut st = self.status.write().await;
*st = IfaceStatus::Running;
}

pub fn set_tun(&mut self, tun: Arc<RwLock<TunInterface>>) {
self.tun = Some(tun);
pub fn get_tun(&self) -> Arc<RwLock<Option<TunInterface>>> {
self.tun.clone()
}

pub async fn remove_tun(&self){
let mut st = self.status.write().await;
self.stop_notifier.notify_waiters();
*st = IfaceStatus::Idle;
}

pub async fn run(&self) -> anyhow::Result<()> {
let pcbs = self.pcbs.clone();
let tun = self
.tun
.clone()
.ok_or(anyhow::anyhow!("tun interface does not exist"))?;
.clone();
let status = self.status.clone();
let stop_notifier = self.stop_notifier.clone();
log::info!("Starting interface");

let outgoing = async move {
loop {
while is_running(status.clone()).await {
let mut buf = [0u8; 3000];

let src = {
let src = match tun.read().await.recv(&mut buf[..]).await {
Ok(len) => &buf[..len],
Err(e) => {
error!("Failed to read from interface: {}", e);
continue
}
let t = tun.read().await;
let Some(_tun) = t.as_ref() else {
continue;
};
debug!("Read {} bytes from interface", src.len());
src
tokio::select! {
_ = stop_notifier.notified() => continue,
pkg = _tun.recv(&mut buf[..]) => match pkg {
Ok(len) => &buf[..len],
Err(e) => {
error!("Failed to read from interface: {}", e);
continue
}
},
}
};

let dst_addr = match Tunnel::dst_address(src) {
Expand Down Expand Up @@ -123,8 +155,7 @@ impl Interface {
let mut tsks = vec![];
let tun = self
.tun
.clone()
.ok_or(anyhow::anyhow!("tun interface does not exist"))?;
.clone();
let outgoing = tokio::task::spawn(outgoing);
tsks.push(outgoing);
debug!("preparing to spawn read tasks");
Expand All @@ -149,9 +180,10 @@ impl Interface {
};

let pcb = pcbs.pcbs[i].clone();
let status = self.status.clone();
let update_timers_tsk = async move {
let mut buf = [0u8; 65535];
loop {
while is_running(status.clone()).await {
tokio::time::sleep(tokio::time::Duration::from_millis(250)).await;
match pcb.update_timers(&mut buf).await {
Ok(..) => (),
Expand All @@ -164,8 +196,9 @@ impl Interface {
};

let pcb = pcbs.pcbs[i].clone();
let status = self.status.clone();
let reset_rate_limiter_tsk = async move {
loop {
while is_running(status.clone()).await {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
pcb.reset_rate_limiter().await;
}
Expand Down
6 changes: 3 additions & 3 deletions burrow/src/wireguard/pcb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl PeerPcb {
Ok(())
}

pub async fn run(&self, tun_interface: Arc<RwLock<TunInterface>>) -> Result<(), Error> {
pub async fn run(&self, tun_interface: Arc<RwLock<Option<TunInterface>>>) -> Result<(), Error> {
tracing::debug!("starting read loop for pcb... for {:?}", &self);
let rid: i32 = random();
let mut buf: [u8; 3000] = [0u8; 3000];
Expand Down Expand Up @@ -106,12 +106,12 @@ impl PeerPcb {
}
TunnResult::WriteToTunnelV4(packet, addr) => {
tracing::debug!("WriteToTunnelV4: {:?}, {:?}", packet, addr);
tun_interface.read().await.send(packet).await?;
tun_interface.read().await.as_ref().ok_or(anyhow::anyhow!("tun interface does not exist"))?.send(packet).await?;
break
}
TunnResult::WriteToTunnelV6(packet, addr) => {
tracing::debug!("WriteToTunnelV6: {:?}, {:?}", packet, addr);
tun_interface.read().await.send(packet).await?;
tun_interface.read().await.as_ref().ok_or(anyhow::anyhow!("tun interface does not exist"))?.send(packet).await?;
break
}
}
Expand Down

0 comments on commit c4c342d

Please sign in to comment.