Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ default-members = ["lighthouse-client"]
resolver = "2"

[workspace.package]
version = "3.4.0"
version = "3.4.1"
edition = "2021"
license = "MIT"

[workspace.dependencies]
lighthouse-protocol = { version = "^3.4.0", path = "lighthouse-protocol" }
lighthouse-protocol = { version = "^3.4.1", path = "lighthouse-protocol" }
2 changes: 1 addition & 1 deletion lighthouse-client/examples/admin_crud.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use clap::Parser;
use lighthouse_client::{protocol::Authentication, Error, Lighthouse, Result, TokioWebSocket, LIGHTHOUSE_URL};
use tracing::{info, info_span, Instrument};

async fn run(mut lh: Lighthouse<TokioWebSocket>) -> Result<()> {
async fn run(lh: Lighthouse<TokioWebSocket>) -> Result<()> {
info!("Connected to the Lighthouse server");

async {
Expand Down
2 changes: 1 addition & 1 deletion lighthouse-client/examples/admin_get_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use clap::Parser;
use lighthouse_client::{protocol::Authentication, Lighthouse, Result, TokioWebSocket, LIGHTHOUSE_URL};
use tracing::info;

async fn run(mut lh: Lighthouse<TokioWebSocket>) -> Result<()> {
async fn run(lh: Lighthouse<TokioWebSocket>) -> Result<()> {
info!("Connected to the Lighthouse server");

let metrics = lh.get_laser_metrics().await?.payload;
Expand Down
6 changes: 3 additions & 3 deletions lighthouse-client/examples/admin_list_root.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use clap::Parser;
use lighthouse_client::{protocol::Authentication, Lighthouse, Result, TokioWebSocket, LIGHTHOUSE_URL};
use lighthouse_client::{protocol::Authentication, root, Lighthouse, Result, TokioWebSocket, LIGHTHOUSE_URL};
use tracing::info;

async fn run(mut lh: Lighthouse<TokioWebSocket>) -> Result<()> {
async fn run(lh: Lighthouse<TokioWebSocket>) -> Result<()> {
info!("Connected to the Lighthouse server");

let tree = lh.list(&[]).await?.payload;
let tree = lh.list(root![]).await?.payload;
info!("Got {}", tree);

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion lighthouse-client/examples/black.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use clap::Parser;
use lighthouse_client::{protocol::{Authentication, Color, Frame}, Lighthouse, Result, TokioWebSocket, LIGHTHOUSE_URL};
use tracing::info;

async fn run(mut lh: Lighthouse<TokioWebSocket>) -> Result<()> {
async fn run(lh: Lighthouse<TokioWebSocket>) -> Result<()> {
info!("Connected to the Lighthouse server");

lh.put_model(Frame::fill(Color::BLACK)).await?;
Expand Down
2 changes: 1 addition & 1 deletion lighthouse-client/examples/disco.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use tracing::info;
use tokio::time;
use std::time::Duration;

async fn run(mut lh: Lighthouse<TokioWebSocket>) -> Result<()> {
async fn run(lh: Lighthouse<TokioWebSocket>) -> Result<()> {
info!("Connected to the Lighthouse server");

loop {
Expand Down
2 changes: 1 addition & 1 deletion lighthouse-client/examples/input_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use lighthouse_client::{protocol::Authentication, Lighthouse, Result, TokioWebSo
use lighthouse_protocol::Model;
use tracing::info;

async fn run(mut lh: Lighthouse<TokioWebSocket>) -> Result<()> {
async fn run(lh: Lighthouse<TokioWebSocket>) -> Result<()> {
info!("Connected to the Lighthouse server");

// Stream input events
Expand Down
4 changes: 2 additions & 2 deletions lighthouse-client/examples/snake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl State {
}
}

async fn run_updater(mut lh: Lighthouse<TokioWebSocket>, shared_state: Arc<Mutex<State>>) -> Result<()> {
async fn run_updater(lh: Lighthouse<TokioWebSocket>, shared_state: Arc<Mutex<State>>) -> Result<()> {
loop {
// Update the snake and render it
let frame = {
Expand Down Expand Up @@ -190,7 +190,7 @@ async fn main() -> Result<()> {
let auth = Authentication::new(&args.username, &args.token);
let state = Arc::new(Mutex::new(State::new()));

let mut lh = Lighthouse::connect_with_tokio_to(&args.url, auth).await?;
let lh = Lighthouse::connect_with_tokio_to(&args.url, auth).await?;
info!("Connected to the Lighthouse server");

let stream = lh.stream_model().await?;
Expand Down
2 changes: 1 addition & 1 deletion lighthouse-client/examples/stress_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use lighthouse_client::{protocol::{Authentication, Frame}, Lighthouse, Result, T
use tokio::time::{self, Instant};
use tracing::info;

async fn run(mut lh: Lighthouse<TokioWebSocket>, delay_ms: Option<u64>) -> Result<()> {
async fn run(lh: Lighthouse<TokioWebSocket>, delay_ms: Option<u64>) -> Result<()> {
info!("Connected to the Lighthouse server");

let mut last_second = Instant::now();
Expand Down
8 changes: 8 additions & 0 deletions lighthouse-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,11 @@ pub use lighthouse::*;
pub use spawn::*;

pub use lighthouse_protocol as protocol;

/// Small convenience macro that expresses the root path.
#[macro_export]
macro_rules! root {
() => {
&[] as &[&str]
};
}
119 changes: 80 additions & 39 deletions lighthouse-client/src/lighthouse.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashMap, sync::Arc, fmt::Debug};
use std::{collections::HashMap, fmt::Debug, sync::{atomic::{AtomicI32, Ordering}, Arc}};

use async_tungstenite::tungstenite::{Message, self};
use futures::{prelude::*, channel::mpsc::{Sender, self}, stream::{SplitSink, SplitStream}, lock::Mutex};
Expand All @@ -11,13 +11,13 @@ use crate::{Check, Error, Result, Spawner};
/// A connection to the lighthouse server for sending requests and receiving events.
pub struct Lighthouse<S> {
/// The sink-part of the WebSocket connection.
ws_sink: SplitSink<S, Message>,
ws_sink: Arc<Mutex<SplitSink<S, Message>>>,
/// The response/event slots, keyed by request id.
slots: Arc<Mutex<HashMap<i32, Slot<ServerMessage<Value>>>>>,
/// The credentials used to authenticate with the lighthouse.
authentication: Authentication,
/// The next request id. Incremented on every request.
request_id: i32,
request_id: Arc<AtomicI32>,
}

/// A facility for coordinating asynchronous responses to a request between a
Expand Down Expand Up @@ -47,10 +47,10 @@ impl<S> Lighthouse<S>
let (ws_sink, ws_stream) = web_socket.split();
let slots = Arc::new(Mutex::new(HashMap::new()));
let lh = Self {
ws_sink,
ws_sink: Arc::new(Mutex::new(ws_sink)),
slots: slots.clone(),
authentication,
request_id: 0,
request_id: Arc::new(AtomicI32::new(0)),
};
W::spawn(Self::run_receive_loop(ws_stream, slots));
Ok(lh)
Expand Down Expand Up @@ -119,110 +119,131 @@ impl<S> Lighthouse<S>
}

/// Replaces the user's lighthouse model with the given frame.
pub async fn put_model(&mut self, frame: Frame) -> Result<ServerMessage<()>> {
pub async fn put_model(&self, frame: Frame) -> Result<ServerMessage<()>> {
let username = self.authentication.username.clone();
self.put(&["user", username.as_str(), "model"], Model::Frame(frame)).await
self.put(&["user".into(), username, "model".into()], Model::Frame(frame)).await
}

/// Requests a stream of events (including key/controller events) for the user's lighthouse model.
pub async fn stream_model(&mut self) -> Result<impl Stream<Item = Result<ServerMessage<Model>>>> {
pub async fn stream_model(&self) -> Result<impl Stream<Item = Result<ServerMessage<Model>>>> {
let username = self.authentication.username.clone();
self.stream(&["user", username.as_str(), "model"], ()).await
self.stream(&["user".into(), username, "model".into()], ()).await
}

/// Fetches lamp server metrics.
pub async fn get_laser_metrics(&mut self) -> Result<ServerMessage<LaserMetrics>> {
pub async fn get_laser_metrics(&self) -> Result<ServerMessage<LaserMetrics>> {
self.get(&["metrics", "laser"]).await
}

/// Combines PUT and CREATE. Requires CREATE and WRITE permission.
pub async fn post<P>(&mut self, path: &[&str], payload: P) -> Result<ServerMessage<()>>
pub async fn post<P>(&self, path: &[impl AsRef<str> + Debug], payload: P) -> Result<ServerMessage<()>>
where
P: Serialize {
self.perform(&Verb::Post, path, payload).await
}

/// Updates the resource at the given path with the given payload. Requires WRITE permission.
pub async fn put<P>(&mut self, path: &[&str], payload: P) -> Result<ServerMessage<()>>
pub async fn put<P>(&self, path: &[impl AsRef<str> + Debug], payload: P) -> Result<ServerMessage<()>>
where
P: Serialize {
self.perform(&Verb::Put, path, payload).await
}

/// Creates a resource at the given path. Requires CREATE permission.
pub async fn create(&mut self, path: &[&str]) -> Result<ServerMessage<()>> {
pub async fn create(&self, path: &[impl AsRef<str> + Debug]) -> Result<ServerMessage<()>> {
self.perform(&Verb::Create, path, ()).await
}

/// Deletes a resource at the given path. Requires DELETE permission.
pub async fn delete(&mut self, path: &[&str]) -> Result<ServerMessage<()>> {
pub async fn delete(&self, path: &[impl AsRef<str> + Debug]) -> Result<ServerMessage<()>> {
self.perform(&Verb::Delete, path, ()).await
}

/// Creates a directory at the given path. Requires CREATE permission.
pub async fn mkdir(&mut self, path: &[&str]) -> Result<ServerMessage<()>> {
pub async fn mkdir(&self, path: &[impl AsRef<str> + Debug]) -> Result<ServerMessage<()>> {
self.perform(&Verb::Mkdir, path, ()).await
}

/// Lists the directory tree at the given path. Requires READ permission.
pub async fn list(&mut self, path: &[&str]) -> Result<ServerMessage<DirectoryTree>> {
pub async fn list(&self, path: &[impl AsRef<str> + Debug]) -> Result<ServerMessage<DirectoryTree>> {
self.perform(&Verb::List, path, ()).await
}

/// Gets the resource at the given path. Requires READ permission.
pub async fn get<R>(&mut self, path: &[&str]) -> Result<ServerMessage<R>>
pub async fn get<R>(&self, path: &[impl AsRef<str> + Debug]) -> Result<ServerMessage<R>>
where
R: for<'de> Deserialize<'de> {
self.perform(&Verb::Get, path, ()).await
}

/// Links the given source to the given destination path.
pub async fn link(&mut self, src_path: &[&str], dest_path: &[&str]) -> Result<ServerMessage<()>> {
self.perform(&Verb::Link, dest_path, src_path).await
pub async fn link(&self, src_path: &[impl AsRef<str> + Debug], dest_path: &[impl AsRef<str> + Debug]) -> Result<ServerMessage<()>> {
self.perform(&Verb::Link, dest_path, src_path.iter().map(|s| s.as_ref().to_owned()).collect::<Vec<_>>()).await
}

/// Unlinks the given source from the given destination path.
pub async fn unlink(&mut self, src_path: &[&str], dest_path: &[&str]) -> Result<ServerMessage<()>> {
self.perform(&Verb::Unlink, dest_path, src_path).await
pub async fn unlink(&self, src_path: &[impl AsRef<str> + Debug], dest_path: &[impl AsRef<str> + Debug]) -> Result<ServerMessage<()>> {
self.perform(&Verb::Unlink, dest_path, src_path.iter().map(|s| s.as_ref().to_owned()).collect::<Vec<_>>()).await
}

/// Stops the given stream.
pub async fn stop(&mut self, path: &[&str]) -> Result<ServerMessage<()>> {
self.perform(&Verb::Stop, path, ()).await
/// Stops the given stream. **Should generally not be called manually**,
/// since streams will automatically be stopped once dropped.
pub async fn stop(&self, request_id: i32, path: &[impl AsRef<str> + Debug]) -> Result<ServerMessage<()>> {
self.perform_with_id(request_id, &Verb::Stop, path, ()).await
}

/// Performs a single request to the given path with the given payload.
#[tracing::instrument(skip(self, payload))]
pub async fn perform<P, R>(&mut self, verb: &Verb, path: &[&str], payload: P) -> Result<ServerMessage<R>>
pub async fn perform<P, R>(&self, verb: &Verb, path: &[impl AsRef<str> + Debug], payload: P) -> Result<ServerMessage<R>>
where
P: Serialize,
R: for<'de> Deserialize<'de> {
let request_id = self.next_request_id();
self.perform_with_id(request_id, verb, path, payload).await
}

/// Performs a single request to the given path with the given request id.
#[tracing::instrument(skip(self, payload))]
async fn perform_with_id<P, R>(&self, request_id: i32, verb: &Verb, path: &[impl AsRef<str> + Debug], payload: P) -> Result<ServerMessage<R>>
where
P: Serialize,
R: for<'de> Deserialize<'de> {
assert_ne!(verb, &Verb::Stream, "Lighthouse::perform may only be used for one-off requests, use Lighthouse::stream for streaming.");
let request_id = self.send_request(verb, path, payload).await?;
self.send_request(request_id, verb, path, payload).await?;
let response = self.receive_single(request_id).await?.check()?.decode_payload()?;
Ok(response)
}

/// Performs a STREAM request to the given path with the given payload.
/// Automatically sends a STOP once dropped.
#[tracing::instrument(skip(self, payload))]
pub async fn stream<P, R>(&mut self, path: &[&str], payload: P) -> Result<impl Stream<Item = Result<ServerMessage<R>>>>
pub async fn stream<P, R>(&self, path: &[impl AsRef<str> + Debug], payload: P) -> Result<impl Stream<Item = Result<ServerMessage<R>>>>
where
P: Serialize,
R: for<'de> Deserialize<'de> {
let request_id = self.send_request(&Verb::Stream, path, payload).await?;
let request_id = self.next_request_id();
let path: Vec<String> = path.into_iter().map(|s| s.as_ref().to_string()).collect();
self.send_request(request_id, &Verb::Stream, &path, payload).await?;
let stream = self.receive_streaming(request_id).await?;
// TODO: Send STOP once dropped
Ok(stream)
Ok(stream.guard({
// Stop the stream on drop
let this = (*self).clone();
move || {
tokio::spawn(async move {
if let Err(error) = this.stop(request_id, &path).await {
error! { ?path, %error, "Could not STOP stream" };
}
});
}
}))
}

/// Sends a request to the given path with the given payload.
async fn send_request<P>(&mut self, verb: &Verb, path: &[&str], payload: P) -> Result<i32>
async fn send_request<P>(&self, request_id: i32, verb: &Verb, path: &[impl AsRef<str> + Debug], payload: P) -> Result<i32>
where
P: Serialize {
let path = path.into_iter().map(|s| s.to_string()).collect();
let request_id = self.request_id;
let path = path.into_iter().map(|s| s.as_ref().to_string()).collect();
debug! { %request_id, "Sending request" };
self.request_id += 1;
self.send_message(&ClientMessage {
request_id,
authentication: self.authentication.clone(),
Expand All @@ -235,7 +256,7 @@ impl<S> Lighthouse<S>
}

/// Sends a generic message to the lighthouse.
async fn send_message<P>(&mut self, message: &ClientMessage<P>) -> Result<()>
async fn send_message<P>(&self, message: &ClientMessage<P>) -> Result<()>
where
P: Serialize {
self.send_raw(rmp_serde::to_vec_named(message)?).await
Expand Down Expand Up @@ -291,8 +312,13 @@ impl<S> Lighthouse<S>
}

/// Sends raw bytes to the lighthouse via the WebSocket connection.
async fn send_raw(&mut self, bytes: impl Into<Vec<u8>> + Debug) -> Result<()> {
Ok(self.ws_sink.send(Message::Binary(bytes.into())).await?)
async fn send_raw(&self, bytes: impl Into<Vec<u8>> + Debug) -> Result<()> {
Ok(self.ws_sink.lock().await.send(Message::Binary(bytes.into())).await?)
}

/// Fetches the next request id.
fn next_request_id(&self) -> i32 {
self.request_id.fetch_add(1, Ordering::Relaxed)
}

/// Fetches the credentials used to authenticate with the lighthouse.
Expand All @@ -303,7 +329,22 @@ impl<S> Lighthouse<S>
/// Closes the WebSocket connection gracefully with a close message. While
/// the server will usually also handle abruptly closed connections
/// properly, it is recommended to always close the [``Lighthouse``].
pub async fn close(&mut self) -> Result<()> {
Ok(self.ws_sink.close().await?)
pub async fn close(&self) -> Result<()> {
Ok(self.ws_sink.lock().await.close().await?)
}
}

// For some reason `#[derive(Clone)]` adds the trait bound `S: Clone`, despite
// not actually being needed since the WebSocket sink is already wrapped in an
// `Arc`, therefore we implement `Clone` manually.

impl<S> Clone for Lighthouse<S> {
fn clone(&self) -> Self {
Self {
ws_sink: self.ws_sink.clone(),
slots: self.slots.clone(),
authentication: self.authentication.clone(),
request_id: self.request_id.clone(),
}
}
}