Skip to content

Commit

Permalink
feat(subscriptions): add uds subscription support
Browse files Browse the repository at this point in the history
This commit adds support for subscriptions via Unix Domain Sockets which
are better suited for IPC between Rust processes compared to Named Pipes
which have issues that I don't want to spend time resolving.

The main motivation for this change is to provide an easy way for the
new zebar project to consume information about komorebi's state in the
Rust backend so that a bar module can be created for komorebi users.

The next step in this process will be to finally refactor the komorebi
crate into a mixed bin/lib crate, and expose all notification-related
structs and maybe some connection helper methods in a new
komorebi-client crate.

The previous "subscribe" and "unsubscribe" komorebic commands have had
the "-pipe" suffix added to them, with aliases in place for the previous
names in order to ensure backwards compat.
  • Loading branch information
LGUG2Z committed Feb 18, 2024
1 parent c8f6502 commit ef1ce4a
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 24 deletions.
6 changes: 4 additions & 2 deletions komorebi-core/src/lib.rs
Expand Up @@ -156,8 +156,10 @@ pub enum SocketMessage {
ToggleMouseFollowsFocus,
RemoveTitleBar(ApplicationIdentifier, String),
ToggleTitleBars,
AddSubscriber(String),
RemoveSubscriber(String),
AddSubscriberSocket(String),
RemoveSubscriberSocket(String),
AddSubscriberPipe(String),
RemoveSubscriberPipe(String),
ApplicationSpecificConfigurationSchema,
NotificationSchema,
SocketSchema,
Expand Down
38 changes: 30 additions & 8 deletions komorebi/src/main.rs
Expand Up @@ -38,6 +38,7 @@ use sysinfo::Process;
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::EnvFilter;
use uds_windows::UnixStream;
use which::which;
use winreg::enums::HKEY_CURRENT_USER;
use winreg::RegKey;
Expand Down Expand Up @@ -172,6 +173,8 @@ lazy_static! {
]));
static ref SUBSCRIPTION_PIPES: Arc<Mutex<HashMap<String, File>>> =
Arc::new(Mutex::new(HashMap::new()));
static ref SUBSCRIPTION_SOCKETS: Arc<Mutex<HashMap<String, PathBuf>>> =
Arc::new(Mutex::new(HashMap::new()));
static ref TCP_CONNECTIONS: Arc<Mutex<HashMap<String, TcpStream>>> =
Arc::new(Mutex::new(HashMap::new()));
static ref HIDING_BEHAVIOUR: Arc<Mutex<HidingBehaviour>> =
Expand Down Expand Up @@ -388,12 +391,32 @@ pub struct Notification {
}

