Skip to content

Commit

Permalink
Publish status changes to connected sockets
Browse files Browse the repository at this point in the history
Whenever the playback mode (playing, paused, stopped) or the track changes, all
socket listeners will be notified.

Fixes #924, fixes #1019
  • Loading branch information
hrkfdn committed Dec 28, 2022
1 parent 9c79c3c commit 3888988
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 27 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ strum = "0.24.1"
strum_macros = "0.24.3"
tokio = {version = "1", features = ["rt-multi-thread", "sync", "time", "net"]}
tokio-util = {version = "0.7.4", features = ["codec"]}
tokio-stream = "0.1.9"
tokio-stream = {version = "0.1.11", features = ["sync"]}
toml = "0.5"
unicode-width = "0.1.9"
url = "2.2"
Expand Down
88 changes: 68 additions & 20 deletions src/ipc.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,26 @@
use std::{io, path::PathBuf};

use futures::SinkExt;
use log::{debug, error, info};
use tokio::net::{UnixListener, UnixStream};
use tokio::runtime::Handle;
use tokio::sync::watch::{Receiver, Sender};
use tokio_stream::wrappers::WatchStream;
use tokio_stream::StreamExt;
use tokio_util::codec::{FramedRead, LinesCodec};
use tokio_util::codec::{FramedRead, FramedWrite, LinesCodec};

use crate::events::{Event, EventManager};
use crate::model::playable::Playable;
use crate::spotify::PlayerEvent;

pub struct IpcSocket {
listener: UnixListener,
ev: EventManager,
tx: Sender<Status>,
}

#[derive(Clone, Debug, Serialize)]
struct Status {
mode: PlayerEvent,
playable: Option<Playable>,
}

impl IpcSocket {
Expand All @@ -21,39 +31,77 @@ impl IpcSocket {

info!("Creating IPC domain socket at {path:?}");

let _guard = handle.enter();
let listener = UnixListener::bind(path)?;
Ok(IpcSocket { listener, ev })
let status = Status {
mode: PlayerEvent::Stopped,
playable: None,
};

let (tx, rx) = tokio::sync::watch::channel(status);
handle.spawn(async move {
let listener = UnixListener::bind(path).expect("Could not create IPC domain socket");
Self::worker(listener, ev, rx.clone()).await;
});

Ok(IpcSocket { tx })
}

pub async fn worker(&self) {
pub fn publish(&self, event: &PlayerEvent, playable: Option<Playable>) {
let status = Status {
mode: event.clone(),
playable,
};
self.tx.send(status).expect("Error publishing IPC update");
}

async fn worker(listener: UnixListener, ev: EventManager, tx: Receiver<Status>) {
loop {
match self.listener.accept().await {
match listener.accept().await {
Ok((stream, sockaddr)) => {
debug!("Connection from {:?}", sockaddr);
tokio::spawn(Self::stream_handler(stream, self.ev.clone()));
tokio::spawn(Self::stream_handler(
stream,
ev.clone(),
WatchStream::new(tx.clone()),
));
}
Err(e) => error!("Error accepting connection: {e}"),
}
}
}

async fn stream_handler(mut stream: UnixStream, ev: EventManager) -> io::Result<()> {
let (reader, _writer) = stream.split();
async fn stream_handler(
mut stream: UnixStream,
ev: EventManager,
mut rx: WatchStream<Status>,
) -> Result<(), String> {
let (reader, writer) = stream.split();
let mut framed_reader = FramedRead::new(reader, LinesCodec::new());
let mut framed_writer = FramedWrite::new(writer, LinesCodec::new());

loop {
if let Some(line) = framed_reader.next().await {
match line {
Ok(line) => {
debug!("Received line: \"{line}\"");
ev.send(Event::IpcInput(line));
tokio::select! {
line = framed_reader.next() => {
match line {
Some(Ok(line)) => {
debug!("Received line: \"{line}\"");
ev.send(Event::IpcInput(line));
}
Some(Err(e)) => error!("Error reading line: {e}"),
None => {
debug!("Closing IPC connection");
return Ok(())
}
}
Err(e) => error!("Error reading line: {e}"),
}
} else {
debug!("Closing connection");
return Ok(());
Some(status) = rx.next() => {
debug!("IPC Status update: {status:?}");
let status_str = serde_json::to_string(&status).map_err(|e| e.to_string())?;
framed_writer.send(status_str).await.map_err(|e| e.to_string())?;
}
else => {
error!("All streams are closed");
return Ok(())
}
}
}
}
Expand Down
12 changes: 7 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,15 +355,14 @@ fn main() -> Result<(), String> {
let mut signals = Signals::new([SIGTERM, SIGHUP]).expect("could not register signal handler");

#[cfg(unix)]
{
let ipc = ipc::IpcSocket::new(
let ipc = {
ipc::IpcSocket::new(
ASYNC_RUNTIME.handle(),
cache_path("ncspot.sock"),
event_manager.clone(),
)
.map_err(|e| e.to_string())?;
ASYNC_RUNTIME.spawn(async move { ipc.worker().await });
}
.map_err(|e| e.to_string())?
};

// cursive event loop
while cursive.is_running() {
Expand All @@ -386,6 +385,9 @@ fn main() -> Result<(), String> {
#[cfg(feature = "mpris")]
mpris_manager.update();

#[cfg(unix)]
ipc.publish(&state, queue.get_current());

if state == PlayerEvent::FinishedTrack {
queue.next(false);
}
Expand Down
2 changes: 1 addition & 1 deletion src/spotify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::ASYNC_RUNTIME;

pub const VOLUME_PERCENT: u16 = ((u16::max_value() as f64) * 1.0 / 100.0) as u16;

#[derive(Clone, Debug, PartialEq, Eq)]
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub enum PlayerEvent {
Playing(SystemTime),
Paused(Duration),
Expand Down

0 comments on commit 3888988

Please sign in to comment.