pub fn notify_subscribers(notification: &str) -> Result<()> {
let mut stale_subscriptions = vec![];
let mut subscriptions = SUBSCRIPTION_PIPES.lock();
for (subscriber, pipe) in &mut *subscriptions {
let mut stale_sockets = vec![];
let mut sockets = SUBSCRIPTION_SOCKETS.lock();

for (socket, path) in &mut *sockets {
match UnixStream::connect(path) {
Ok(mut stream) => {
tracing::debug!("pushed notification to subscriber: {socket}");
stream.write_all(notification.as_bytes())?;
}
Err(_) => {
stale_sockets.push(socket.clone());
}
}
}

for socket in stale_sockets {
tracing::warn!("removing stale subscription: {socket}");
sockets.remove(&socket);
}

let mut stale_pipes = vec![];
let mut pipes = SUBSCRIPTION_PIPES.lock();
for (subscriber, pipe) in &mut *pipes {
match writeln!(pipe, "{notification}") {
Ok(()) => {
tracing::debug!("pushed notification to subscriber: {}", subscriber);
tracing::debug!("pushed notification to subscriber: {subscriber}");
}
Err(error) => {
// ERROR_FILE_NOT_FOUND
Expand All @@ -406,16 +429,15 @@ pub fn notify_subscribers(notification: &str) -> Result<()> {

// Remove the subscription; the process will have to subscribe again
if let Some(2 | 232) = error.raw_os_error() {
let subscriber_cl = subscriber.clone();
stale_subscriptions.push(subscriber_cl);
stale_pipes.push(subscriber.clone());
}
}
}
}

for subscriber in stale_subscriptions {
for subscriber in stale_pipes {
tracing::warn!("removing stale subscription: {}", subscriber);
subscriptions.remove(&subscriber);
pipes.remove(&subscriber);
}

Ok(())
Expand Down
15 changes: 13 additions & 2 deletions komorebi/src/process_command.rs
Expand Up @@ -59,6 +59,7 @@ use crate::BORDER_OFFSET;
use crate::BORDER_OVERFLOW_IDENTIFIERS;
use crate::BORDER_WIDTH;
use crate::CUSTOM_FFM;
use crate::DATA_DIR;
use crate::DISPLAY_INDEX_PREFERENCES;
use crate::FLOAT_IDENTIFIERS;
use crate::HIDING_BEHAVIOUR;
Expand All @@ -70,6 +71,7 @@ use crate::NO_TITLEBAR;
use crate::OBJECT_NAME_CHANGE_ON_LAUNCH;
use crate::REMOVE_TITLEBARS;
use crate::SUBSCRIPTION_PIPES;
use crate::SUBSCRIPTION_SOCKETS;
use crate::TCP_CONNECTIONS;
use crate::TRAY_AND_MULTI_WINDOW_IDENTIFIERS;
use crate::WORKSPACE_RULES;
Expand Down Expand Up @@ -1155,7 +1157,16 @@ impl WindowManager {
workspace.set_resize_dimensions(resize);
self.update_focused_workspace(false)?;
}
SocketMessage::AddSubscriber(ref subscriber) => {
SocketMessage::AddSubscriberSocket(ref socket) => {
let mut sockets = SUBSCRIPTION_SOCKETS.lock();
let socket_path = DATA_DIR.join(socket);
sockets.insert(socket.clone(), socket_path);
}
SocketMessage::RemoveSubscriberSocket(ref socket) => {
let mut sockets = SUBSCRIPTION_SOCKETS.lock();
sockets.remove(socket);
}
SocketMessage::AddSubscriberPipe(ref subscriber) => {
let mut pipes = SUBSCRIPTION_PIPES.lock();
let pipe_path = format!(r"\\.\pipe\{subscriber}");
let pipe = connect(&pipe_path).map_err(|_| {
Expand All @@ -1164,7 +1175,7 @@ impl WindowManager {

pipes.insert(subscriber.clone(), pipe);
}
SocketMessage::RemoveSubscriber(ref subscriber) => {
SocketMessage::RemoveSubscriberPipe(ref subscriber) => {
let mut pipes = SUBSCRIPTION_PIPES.lock();
pipes.remove(subscriber);
}
Expand Down
50 changes: 38 additions & 12 deletions komorebic/src/main.rs
Expand Up @@ -721,13 +721,25 @@ struct LoadCustomLayout {
}

#[derive(Parser, AhkFunction)]
struct Subscribe {
struct SubscribeSocket {
/// Name of the socket to send event notifications to
socket: String,
}

#[derive(Parser, AhkFunction)]
struct UnsubscribeSocket {
/// Name of the socket to stop sending event notifications to
socket: String,
}

#[derive(Parser, AhkFunction)]
struct SubscribePipe {
/// Name of the pipe to send event notifications to (without "\\.\pipe\" prepended)
named_pipe: String,
}

#[derive(Parser, AhkFunction)]
struct Unsubscribe {
struct UnsubscribePipe {
/// Name of the pipe to stop sending event notifications to (without "\\.\pipe\" prepended)
named_pipe: String,
}
Expand Down Expand Up @@ -802,12 +814,20 @@ enum SubCommand {
/// Query the current window manager state
#[clap(arg_required_else_help = true)]
Query(Query),
/// Subscribe to komorebi events
/// Subscribe to komorebi events using a Unix Domain Socket
#[clap(arg_required_else_help = true)]
Subscribe(Subscribe),
SubscribeSocket(SubscribeSocket),
/// Unsubscribe from komorebi events
#[clap(arg_required_else_help = true)]
Unsubscribe(Unsubscribe),
UnsubscribeSocket(UnsubscribeSocket),
/// Subscribe to komorebi events using a Named Pipe
#[clap(arg_required_else_help = true)]
#[clap(alias = "subscribe")]
SubscribePipe(SubscribePipe),
/// Unsubscribe from komorebi events
#[clap(arg_required_else_help = true)]
#[clap(alias = "unsubscribe")]
UnsubscribePipe(UnsubscribePipe),
/// Tail komorebi.exe's process logs (cancel with Ctrl-C)
Log,
/// Quicksave the current resize layout dimensions
Expand Down Expand Up @@ -1175,7 +1195,7 @@ pub fn send_message(bytes: &[u8]) -> Result<()> {
pub fn send_query(bytes: &[u8]) -> Result<String> {
let socket = DATA_DIR.join("komorebi.sock");

let mut stream = UnixStream::connect(&socket)?;
let mut stream = UnixStream::connect(socket)?;
stream.write_all(bytes)?;
stream.shutdown(Shutdown::Write)?;

Expand All @@ -1188,9 +1208,9 @@ pub fn send_query(bytes: &[u8]) -> Result<String> {

// print_query is a helper that queries komorebi and prints the response.
// panics on error.
pub fn print_query(bytes: &[u8]) {
fn print_query(bytes: &[u8]) {
match send_query(bytes) {
Ok(response) => println!("{}", response),
Ok(response) => println!("{response}"),
Err(error) => panic!("{}", error),
}
}
Expand Down Expand Up @@ -2088,11 +2108,17 @@ Stop-Process -Name:whkd -ErrorAction SilentlyContinue
SubCommand::LoadResize(arg) => {
send_message(&SocketMessage::Load(resolve_home_path(arg.path)?).as_bytes()?)?;
}
SubCommand::Subscribe(arg) => {
send_message(&SocketMessage::AddSubscriber(arg.named_pipe).as_bytes()?)?;
SubCommand::SubscribeSocket(arg) => {
send_message(&SocketMessage::AddSubscriberSocket(arg.socket).as_bytes()?)?;
}
SubCommand::UnsubscribeSocket(arg) => {
send_message(&SocketMessage::RemoveSubscriberSocket(arg.socket).as_bytes()?)?;
}
SubCommand::SubscribePipe(arg) => {
send_message(&SocketMessage::AddSubscriberPipe(arg.named_pipe).as_bytes()?)?;
}
SubCommand::Unsubscribe(arg) => {
send_message(&SocketMessage::RemoveSubscriber(arg.named_pipe).as_bytes()?)?;
SubCommand::UnsubscribePipe(arg) => {
send_message(&SocketMessage::RemoveSubscriberPipe(arg.named_pipe).as_bytes()?)?;
}
SubCommand::ToggleMouseFollowsFocus => {
send_message(&SocketMessage::ToggleMouseFollowsFocus.as_bytes()?)?;
Expand Down

0 comments on commit ef1ce4a

Please sign in to comment.