diff --git a/Cargo.lock b/Cargo.lock index af7b163..fb619bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -686,6 +686,7 @@ dependencies = [ "serde_json", "socketcan", "testcontainers", + "toml 1.1.2+spec-1.1.0", ] [[package]] @@ -2547,10 +2548,12 @@ version = "1.1.2+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81f3d15e84cbcd896376e6730314d59fb5a87f31e4b038454184435cd57defee" dependencies = [ + "indexmap 2.14.0", "serde_core", "serde_spanned", "toml_datetime 1.1.1+spec-1.1.0", "toml_parser", + "toml_writer", "winnow 1.0.1", ] @@ -2581,6 +2584,12 @@ dependencies = [ "winnow 1.0.1", ] +[[package]] +name = "toml_writer" +version = "1.1.1+spec-1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "756daf9b1013ebe47a8776667b466417e2d4c5679d441c26230efd9ef78692db" + [[package]] name = "tonic" version = "0.14.5" diff --git a/Cargo.toml b/Cargo.toml index f024f09..f6b371a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ serde = { version = "1.0.228", features = ["derive"] } # Test-only helper binary dependencies (enabled via feature) caps = { version = "0.5", optional = true } libc = { version = "0.2", optional = true } +toml = "1.1.2" [features] # Enables the `ferroflow-vcan` helper binary used by integration tests. diff --git a/README.md b/README.md index 144bc8d..39f0842 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,6 @@ # Ferroflow -Ferroflow is the new control software for all Liquid Rocketry projects at the TU Wien Space Team. + +Ferroflow is the new control software for all Liquid Rocketry projects at the TU Wien Space Team. It interfaces with our custom Engine Control Units ECUs, through our custom [LiquidCAN protocol](https://github.com/SpaceTeam/LiquidCAN/). On the other end, it provides a high-level API for our [ECUI](https://github.com/SpaceTeam/web_ecui_houbolt), which is the user interface for our ECUs. @@ -10,33 +11,60 @@ On the other end, it provides a high-level API for our [ECUI](https://github.com Some integration tests talk to the ECUemulator over SocketCAN. For that you use a virtual CAN interface. ### Test helper: `ferroflow-vcan` + For test environments, this repo provides a small helper binary that can be granted `CAP_NET_ADMIN` once via `setcap`. Integration tests will automatically use it (if it’s available on `PATH`) to create/delete `vcan` interfaces without sudo. Build the helper (feature-gated; not part of normal builds): + ```bash cargo build --release --features test-vcan --bin ferroflow-vcan ``` + Put it on PATH (recommended for tests): + ```bash install -m 0755 ./target/release/ferroflow-vcan ~/.local/bin/ferroflow-vcan sudo setcap cap_net_admin+ep ~/.local/bin/ferroflow-vcan ``` Manual usage: + ```bash ferroflow-vcan up vcan0 ferroflow-vcan down vcan0 ``` - ## Development +### Mapping Configuration + +`mapping_path` in `config.yml` points to a directory containing `.toml` files, which are loaded in sorted order and validated together. + +Mappings are grouped by node name: + +```toml +[[mapping.FuelECU]] +name = "fuel_level" +type = "telemetry" +raw_field = "level_adc" +value = { slope = 0.5, offset = 1.0, unit = "mAh" } + +logical = [ + { range = { min = 100 }, value = "High" }, + { range = { min = 50, max = 100 }, value = "Normal" }, + { range = { max = 50 }, value = "Low" }, +] +``` + +The repository includes [schemas/mapping.schema.json](schemas/mapping.schema.json) and [taplo.toml](taplo.toml) so Taplo-compatible editors, including VS Code with Even Better TOML, can validate mapping files before the application loads them. The schema is associated with `mapping.toml` files and TOML files under `mapping/` or `mappings/` directories. + ### Running CI Checks The repository includes a CI script (`ci-rust.sh`) that runs all quality checks on the Rust implementation. This script is used both locally and in GitHub Actions **Run all checks:** + ```bash ./ci-rust.sh # or explicitly @@ -44,17 +72,20 @@ The repository includes a CI script (`ci-rust.sh`) that runs all quality checks ``` **Run individual checks:** + ```bash ./ci-rust.sh build # Build the project ./ci-rust.sh test # Run tests ./ci-rust.sh fmt # Check code formatting ./ci-rust.sh clippy # Run clippy linter ``` + You can fix formatting or linter issues by adding the -fix suffix to the command. e.g: `./ci-rust.sh clippy-fix` ### Running `fmt` and `clippy` as a pre-commit hook A pre-commit hook script is available in `.githooks`, which executes the CI script with `fmt` and `clippy` only and without the `fix` option. To setup the hook, configure git to use the `.githooks` directory and make the `pre-commit` file executable. + ```bash git config core.hooksPath .githooks chmod u+x .githooks/pre-commit @@ -66,6 +97,7 @@ chmod u+x .githooks/pre-commit We use TimescaleDB, which is an extension of PostgreSQL optimized for time-series data. You can install it by following the instructions on the [TimescaleDB installation page](https://docs.timescale.com/install/latest/). Using docker is recommended for local development (if you already have another instance of postgres running, use e.g. `-p 5433:5432` instead of `-p 5432:5432`): + ```bash docker run -d --name timescaledb -p 5432:5432 -e POSTGRES_PASSWORD=yourpassword timescale/timescaledb:latest-pg18 ``` @@ -76,6 +108,7 @@ The project uses Diesel for database interactions. Diesel CLI is recommended for **Running Diesel CLI** Here's some common commands: + ```bash export DATABASE_URL=postgres://postgres:yourpassword@localhost:5432/ferroflow # Set the database URL diesel setup # Set up the database @@ -97,4 +130,4 @@ Database tests use `testcontainers` to start a temporary TimescaleDB/PostgreSQL There are two examples in the repository: - a unit test in `src/db/mod.rs` -- an integration test in `tests/db_logging.rs` \ No newline at end of file +- an integration test in `tests/db_logging.rs` diff --git a/schemas/mapping.schema.json b/schemas/mapping.schema.json new file mode 100644 index 0000000..c0abe5c --- /dev/null +++ b/schemas/mapping.schema.json @@ -0,0 +1,117 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://spaceteam.at/ferroflow/schemas/mapping.schema.json", + "title": "FerroFlow Mapping", + "description": "TOML schema for FerroFlow node mapping files.", + "type": "object", + "required": ["mapping"], + "additionalProperties": false, + "properties": { + "mapping": { + "type": "object", + "description": "Mappings grouped by LiquidCAN node/device name.", + "minProperties": 1, + "additionalProperties": false, + "patternProperties": { + ".+": { + "type": "array", + "minItems": 1, + "items": { + "$ref": "#/$defs/mappingEntry" + } + } + } + } + }, + "$defs": { + "mappingEntry": { + "type": "object", + "additionalProperties": false, + "required": ["name", "type", "raw_field"], + "properties": { + "name": { + "type": "string", + "minLength": 1, + "description": "Unique application-facing mapping name." + }, + "type": { + "type": "string", + "enum": ["telemetry", "parameter"], + "description": "Whether the raw field is telemetry or a writable parameter." + }, + "raw_field": { + "type": "string", + "minLength": 1, + "description": "Raw LiquidCAN field name on the enclosing node." + }, + "value": { + "$ref": "#/$defs/valueParams" + }, + "logical": { + "type": "array", + "description": "Logical labels for mapped numeric ranges. Runtime validation requires these ranges to be non-overlapping.", + "items": { + "$ref": "#/$defs/logicalRule" + } + } + } + }, + "valueParams": { + "type": "object", + "additionalProperties": false, + "required": ["slope", "offset"], + "properties": { + "slope": { + "type": "number", + "not": { "const": 0 }, + "default": 1.0, + "description": "Linear conversion slope: mapped = raw * slope + offset." + }, + "offset": { + "type": "number", + "default": 0.0, + "description": "Linear conversion offset: mapped = raw * slope + offset." + }, + "unit": { + "type": "string", + "default": "" + } + } + }, + "logicalRule": { + "type": "object", + "additionalProperties": false, + "required": ["range", "value"], + "properties": { + "range": { + "$ref": "#/$defs/logicalRange" + }, + "value": { + "description": "Logical value returned when the mapped numeric value is inside the range." + } + } + }, + "logicalRange": { + "type": "object", + "additionalProperties": false, + "properties": { + "min": { + "type": "number", + "description": "Lower bound. Omit for an unbounded lower range." + }, + "max": { + "type": "number", + "description": "Upper bound. Omit for an unbounded upper range." + }, + "min_inclusive": { + "type": "boolean", + "default": true + }, + "max_inclusive": { + "type": "boolean", + "default": false + } + } + } + } +} diff --git a/src/bin/ff-socket-cli.rs b/src/bin/ff-socket-cli.rs new file mode 100644 index 0000000..a2192c7 --- /dev/null +++ b/src/bin/ff-socket-cli.rs @@ -0,0 +1,345 @@ +use std::{ + io::{self, ErrorKind, Read, Write}, + net::{SocketAddr, TcpListener, TcpStream}, + sync::mpsc::{self, TryRecvError}, + thread, + time::Duration, +}; + +use anyhow::{Context, Result, bail}; +use serde_json::{Value, json}; + +const DEFAULT_ADDR: &str = "127.0.0.1:8080"; +const POLL_PERIOD: Duration = Duration::from_millis(50); + +fn main() -> Result<()> { + let addr = parse_addr()?; + let listener = TcpListener::bind(addr).with_context(|| format!("failed to bind {addr}"))?; + listener.set_nonblocking(true)?; + + println!("FerroFlow socket CLI listening on {addr}"); + print_help(); + + let (tx, rx) = mpsc::channel(); + thread::spawn(move || stdin_thread(tx)); + + loop { + match wait_for_connection(&listener, &rx)? { + WaitResult::Connected(stream) => { + println!("connected: {}", stream.peer_addr()?); + match connection_loop(stream, &rx)? { + ConnectionResult::Disconnected => { + println!("disconnected; waiting for FerroFlow to reconnect"); + } + ConnectionResult::Quit => return Ok(()), + } + } + WaitResult::Quit => return Ok(()), + } + } +} + +fn parse_addr() -> Result { + let mut args = std::env::args().skip(1); + let mut addr = DEFAULT_ADDR.to_string(); + + while let Some(arg) = args.next() { + match arg.as_str() { + "-a" | "--addr" => { + addr = args + .next() + .with_context(|| format!("{arg} requires an address"))?; + } + "-h" | "--help" => { + println!("Usage: cargo run --bin ff-socket-cli -- [--addr 127.0.0.1:8080]"); + std::process::exit(0); + } + other => addr = other.to_string(), + } + } + + addr.parse() + .with_context(|| format!("invalid socket address {addr}")) +} + +fn stdin_thread(tx: mpsc::Sender) { + let stdin = io::stdin(); + loop { + print!("ff> "); + let _ = io::stdout().flush(); + + let mut line = String::new(); + match stdin.read_line(&mut line) { + Ok(0) => { + let _ = tx.send(CliCommand::Quit); + return; + } + Ok(_) => match parse_command(line.trim()) { + Ok(Some(command)) => { + if tx.send(command).is_err() { + return; + } + } + Ok(None) => {} + Err(error) => eprintln!("command error: {error:#}"), + }, + Err(error) => { + eprintln!("stdin error: {error}"); + let _ = tx.send(CliCommand::Quit); + return; + } + } + } +} + +fn parse_command(line: &str) -> Result> { + let mut parts = line.split_whitespace(); + let Some(command) = parts.next() else { + return Ok(None); + }; + + match command { + "help" | "h" | "?" => Ok(Some(CliCommand::Help)), + "quit" | "q" | "exit" => Ok(Some(CliCommand::Quit)), + "nodes" | "get-nodes" => Ok(Some(CliCommand::Send(json!({ + "type": "get_nodes", + "content": {} + })))), + "telemetry" | "get-telemetry" => Ok(Some(CliCommand::Send(json!({ + "type": "get_telemetry", + "content": {} + })))), + "get-mapped" => { + let name = next_arg(&mut parts, "mapped field name")?; + Ok(Some(CliCommand::Send(json!({ + "type": "get_field", + "content": { + "field": { + "type": "mapped", + "name": name + } + } + })))) + } + "get-raw" => { + let node_name = next_arg(&mut parts, "node name")?; + let field_name = next_arg(&mut parts, "raw field name")?; + Ok(Some(CliCommand::Send(json!({ + "type": "get_field", + "content": { + "field": { + "type": "raw", + "node_name": node_name, + "field_name": field_name + } + } + })))) + } + "set-mapped" => { + let name = next_arg(&mut parts, "mapped parameter name")?; + let value = next_arg(&mut parts, "numeric value")? + .parse::() + .with_context(|| "mapped parameter value must be numeric")?; + Ok(Some(CliCommand::Send(json!({ + "type": "set_parameter", + "content": { + "field": { + "type": "mapped", + "name": name + }, + "value": value + } + })))) + } + "set-raw" => { + let node_name = next_arg(&mut parts, "node name")?; + let field_name = next_arg(&mut parts, "raw field name")?; + let value_text = parts.collect::>().join(" "); + if value_text.is_empty() { + bail!("missing JSON value"); + } + let value = serde_json::from_str::(&value_text) + .with_context(|| "raw parameter value must be valid JSON")?; + Ok(Some(CliCommand::Send(json!({ + "type": "set_parameter", + "content": { + "field": { + "type": "raw", + "node_name": node_name, + "field_name": field_name + }, + "value": value + } + })))) + } + "send-json" | "send" => { + let message = line.strip_prefix(command).unwrap_or_default().trim_start(); + if message.is_empty() { + bail!("missing JSON message"); + } + Ok(Some(CliCommand::Send( + serde_json::from_str(message).with_context(|| "invalid JSON message")?, + ))) + } + other => bail!("unknown command {other}; type 'help'"), + } +} + +fn next_arg<'a>( + parts: &mut impl Iterator, + description: &'static str, +) -> Result<&'a str> { + parts + .next() + .with_context(|| format!("missing {description}")) +} + +fn wait_for_connection( + listener: &TcpListener, + rx: &mpsc::Receiver, +) -> Result { + loop { + match listener.accept() { + Ok((stream, _)) => return Ok(WaitResult::Connected(stream)), + Err(error) if error.kind() == ErrorKind::WouldBlock => {} + Err(error) => return Err(error).with_context(|| "failed to accept connection"), + } + + match rx.try_recv() { + Ok(CliCommand::Quit) => return Ok(WaitResult::Quit), + Ok(CliCommand::Help) => print_help(), + Ok(CliCommand::Send(_)) => { + eprintln!("not connected yet; command was ignored"); + } + Err(TryRecvError::Empty) => {} + Err(TryRecvError::Disconnected) => return Ok(WaitResult::Quit), + } + + thread::sleep(POLL_PERIOD); + } +} + +fn connection_loop( + mut stream: TcpStream, + rx: &mpsc::Receiver, +) -> Result { + stream.set_nonblocking(true)?; + let mut buffer = Vec::new(); + + loop { + match read_messages(&mut stream, &mut buffer)? { + ReadState::Open(messages) => { + for message in messages { + print_received_message(&message); + } + } + ReadState::Closed => return Ok(ConnectionResult::Disconnected), + } + + loop { + match rx.try_recv() { + Ok(CliCommand::Send(message)) => { + send_message(&mut stream, &message)?; + println!("<- {}", compact_json(&message)); + } + Ok(CliCommand::Help) => print_help(), + Ok(CliCommand::Quit) => return Ok(ConnectionResult::Quit), + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => return Ok(ConnectionResult::Quit), + } + } + + thread::sleep(POLL_PERIOD); + } +} + +fn read_messages(stream: &mut TcpStream, buffer: &mut Vec) -> Result { + let mut scratch = [0_u8; 4096]; + loop { + match stream.read(&mut scratch) { + Ok(0) => return Ok(ReadState::Closed), + Ok(bytes_read) => buffer.extend_from_slice(&scratch[..bytes_read]), + Err(error) if error.kind() == ErrorKind::Interrupted => continue, + Err(error) if error.kind() == ErrorKind::WouldBlock => break, + Err(error) => return Err(error).with_context(|| "failed to read socket"), + } + } + + let mut messages = Vec::new(); + while buffer.len() >= 2 { + let len = u16::from_be_bytes([buffer[0], buffer[1]]) as usize; + if buffer.len() < len + 2 { + break; + } + + messages.push(buffer[2..len + 2].to_vec()); + buffer.drain(..len + 2); + } + + Ok(ReadState::Open(messages)) +} + +fn send_message(stream: &mut TcpStream, message: &Value) -> Result<()> { + let payload = serde_json::to_vec(message)?; + let len = u16::try_from(payload.len()) + .with_context(|| format!("message too large: {} bytes", payload.len()))?; + + stream.write_all(&len.to_be_bytes())?; + stream.write_all(&payload)?; + Ok(()) +} + +fn print_received_message(message: &[u8]) { + match serde_json::from_slice::(message) { + Ok(value) => println!("-> {}", pretty_json(&value)), + Err(error) => { + println!("-> "); + println!("{}", String::from_utf8_lossy(message)); + } + } +} + +fn pretty_json(value: &Value) -> String { + serde_json::to_string_pretty(value).unwrap_or_else(|_| value.to_string()) +} + +fn compact_json(value: &Value) -> String { + serde_json::to_string(value).unwrap_or_else(|_| value.to_string()) +} + +fn print_help() { + println!( + r#"commands: + nodes request current nodes + telemetry request full telemetry snapshot + get-mapped request mapped field value + get-raw request raw field by device_name + raw field name + set-mapped set mapped parameter using a numeric mapped value + set-raw set raw parameter using a JSON value + send-json send a complete protocol message + help show this help + quit exit +"# + ); +} + +enum CliCommand { + Send(Value), + Help, + Quit, +} + +enum WaitResult { + Connected(TcpStream), + Quit, +} + +enum ConnectionResult { + Disconnected, + Quit, +} + +enum ReadState { + Open(Vec>), + Closed, +} diff --git a/src/config/mod.rs b/src/config/mod.rs index 3c04e19..60daa59 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -9,6 +9,28 @@ pub struct Config { pub can_bus_interfaces: Vec, pub heartbeat_period: u64, pub database_url: String, + pub mapping_path: String, + #[serde(default)] + pub webserver_socket: WebserverSocketConfig, +} + +#[derive(Deserialize, Serialize, Debug, Clone)] +pub struct WebserverSocketConfig { + pub enabled: bool, + pub host: String, + pub port: u16, + pub reconnect_period_ms: u64, +} + +impl Default for WebserverSocketConfig { + fn default() -> Self { + Self { + enabled: false, + host: "127.0.0.1".to_string(), + port: 8080, + reconnect_period_ms: 3000, + } + } } pub fn load_config(path: &str) -> Result { diff --git a/src/db/mod.rs b/src/db/mod.rs index 4b65b83..41df1bf 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -39,7 +39,7 @@ pub fn spawn_logging_worker<'a>( loop { match rx.recv_timeout(flush_timeout) { - Ok(Event::NodeFieldUpdated(log)) => { + Ok(Event::NodeFieldUpdated(log, _)) => { batch.push(log); if batch.len() < batch_size_limit { continue; diff --git a/src/events/mod.rs b/src/events/mod.rs index 4aa5ae4..65d203f 100644 --- a/src/events/mod.rs +++ b/src/events/mod.rs @@ -12,7 +12,8 @@ pub enum Event { id: CanMessageId, message: CanMessage, }, - NodeFieldUpdated(crate::db::FieldLog), + NodeFieldUpdated(crate::db::FieldLog, NodeFieldUpdateSource), + NodeListUpdated, Shutdown, #[allow(unused)] SendCanMessage { @@ -25,10 +26,18 @@ pub enum Event { }, } +#[derive(Debug, Copy, Clone)] +pub enum NodeFieldUpdateSource { + FieldGetRes, + ParameterSetConfirmation, + TelemetryUpdate, +} + #[derive(Debug, Hash, Eq, PartialEq)] pub enum EventKind { CanMessageReceived, NodeFieldUpdated, + NodeListUpdated, Shutdown, SendCanMessage, RelayCanMessage, @@ -38,7 +47,8 @@ impl From for EventKind { fn from(value: Event) -> Self { match value { Event::CanMessageReceived { .. } => EventKind::CanMessageReceived, - Event::NodeFieldUpdated(_) => EventKind::NodeFieldUpdated, + Event::NodeFieldUpdated(_, _) => EventKind::NodeFieldUpdated, + Event::NodeListUpdated => EventKind::NodeListUpdated, Event::Shutdown => EventKind::Shutdown, Event::SendCanMessage { .. } => EventKind::SendCanMessage, Event::RelayCanMessage { .. } => EventKind::RelayCanMessage, diff --git a/src/lib.rs b/src/lib.rs index f75ccd3..58f6cc5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,7 +13,9 @@ pub mod socket; pub fn run_with_config(config: Config) -> anyhow::Result<()> { let event_dispatcher = events::EventDispatcher::new(); - let node_manager = nodes::NodeManager::new(&event_dispatcher); + let mapping = nodes::mapping::Mapping::load_mapping_from_path(&config.mapping_path)?; + + let node_manager = nodes::NodeManager::new(&event_dispatcher, mapping); run_with_dependencies(&event_dispatcher, &node_manager, config) } @@ -45,6 +47,15 @@ pub fn run_with_dependencies( scope, ); + if config.webserver_socket.enabled { + socket::spawn_webserver_socket_worker( + config.webserver_socket.clone(), + node_manager, + event_dispatcher, + scope, + ); + } + node_manager.start_node_registration(); Ok(()) diff --git a/src/nodes/mapping.rs b/src/nodes/mapping.rs new file mode 100644 index 0000000..9a05704 --- /dev/null +++ b/src/nodes/mapping.rs @@ -0,0 +1,709 @@ +use anyhow::{Context, bail, ensure}; +use liquidcan::payloads::{CanDataType, CanDataValue}; +use serde::Deserialize; +use std::{ + collections::{BTreeMap, HashSet}, + fs, + path::Path, +}; +use toml::Value; + +#[derive(Debug, Clone, Default, Deserialize)] +#[serde(default)] +pub struct Mapping { + pub mapping: BTreeMap>, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct MappingEntry { + pub name: String, + #[serde(rename = "type")] + pub field_type: FieldType, + pub raw_field: String, + #[serde(default)] + pub value: ValueParams, + #[serde(default)] + pub logical: Vec, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct ValueParams { + pub slope: f64, + pub offset: f64, + #[serde(default)] + pub unit: String, +} + +impl Default for ValueParams { + fn default() -> Self { + Self { + slope: 1.0, + offset: 0.0, + unit: "".to_string(), + } + } +} + +#[derive(Debug, Clone, Deserialize)] +pub struct LogicalRule { + pub range: LogicalRange, + pub value: Value, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct LogicalRange { + /// Inclusive lower bound by default. If omitted, the range is unbounded below. + #[serde(default = "default_unbounded_min")] + pub min: f64, + /// Exclusive upper bound by default. If omitted, the range is unbounded above. + #[serde(default = "default_unbounded_max")] + pub max: f64, + #[serde(default = "default_min_inclusive")] + pub min_inclusive: bool, + #[serde(default)] + pub max_inclusive: bool, +} + +fn default_unbounded_min() -> f64 { + f64::NEG_INFINITY +} + +fn default_unbounded_max() -> f64 { + f64::INFINITY +} + +fn default_min_inclusive() -> bool { + true +} + +#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "lowercase")] +pub enum FieldType { + Telemetry, + Parameter, +} + +#[derive(Debug, Clone, PartialEq)] +pub struct LogicalValue { + pub value: Value, +} + +#[derive(Debug, Clone, PartialEq)] +pub struct MappedValue { + pub value: f64, + pub unit: String, +} + +pub struct MappingLookupResult<'a> { + pub node_name: &'a str, + pub mapping_entry: &'a MappingEntry, +} + +impl Mapping { + pub fn load_mapping_from_file(path: &str) -> anyhow::Result { + if path.is_empty() { + return Ok(Self::default()); + } + + Self::load_mapping_file(Path::new(path)) + } + + pub fn load_mapping_from_path(path: &str) -> anyhow::Result { + if path.is_empty() { + return Ok(Self::default()); + } + + Self::load_mapping_directory(Path::new(path)) + } + + pub fn parse_mapping(toml_str: &str) -> anyhow::Result { + let config = toml::from_str::(toml_str) + .map_err(|err| anyhow::anyhow!("Failed to parse mapping config: {}", err))?; + + config.validate()?; + + Ok(config) + } + + fn load_mapping_file(path: &Path) -> anyhow::Result { + let toml_str = fs::read_to_string(path) + .with_context(|| format!("Failed to read mapping config file at {}", path.display()))?; + + Self::parse_mapping(&toml_str) + .with_context(|| format!("Failed to load mapping config from {}", path.display())) + } + + fn load_mapping_directory(path: &Path) -> anyhow::Result { + let mut entries = fs::read_dir(path) + .with_context(|| format!("Failed to read mapping directory {}", path.display()))? + .map(|entry| entry.map(|entry| entry.path())) + .collect::>>() + .with_context(|| format!("Failed to list mapping directory {}", path.display()))?; + + entries.retain(|entry| { + entry.is_file() + && entry + .extension() + .is_some_and(|extension| extension.eq_ignore_ascii_case("toml")) + }); + entries.sort(); + + ensure!( + !entries.is_empty(), + "mapping directory {} contains no TOML files", + path.display() + ); + + let mut combined = Self::default(); + for entry in entries { + let mapping = Self::load_mapping_file(&entry).with_context(|| { + format!("Failed to load mapping config from {}", entry.display()) + })?; + + for (node, fields) in mapping.mapping { + combined.mapping.entry(node).or_default().extend(fields); + } + } + + combined.validate().with_context(|| { + format!("Mapping validation failed for directory {}", path.display()) + })?; + + Ok(combined) + } + + pub fn validate(&self) -> anyhow::Result<()> { + let mut names = HashSet::new(); + let mut raw_fields = HashSet::new(); + for (node, fields) in &self.mapping { + ensure!( + !node.trim().is_empty(), + "mapping contains an entry with an empty node name" + ); + for field in fields { + field.validate().with_context(|| { + format!("mapping for node {} field {} is invalid", node, field.name) + })?; + + let raw_id = (node.as_str(), field.raw_field.as_str()); + if !raw_fields.insert(raw_id) { + anyhow::bail!( + "Duplicate raw field mapping for node '{}' field '{}'", + node, + field.raw_field + ); + } + + if !names.insert(field.name.as_str()) { + anyhow::bail!("Duplicate mapping name '{}'", field.name); + } + } + } + + Ok(()) + } + + pub fn get_mapping_for_name(&self, name: &str) -> Option> { + self.mapping.iter().find_map(|(node, fields)| { + fields + .iter() + .find(|field| field.name == name) + .map(|field| MappingLookupResult { + node_name: node.as_str(), + mapping_entry: field, + }) + }) + } + + pub fn get_mapping_for_raw( + &self, + node: &str, + field: &str, + field_type: FieldType, + ) -> Option> { + self.mapping + .get_key_value(node) + .and_then(|(node, mapping_entries)| { + mapping_entries + .iter() + .find(|mapping| mapping.raw_field == field && mapping.field_type == field_type) + .map(|mapping| MappingLookupResult { + node_name: node, + mapping_entry: mapping, + }) + }) + } +} + +impl MappingEntry { + fn validate(&self) -> anyhow::Result<()> { + ensure!( + !self.name.trim().is_empty(), + "mapping name must be non-empty", + ); + + ensure!( + !self.raw_field.trim().is_empty(), + "mapping {} has an empty raw_field", + self.name + ); + ensure!( + self.value.slope.is_finite(), + "mapping {} has a non-finite slope", + self.name + ); + ensure!( + self.value.slope != 0.0, + "mapping {} has a slope of zero, which is not allowed", + self.name + ); + ensure!( + self.value.offset.is_finite(), + "mapping {} has a non-finite offset", + self.name + ); + + self.validate_logical_rules()?; + + Ok(()) + } + + /// Validates that logical rules form an unambiguous partition of all mapped values. + /// + /// Empty logical rules are allowed. Ranges must be non-empty and non-overlapping. + fn validate_logical_rules(&self) -> anyhow::Result<()> { + if self.logical.is_empty() { + return Ok(()); + } + + let mut covered_ranges = Vec::new(); + + for (index, rule) in self.logical.iter().enumerate() { + if !rule.range.is_non_empty() { + bail!( + "Logical rule {} for mapping {} has an empty range {}", + index + 1, + self.name, + rule.range.describe() + ); + } + + for (covered_index, covered_range) in covered_ranges.iter().enumerate() { + if let Some(overlap) = rule.range.intersection(covered_range) { + bail!( + "Logical rule {} for mapping {} overlaps with rule {} in {}; overlapping ranges are ambiguous", + index + 1, + self.name, + covered_index + 1, + overlap.describe() + ); + } + } + + covered_ranges.push(rule.range.clone()); + } + + Ok(()) + } + + /// Applies the linear mapping `mapped = raw * slope + offset`. + pub fn mapped_value(&self, raw_value: &CanDataValue) -> anyhow::Result { + let numeric_raw_value = can_data_value_to_f64(raw_value)?; + + Ok(MappedValue { + unit: self.value.unit.clone(), + value: numeric_raw_value * self.value.slope + self.value.offset, + }) + } + + /// Inverts the linear mapping and converts the result to the concrete CAN data type. + pub fn raw_value_from_mapped( + &self, + mapped_value: f64, + data_type: CanDataType, + ) -> anyhow::Result { + ensure!( + mapped_value.is_finite(), + "mapped value for {} must be finite", + self.name + ); + + ensure!( + self.value.slope != 0.0, + "cannot invert mapping {} because slope is zero", + self.name + ); + + can_data_value_from_f64( + (mapped_value - self.value.offset) / self.value.slope, + data_type, + ) + } + + pub fn logical_value(&self, mapped_value: f64) -> Option { + self.logical + .iter() + .find(|rule| rule.matches(mapped_value)) + .map(|rule| LogicalValue { + value: rule.value.clone(), + }) + } +} + +impl LogicalRule { + fn matches(&self, mapped_value: f64) -> bool { + self.range.contains(mapped_value) + } +} + +impl LogicalRange { + fn contains(&self, value: f64) -> bool { + let above_lower = if self.min_inclusive { + value >= self.min + } else { + value > self.min + }; + let below_upper = if self.max_inclusive { + value <= self.max + } else { + value < self.max + }; + above_lower && below_upper + } + + fn intersection(&self, other: &Self) -> Option { + let max_cmp = self.max.partial_cmp(&other.max).unwrap(); + let min_cmp = self.min.partial_cmp(&other.min).unwrap(); + + let max = self.max.min(other.max); + let min = self.min.max(other.min); + + let min_inclusive = match min_cmp { + std::cmp::Ordering::Less => other.min_inclusive, + std::cmp::Ordering::Greater => self.min_inclusive, + std::cmp::Ordering::Equal => self.min_inclusive && other.min_inclusive, + }; + + let max_inclusive = match max_cmp { + std::cmp::Ordering::Less => self.max_inclusive, + std::cmp::Ordering::Greater => other.max_inclusive, + std::cmp::Ordering::Equal => self.max_inclusive && other.max_inclusive, + }; + + let intersection = Self { + min, + max, + min_inclusive, + max_inclusive, + }; + if intersection.is_non_empty() { + Some(intersection) + } else { + None + } + } + + fn is_non_empty(&self) -> bool { + if self.max > self.min { + return true; + } + + if self.max == self.min { + return self.min_inclusive && self.max_inclusive; + } + + false + } + + fn describe(&self) -> String { + format!( + "{}{}, {}{}", + if self.min_inclusive { "[" } else { "(" }, + self.min, + self.max, + if self.max_inclusive { "]" } else { ")" } + ) + } +} + +fn can_data_value_to_f64(value: &CanDataValue) -> anyhow::Result { + match value { + CanDataValue::Float32(value) => Ok(*value as f64), + CanDataValue::Int32(value) => Ok(*value as f64), + CanDataValue::Int16(value) => Ok(*value as f64), + CanDataValue::Int8(value) => Ok(*value as f64), + CanDataValue::UInt32(value) => Ok(*value as f64), + CanDataValue::UInt16(value) => Ok(*value as f64), + CanDataValue::UInt8(value) => Ok(*value as f64), + CanDataValue::Boolean(value) => Ok(if *value { 1.0 } else { 0.0 }), + CanDataValue::Raw(_) => bail!("raw CAN data must be decoded before applying a mapping"), + } +} + +/// Converts a mapped numeric value back into a typed CAN payload value. +fn can_data_value_from_f64(value: f64, data_type: CanDataType) -> anyhow::Result { + ensure!(value.is_finite(), "raw value must be finite"); + + match data_type { + CanDataType::Float32 => Ok(CanDataValue::Float32(value as f32)), + CanDataType::Int32 => Ok(CanDataValue::Int32(checked_integer::(value)?)), + CanDataType::Int16 => Ok(CanDataValue::Int16(checked_integer::(value)?)), + CanDataType::Int8 => Ok(CanDataValue::Int8(checked_integer::(value)?)), + CanDataType::UInt32 => Ok(CanDataValue::UInt32(checked_integer::(value)?)), + CanDataType::UInt16 => Ok(CanDataValue::UInt16(checked_integer::(value)?)), + CanDataType::UInt8 => Ok(CanDataValue::UInt8(checked_integer::(value)?)), + CanDataType::Boolean => { + if (value - 0.0).abs() < f64::EPSILON { + Ok(CanDataValue::Boolean(false)) + } else if (value - 1.0).abs() < f64::EPSILON { + Ok(CanDataValue::Boolean(true)) + } else { + bail!("boolean raw values must map back to 0 or 1, got {value}") + } + } + } +} + +/// Checks that a floating-point inverse-mapped value can be represented as an integer CAN type. +fn checked_integer(value: f64) -> anyhow::Result +where + T: TryFrom, + >::Error: std::fmt::Debug, +{ + let rounded = value.round(); + ensure!( + (value - rounded).abs() <= 1e-9, + "raw value {value} is not an integer" + ); + + T::try_from(rounded as i128).map_err(|_| anyhow::anyhow!("raw value {rounded} is out of range")) +} + +#[cfg(test)] +mod tests { + use std::{fs, path::PathBuf}; + + use liquidcan::payloads::{CanDataType, CanDataValue}; + use toml::Value; + + use super::{LogicalValue, Mapping}; + + #[test] + fn parses_and_applies_mapping_schema() { + let mapping = Mapping::parse_mapping( + r##" +[[mapping.ECU]] +name = "tank_pressure" +type = "telemetry" +raw_field = "pressure_adc" +value = { slope = 0.5, offset = 1.0, unit = "bar" } + +[[mapping.ECU.logical]] +range = { min = 100 } +value = "High" + +[[mapping.ECU.logical]] +range = { max = 100 } +value = "Normal" +"##, + ) + .expect("mapping should parse"); + + let lookup = mapping + .get_mapping_for_name("tank_pressure") + .expect("entry should exist"); + + let mapped = lookup + .mapping_entry + .mapped_value(&CanDataValue::UInt16(198)) + .expect("raw value should map"); + assert_eq!(mapped.value, 100.0); + assert_eq!(mapped.unit, "bar"); + + assert_eq!( + lookup.mapping_entry.logical_value(mapped.value), + Some(LogicalValue { + value: Value::String("High".to_string()), + }) + ); + } + + #[test] + fn rejects_duplicate_mapping_names() { + let error = Mapping::parse_mapping( + r#" +[[mapping.node1]] +name = "duplicate" +raw_field = "field1" +type = "telemetry" +value = { slope = 1.0, offset = 0.0 } + +[[mapping.node1]] +name = "duplicate" +raw_field = "field2" +type = "telemetry" +value = { slope = 1.0, offset = 0.0 } +"#, + ) + .expect_err("duplicate names should fail validation"); + + assert!(format!("{error:#}").contains("Duplicate mapping name")); + } + + #[test] + fn converts_mapped_value_back_to_raw_parameter_type() { + let mapping = Mapping::parse_mapping( + r#" +[[mapping.ECU]] +name = "valve_opening" +type = "parameter" +raw_field = "valve_raw" +value = { slope = 0.5, offset = 10.0, unit = "%" } +"#, + ) + .expect("mapping should parse"); + + let lookup = mapping.get_mapping_for_name("valve_opening").unwrap(); + let raw = lookup + .mapping_entry + .raw_value_from_mapped(60.0, CanDataType::UInt8) + .expect("mapped value should invert to raw"); + + assert_eq!(raw, CanDataValue::UInt8(100)); + } + + #[test] + fn rejects_fractional_raw_values_for_integer_parameters() { + let mapping = Mapping::parse_mapping( + r#" +[[mapping.ECU]] +name = "valve_opening" +type = "parameter" +raw_field = "valve_raw" +value = { slope = 1.0, offset = 0.0 } +"#, + ) + .expect("mapping should parse"); + + let lookup = mapping.get_mapping_for_name("valve_opening").unwrap(); + let error = lookup + .mapping_entry + .raw_value_from_mapped(10.2, CanDataType::UInt8) + .expect_err("fractional integer raw values should fail"); + + assert!(format!("{error:#}").contains("is not an integer")); + } + + #[test] + fn rejects_duplicate_raw_fields_across_mapping_files() { + let dir = temp_mapping_dir("duplicate_raw"); + fs::write( + dir.join("a.toml"), + r#" +[[mapping.ECU]] +name = "first" +type = "telemetry" +raw_field = "pressure" +"#, + ) + .unwrap(); + fs::write( + dir.join("b.toml"), + r#" +[[mapping.ECU]] +name = "second" +type = "telemetry" +raw_field = "pressure" +"#, + ) + .unwrap(); + + let error = Mapping::load_mapping_from_path(dir.to_str().unwrap()) + .expect_err("duplicate raw fields across files should fail"); + + assert!(format!("{error:#}").contains("Duplicate raw field")); + let _ = fs::remove_dir_all(dir); + } + + #[test] + fn loads_mapping_directory() { + let mapping = Mapping::load_mapping_from_path("tests/mapping/split") + .expect("split mapping directory should be valid"); + + assert!(mapping.get_mapping_for_name("fuel_level").is_some()); + assert!(mapping.get_mapping_for_name("throttle_state").is_some()); + } + + #[test] + fn rejects_empty_mapping_directory() { + let dir = temp_mapping_dir("empty"); + + let error = Mapping::load_mapping_from_path(dir.to_str().unwrap()) + .expect_err("empty mapping directories should fail"); + + assert!(format!("{error:#}").contains("contains no TOML files")); + let _ = fs::remove_dir_all(dir); + } + + #[test] + fn checked_in_example_mapping_is_valid() { + Mapping::load_mapping_from_file("tests/mapping/example1.toml") + .expect("example mapping should be valid"); + } + + #[test] + fn rejects_overlapping_logical_rules() { + let error = Mapping::parse_mapping( + r#" +[[mapping.ECU]] +name = "temperature" +type = "telemetry" +raw_field = "temperature" + +[[mapping.ECU.logical]] +range = { max = 100 } +value = "Low" + +[[mapping.ECU.logical]] +range = { max = 50 } +value = "Very low" + +[[mapping.ECU.logical]] +range = { min = 100 } +value = "High" +"#, + ) + .expect_err("second rule should overlap with the first"); + + assert!(format!("{error:#}").contains("overlaps")); + } + + #[test] + fn accepts_adjacent_ranges() { + Mapping::parse_mapping( + r#" +[[mapping.ECU]] +name = "temperature" +type = "telemetry" +raw_field = "temperature" + +[[mapping.ECU.logical]] +range = { max = 10 } +value = "Cold" + +[[mapping.ECU.logical]] +range = { min = 10 } +value = "Hot" +"#, + ) + .expect("adjacent ranges should cover the threshold exactly once"); + } + + fn temp_mapping_dir(name: &str) -> PathBuf { + let path = + std::env::temp_dir().join(format!("ferro_flow_mapping_{name}_{}", std::process::id())); + let _ = fs::remove_dir_all(&path); + fs::create_dir_all(&path).unwrap(); + path + } +} diff --git a/src/nodes/mod.rs b/src/nodes/mod.rs index b4d9876..f1a05ef 100644 --- a/src/nodes/mod.rs +++ b/src/nodes/mod.rs @@ -1,9 +1,10 @@ //! Contains code for managing the CAN nodes that are connected to FerroFlow, their fields and data types. mod can_node; +pub mod mapping; mod node_manager; -pub use node_manager::NodeManager; +pub use node_manager::{FieldValueSnapshot, NodeManager, NodeSnapshot, NodeTelemetrySnapshot}; use std::{ sync::mpsc::{self, RecvTimeoutError}, time::{Duration, Instant}, diff --git a/src/nodes/node_manager.rs b/src/nodes/node_manager.rs index 9b95b9c..e93b20b 100644 --- a/src/nodes/node_manager.rs +++ b/src/nodes/node_manager.rs @@ -4,19 +4,27 @@ use anyhow::anyhow; use anyhow::{Context, Result, bail}; use chrono::Utc; use dashmap::DashMap; +use liquidcan::payloads::FieldGetResPayload; use liquidcan::{ CanMessage, CanMessageId, payloads::{ - CanDataValue, FieldGetResPayload, FieldRegistrationPayload, HeartbeatPayload, - NodeInfoResPayload, TelemetryGroupDefinitionPayload, TelemetryGroupUpdatePayload, + CanDataType, CanDataValue, FieldRegistrationPayload, HeartbeatPayload, NodeInfoResPayload, + TelemetryGroupDefinitionPayload, TelemetryGroupUpdatePayload, }, }; +use crate::nodes::mapping::{self, LogicalValue, MappedValue, Mapping, MappingLookupResult}; use crate::{db::FieldLog, events}; use super::can_node::{CanNode, FieldInfo, RegistrationInfo, TelemetryGroupDefinition}; +mod commands; +mod snapshots; + +pub use snapshots::{FieldValueSnapshot, NodeSnapshot, NodeTelemetrySnapshot}; + pub struct NodeManager<'a> { + mapping: Mapping, can_nodes: DashMap, // Nodes that did not yet receive all their field registrations. @@ -25,8 +33,9 @@ pub struct NodeManager<'a> { } impl<'a> NodeManager<'a> { - pub fn new(event_dispatcher: &'a events::EventDispatcher) -> Self { + pub fn new(event_dispatcher: &'a events::EventDispatcher, mapping: Mapping) -> Self { Self { + mapping, can_nodes: DashMap::new(), registering_nodes: Mutex::new(HashMap::new()), event_dispatcher, @@ -89,7 +98,7 @@ impl<'a> NodeManager<'a> { let node = CanNode::new(registration_info); if node.node_registration_complete() { - self.can_nodes.insert(node_id, node); + self.complete_node_registration(node_id, node); } else { self.registering_nodes .lock() @@ -132,7 +141,7 @@ impl<'a> NodeManager<'a> { node_id ) })?; - self.can_nodes.insert(node_id, completed_node); + self.complete_node_registration(node_id, completed_node); } Ok(()) } else { @@ -170,7 +179,8 @@ impl<'a> NodeManager<'a> { node_id ) })?; - self.can_nodes.insert(node_id, completed_node); + + self.complete_node_registration(node_id, completed_node); } Ok(()) @@ -211,22 +221,24 @@ impl<'a> NodeManager<'a> { ) })?; - let field_infos = field_ids.iter().map(|id| { - node.telemetry_fields - .get(id) - .with_context(|| { - format!( - "received telemetry group update for node {} and group {} but field {} is not defined", - node_id, group_id, id - ) + let field_infos = field_ids + .iter() + .map(|id| { + node.telemetry_fields.get(id).with_context(|| { + format!( + "received telemetry group update for node {} and group {} but field {} is not defined", + node_id, group_id, id + ) + }) }) - }).collect::>>()?; + .collect::>>()?; + + let raw_values = group_update + .values + .unpack(field_infos.iter().map(|info| info.data_type)) + .collect::>(); - for (&id, value) in field_ids.iter().zip( - group_update - .values - .unpack(field_infos.iter().map(|info| info.data_type)), - ) { + for ((&id, field_info), value) in field_ids.iter().zip(field_infos).zip(raw_values) { let value = value.with_context(|| { format!( "failed to unpack value for node {} group {} field {}", @@ -235,8 +247,6 @@ impl<'a> NodeManager<'a> { })?; node.values.insert(id, (timestamp, value.clone())); - let field_info = node.telemetry_fields.get(&id).unwrap(); - let telemetry_log = FieldLog { timestamp, node_id: node_id as i16, @@ -245,7 +255,10 @@ impl<'a> NodeManager<'a> { field_value: Self::can_data_value_to_json(value), }; self.event_dispatcher - .dispatch(events::Event::NodeFieldUpdated(telemetry_log)); + .dispatch(events::Event::NodeFieldUpdated( + telemetry_log, + events::NodeFieldUpdateSource::TelemetryUpdate, + )); } Ok(()) @@ -299,7 +312,10 @@ impl<'a> NodeManager<'a> { }; self.event_dispatcher - .dispatch(events::Event::NodeFieldUpdated(telemetry_log)); + .dispatch(events::Event::NodeFieldUpdated( + telemetry_log, + events::NodeFieldUpdateSource::FieldGetRes, + )); Ok(()) } @@ -357,17 +373,347 @@ impl<'a> NodeManager<'a> { &self.can_nodes } - fn can_data_value_to_json(value: CanDataValue) -> serde_json::Value { - match value { - CanDataValue::Float32(v) => serde_json::json!(v), - CanDataValue::Int32(v) => serde_json::json!(v), - CanDataValue::Int16(v) => serde_json::json!(v), - CanDataValue::Int8(v) => serde_json::json!(v), - CanDataValue::UInt32(v) => serde_json::json!(v), - CanDataValue::UInt16(v) => serde_json::json!(v), - CanDataValue::UInt8(v) => serde_json::json!(v), - CanDataValue::Boolean(v) => serde_json::json!(v), - CanDataValue::Raw(items) => serde_json::json!(items), + fn complete_node_registration(&self, node_id: u8, node: CanNode) { + self.can_nodes.insert(node_id, node); + self.event_dispatcher + .dispatch(events::Event::NodeListUpdated); + } + + /// Returns the latest cached raw CAN value for a mapped field name. + /// + /// This does not send a CAN request. Call `request_value` first if a fresh value is needed. + /// + /// Use this `try_` variant to distinguish missing values from invalid mappings or fields + /// that have not registered yet. + pub fn try_get_raw_value(&self, mapped_name: &str) -> Result> { + let (_, target) = self.resolve_mapping_by_name(mapped_name)?; + + Ok(self.latest_raw_value(&target)) + } + + /// Convenience wrapper around `try_get_raw_value` that treats errors as missing values. + pub fn get_raw_value(&self, mapped_name: &str) -> Option { + self.try_get_raw_value(mapped_name).ok().flatten() + } + + /// Returns the latest cached value after applying the mapping's slope/offset conversion. + /// + /// `Ok(None)` means the mapping and raw field exist, but no value has been received yet. + pub fn try_get_mapped_value(&self, mapped_name: &str) -> Result> { + let (mapping, target) = self.resolve_mapping_by_name(mapped_name)?; + let Some(raw_value) = self.latest_raw_value(&target) else { + return Ok(None); + }; + + Ok(Some(mapping.mapping_entry.mapped_value(&raw_value)?)) + } + + /// Convenience wrapper around `try_get_mapped_value` that treats errors as missing values. + pub fn get_mapped_value(&self, mapped_name: &str) -> Option { + self.try_get_mapped_value(mapped_name).ok().flatten() + } + + /// Returns the logical value associated with the current mapped value. + /// + /// Logical values are derived from the configured range table. If the mapping has no logical + /// rules, this returns `Ok(None)` even when a mapped numeric value is available. + pub fn try_get_logical_value(&self, mapped_name: &str) -> Result> { + let Some(mapped_value) = self.try_get_mapped_value(mapped_name)? else { + return Ok(None); + }; + + let mapping_lookup = self.lookup_mapping(mapped_name)?; + + Ok(mapping_lookup + .mapping_entry + .logical_value(mapped_value.value)) + } + + /// Convenience wrapper around `try_get_logical_value` that treats errors as missing values. + pub fn get_logical_value(&self, mapped_name: &str) -> Option { + self.try_get_logical_value(mapped_name).ok().flatten() + } + + fn lookup_mapping(&self, mapped_name: &str) -> Result> { + self.mapping + .get_mapping_for_name(mapped_name) + .with_context(|| format!("no mapping exists for {mapped_name}")) + } + + fn resolve_mapping_by_name( + &self, + mapped_name: &str, + ) -> Result<(MappingLookupResult<'_>, ResolvedMappingTarget)> { + let mapping_lookup = self.lookup_mapping(mapped_name)?; + let target = self + .resolve_mapping_target(&mapping_lookup) + .with_context(|| format!("mapped field {mapped_name} is not registered"))?; + + Ok((mapping_lookup, target)) + } + + fn latest_raw_value(&self, target: &ResolvedMappingTarget) -> Option { + self.can_nodes.get(&target.node_id).and_then(|node| { + node.values + .get(&target.field_id) + .map(|value| value.1.clone()) + }) + } + + /// Resolves a mapping entry to the currently registered node id, field id, and field type. + /// + /// Mappings are written against stable device/field names, but LiquidCAN requests need numeric + /// ids learned during node registration. + fn resolve_mapping_target( + &self, + mapping_lookup_result: &MappingLookupResult, + ) -> Option { + self.can_nodes.iter().find_map(|node| { + if node.registration_info.device_name != mapping_lookup_result.node_name { + return None; + } + + let fields = match mapping_lookup_result.mapping_entry.field_type { + mapping::FieldType::Telemetry => &node.telemetry_fields, + mapping::FieldType::Parameter => &node.parameter_fields, + }; + + fields + .iter() + .find(|(_, field)| field.name == mapping_lookup_result.mapping_entry.raw_field) + .map(|(field_id, field)| ResolvedMappingTarget { + node_id: *node.key(), + field_id: *field_id, + data_type: field.data_type, + }) + }) + } +} + +struct ResolvedMappingTarget { + node_id: u8, + field_id: u8, + data_type: CanDataType, +} + +#[cfg(test)] +mod tests { + use std::{sync::mpsc, time::Duration}; + + use chrono::Utc; + use liquidcan::payloads::{CanDataType, CanDataValue, ParameterSetReqPayload}; + use serde_json::json; + use toml::Value; + + use crate::events::{Event, EventDispatcher, EventKind}; + + use super::*; + + #[test] + fn reads_raw_mapped_and_logical_values_by_mapping_name() { + let dispatcher = EventDispatcher::new(); + let manager = NodeManager::new(&dispatcher, test_mapping()); + insert_test_node(&manager); + + assert_eq!( + manager.get_raw_value("tank_pressure"), + Some(CanDataValue::UInt16(198)) + ); + + let mapped = manager + .get_mapped_value("tank_pressure") + .expect("mapped value should be available"); + assert_eq!(mapped.value, 100.0); + assert_eq!(mapped.unit, "bar"); + + let logical = manager + .get_logical_value("tank_pressure") + .expect("logical value should be available"); + assert_eq!(logical.value, Value::String("High".to_string())); + } + + #[test] + fn writes_mapped_parameter_values_as_raw_can_values() { + let dispatcher = EventDispatcher::new(); + let (tx, rx) = mpsc::channel(); + dispatcher.subscribe(tx, vec![EventKind::SendCanMessage], "test-send-listener"); + + let manager = NodeManager::new(&dispatcher, test_mapping()); + insert_test_node(&manager); + + manager + .set_mapped_value_by_mapped("valve_opening", 60.0) + .expect("mapped parameter should be writable"); + + assert_eq!( + receive_parameter_set(&rx), + (5, 20, CanDataValue::UInt8(100)) + ); + } + + #[test] + fn writes_raw_parameter_values() { + let dispatcher = EventDispatcher::new(); + let (tx, rx) = mpsc::channel(); + dispatcher.subscribe(tx, vec![EventKind::SendCanMessage], "test-send-listener"); + + let manager = NodeManager::new(&dispatcher, test_mapping()); + insert_test_node(&manager); + + manager + .set_raw_value_by_mapped("valve_opening", json!(42)) + .expect("raw parameter should be writable"); + + assert_eq!(receive_parameter_set(&rx), (5, 20, CanDataValue::UInt8(42))); + } + + #[test] + fn requests_field_get_for_mapped_values() { + let dispatcher = EventDispatcher::new(); + let (tx, rx) = mpsc::channel(); + dispatcher.subscribe(tx, vec![EventKind::SendCanMessage], "test-send-listener"); + + let manager = NodeManager::new(&dispatcher, test_mapping()); + insert_test_node(&manager); + + manager + .request_field_by_mapped("tank_pressure") + .expect("mapped field should be requestable"); + + let event = rx + .recv_timeout(Duration::from_millis(200)) + .expect("send event should be dispatched"); + + match event { + Event::SendCanMessage { + receiver_node_id, + message: CanMessage::FieldGetReq { payload }, + } => { + assert_eq!(receiver_node_id, 5); + assert_eq!(payload.field_id, 10); + } + other => panic!("unexpected event: {other:?}"), } } + + #[test] + fn requests_field_get_by_raw_node_and_field_name() { + let dispatcher = EventDispatcher::new(); + let (tx, rx) = mpsc::channel(); + dispatcher.subscribe(tx, vec![EventKind::SendCanMessage], "test-send-listener"); + + let manager = NodeManager::new(&dispatcher, test_mapping()); + insert_test_node(&manager); + + manager + .request_field_by_raw("ECU", "pressure_adc") + .expect("raw field should be requestable"); + + let event = rx + .recv_timeout(Duration::from_millis(200)) + .expect("send event should be dispatched"); + + match event { + Event::SendCanMessage { + receiver_node_id, + message: CanMessage::FieldGetReq { payload }, + } => { + assert_eq!(receiver_node_id, 5); + assert_eq!(payload.field_id, 10); + } + other => panic!("unexpected event: {other:?}"), + } + } + + #[test] + fn writes_raw_parameter_by_node_name_and_raw_field_name() { + let dispatcher = EventDispatcher::new(); + let (tx, rx) = mpsc::channel(); + dispatcher.subscribe(tx, vec![EventKind::SendCanMessage], "test-send-listener"); + + let manager = NodeManager::new(&dispatcher, test_mapping()); + insert_test_node(&manager); + + manager + .set_raw_value_by_raw("ECU", "valve_raw", serde_json::json!(42)) + .expect("raw parameter should be writable by node and raw field"); + + assert_eq!(receive_parameter_set(&rx), (5, 20, CanDataValue::UInt8(42))); + } + + fn test_mapping() -> Mapping { + Mapping::parse_mapping( + r##" +[[mapping.ECU]] +name = "tank_pressure" +type = "telemetry" +raw_field = "pressure_adc" +value = { slope = 0.5, offset = 1.0, unit = "bar" } + +[[mapping.ECU.logical]] +range = { min = 100 } +value = "High" + +[[mapping.ECU.logical]] +range = { max = 100 } +value = "Normal" + +[[mapping.ECU]] +name = "valve_opening" +type = "parameter" +raw_field = "valve_raw" +value = { slope = 0.5, offset = 10.0, unit = "%" } +"##, + ) + .expect("mapping should parse") + } + + fn receive_parameter_set(rx: &mpsc::Receiver) -> (u8, u8, CanDataValue) { + let event = rx + .recv_timeout(Duration::from_millis(200)) + .expect("send event should be dispatched"); + + match event { + Event::SendCanMessage { + receiver_node_id, + message: + CanMessage::ParameterSetReq { + payload: + ParameterSetReqPayload { + parameter_id, + value, + }, + }, + } => (receiver_node_id, parameter_id, value), + other => panic!("unexpected event: {other:?}"), + } + } + + fn insert_test_node(manager: &NodeManager<'_>) { + let mut node = CanNode::new(RegistrationInfo { + telemetry_count: 1, + parameter_count: 1, + firmware_hash: 0, + protocol_hash: 0, + device_name: "ECU".to_string(), + }); + node.telemetry_fields.insert( + 10, + FieldInfo { + data_type: CanDataType::UInt16, + name: "pressure_adc".to_string(), + }, + ); + node.parameter_fields.insert( + 20, + FieldInfo { + data_type: CanDataType::UInt8, + name: "valve_raw".to_string(), + }, + ); + node.values + .insert(10, (Utc::now(), CanDataValue::UInt16(198))); + + manager.can_nodes.insert(5, node); + } } diff --git a/src/nodes/node_manager/commands.rs b/src/nodes/node_manager/commands.rs new file mode 100644 index 0000000..2972450 --- /dev/null +++ b/src/nodes/node_manager/commands.rs @@ -0,0 +1,232 @@ +use std::collections::HashMap; + +use anyhow::{Context, Result, anyhow}; +use liquidcan::{ + CanMessage, + payloads::{CanDataType, CanDataValue, FieldGetReqPayload, ParameterSetReqPayload}, +}; +use serde_json::Value; + +use crate::{events, nodes::mapping}; + +use super::super::can_node::FieldInfo; +use super::{NodeManager, ResolvedMappingTarget}; + +impl<'a> NodeManager<'a> { + /// Sends a `FieldGetReq` for the raw field behind a mapped name. + /// + /// The response is processed asynchronously by the normal CAN message handler. + pub fn request_field_by_mapped(&self, mapped_name: &str) -> Result<()> { + let (_, target) = self.resolve_mapping_by_name(mapped_name)?; + + self.event_dispatcher + .dispatch(events::Event::SendCanMessage { + receiver_node_id: target.node_id, + message: CanMessage::FieldGetReq { + payload: FieldGetReqPayload { + field_id: target.field_id, + }, + }, + }); + + Ok(()) + } + + /// Sends a `FieldGetReq` for the field identified by the node name and its raw field name. + /// + /// The response is processed asynchronously by the normal CAN message handler. + pub fn request_field_by_raw(&self, node_name: &str, raw_field_name: &str) -> Result<()> { + let target = + self.resolve_field_by_node_name_and_raw_name(node_name, raw_field_name, None)?; + + self.event_dispatcher + .dispatch(events::Event::SendCanMessage { + receiver_node_id: target.node_id, + message: CanMessage::FieldGetReq { + payload: FieldGetReqPayload { + field_id: target.field_id, + }, + }, + }); + + Ok(()) + } + + /// Writes a mapped value using its mapped name. + /// + /// The value is converted back to the raw CAN type using the inverse of the configured linear + /// mapping, then sent as a `ParameterSetReq`. + pub fn set_mapped_value_by_mapped(&self, mapped_name: &str, mapped_value: f64) -> Result<()> { + let (mapping_lookup, target) = self.resolve_mapping_by_name(mapped_name)?; + + if mapping_lookup.mapping_entry.field_type != mapping::FieldType::Parameter { + anyhow::bail!( + "mapped field {mapped_name} is not writable because it is not a parameter" + ); + } + + let raw_value = mapping_lookup + .mapping_entry + .raw_value_from_mapped(mapped_value, target.data_type)?; + self.dispatch_parameter_set(target, raw_value); + + Ok(()) + } + + /// Writes a raw CAN value using its mapped name. + pub fn set_raw_value_by_mapped(&self, mapped_name: &str, value: Value) -> Result<()> { + let (mapping_lookup, target) = self.resolve_mapping_by_name(mapped_name)?; + + if mapping_lookup.mapping_entry.field_type != mapping::FieldType::Parameter { + anyhow::bail!( + "mapped field {mapped_name} is not writable because it is not a parameter" + ); + } + + let raw_value = json_value_to_can_data_value(value, target.data_type)?; + + self.dispatch_parameter_set(target, raw_value); + + Ok(()) + } + + /// Writes a raw CAN value using its raw name. + pub fn set_raw_value_by_raw( + &self, + node_name: &str, + raw_field_name: &str, + value: Value, + ) -> Result<()> { + let target = self.resolve_field_by_node_name_and_raw_name( + node_name, + raw_field_name, + Some(mapping::FieldType::Parameter), + )?; + + let raw_value = json_value_to_can_data_value(value, target.data_type)?; + + self.dispatch_parameter_set( + ResolvedMappingTarget { + node_id: target.node_id, + field_id: target.field_id, + data_type: target.data_type, + }, + raw_value, + ); + + Ok(()) + } + + fn dispatch_parameter_set(&self, target: ResolvedMappingTarget, raw_value: CanDataValue) { + self.event_dispatcher + .dispatch(events::Event::SendCanMessage { + receiver_node_id: target.node_id, + message: CanMessage::ParameterSetReq { + payload: ParameterSetReqPayload { + parameter_id: target.field_id, + value: raw_value, + }, + }, + }); + } + + fn resolve_field_by_node_name_and_raw_name( + &self, + node_name: &str, + raw_field_name: &str, + required_field_type: Option, + ) -> Result { + for node in self.can_nodes.iter() { + if node.registration_info.device_name != node_name { + continue; + } + + let node_id = *node.key(); + if required_field_type != Some(mapping::FieldType::Parameter) + && let Some(target) = + self.resolve_raw_field_in_map(node_id, &node.telemetry_fields, raw_field_name) + { + return Ok(target); + } + + if required_field_type != Some(mapping::FieldType::Telemetry) + && let Some(target) = + self.resolve_raw_field_in_map(node_id, &node.parameter_fields, raw_field_name) + { + return Ok(target); + } + + anyhow::bail!( + "node {node_name} has no field named {raw_field_name} matching the required field type" + ); + } + + anyhow::bail!("no node found with name {node_name}"); + } + + fn resolve_raw_field_in_map( + &self, + node_id: u8, + fields: &HashMap, + raw_field_name: &str, + ) -> Option { + fields.iter().find_map(|(&field_id, field)| { + if field.name != raw_field_name { + return None; + } + + Some(ResolvedNodeField { + node_id, + field_id, + data_type: field.data_type, + }) + }) + } +} + +struct ResolvedNodeField { + node_id: u8, + field_id: u8, + data_type: CanDataType, +} + +pub fn json_value_to_can_data_value( + value: serde_json::Value, + data_type: CanDataType, +) -> Result { + match data_type { + CanDataType::Float32 => Ok(CanDataValue::Float32(json_value_to_f64(&value)? as f32)), + CanDataType::Int32 => Ok(CanDataValue::Int32(json_value_to_integer(&value)?)), + CanDataType::Int16 => Ok(CanDataValue::Int16(json_value_to_integer(&value)?)), + CanDataType::Int8 => Ok(CanDataValue::Int8(json_value_to_integer(&value)?)), + CanDataType::UInt32 => Ok(CanDataValue::UInt32(json_value_to_integer(&value)?)), + CanDataType::UInt16 => Ok(CanDataValue::UInt16(json_value_to_integer(&value)?)), + CanDataType::UInt8 => Ok(CanDataValue::UInt8(json_value_to_integer(&value)?)), + CanDataType::Boolean => value + .as_bool() + .map(CanDataValue::Boolean) + .or_else(|| { + value + .as_i64() + .map(|value| CanDataValue::Boolean(value != 0)) + }) + .with_context(|| format!("expected boolean-compatible value, got {value}")), + } +} + +fn json_value_to_f64(value: &serde_json::Value) -> Result { + value + .as_f64() + .with_context(|| format!("expected numeric value, got {value}")) +} + +fn json_value_to_integer(value: &serde_json::Value) -> Result +where + T: TryFrom, + >::Error: std::fmt::Debug, +{ + let raw = value + .as_i64() + .with_context(|| format!("expected integer value, got {value}"))?; + T::try_from(raw).map_err(|_| anyhow!("integer value {raw} is out of range")) +} diff --git a/src/nodes/node_manager/snapshots.rs b/src/nodes/node_manager/snapshots.rs new file mode 100644 index 0000000..f446f7e --- /dev/null +++ b/src/nodes/node_manager/snapshots.rs @@ -0,0 +1,214 @@ +use anyhow::Result; +use liquidcan::payloads::CanDataValue; + +use crate::nodes::mapping::{self, FieldType}; + +use super::super::can_node::CanNode; +use super::NodeManager; + +#[derive(Clone, Debug)] +pub struct NodeSnapshot { + pub id: u8, + pub name: String, + pub fields: Vec, +} + +#[derive(Clone, Debug)] +pub struct FieldSnapshot { + pub id: u8, + pub name: String, + pub raw_name: String, + pub mapped_name: Option, + pub kind: FieldType, +} + +#[derive(Clone, Debug)] +pub struct NodeTelemetrySnapshot { + pub id: u8, + pub name: String, + pub telemetry: Vec, +} + +#[derive(Clone, Debug)] +pub struct FieldValueSnapshot { + pub node_id: u8, + pub id: u8, + pub node_name: String, + pub raw_name: String, + pub mapped_name: Option, + pub name: String, + pub raw: serde_json::Value, + pub value: serde_json::Value, + pub unit: String, + pub logical: serde_json::Value, +} + +impl<'a> NodeManager<'a> { + pub fn nodes_snapshot(&self) -> Vec { + let mut nodes = self + .can_nodes + .iter() + .map(|node| { + let node_id = *node.key(); + let node_name = node.registration_info.device_name.clone(); + let mut fields = Vec::new(); + + for (node_fields, field_type) in [ + (&node.telemetry_fields, FieldType::Telemetry), + (&node.parameter_fields, FieldType::Parameter), + ] { + fields.extend(node_fields.iter().map(|(&id, field)| { + // TODO: we should avoid having to look up the mapping every time + let mapped_name = self + .mapping + .get_mapping_for_raw(&node_name, &field.name, field_type) + .map(|mapping| mapping.mapping_entry.name.clone()); + let name = mapped_name.clone().unwrap_or_else(|| field.name.clone()); + FieldSnapshot { + id, + name, + raw_name: field.name.clone(), + mapped_name, + kind: field_type, + } + })) + } + + NodeSnapshot { + id: node_id, + name: node_name, + fields, + } + }) + .collect::>(); + nodes.sort_by_key(|node| node.id); + nodes + } + + pub fn telemetry_snapshot(&self) -> Vec { + self.can_nodes + .iter() + .filter_map(|node| { + let telemetry = node + .values + .iter() + .filter_map(|value| { + self.field_value_snapshot_from_node(*node.key(), &node, *value.key()) + }) + .collect::>(); + + if telemetry.is_empty() { + return None; + } + + Some(NodeTelemetrySnapshot { + id: *node.key(), + name: node.registration_info.device_name.clone(), + telemetry, + }) + }) + .collect() + } + + pub fn field_value_snapshot_by_id( + &self, + node_id: u8, + field_id: u8, + ) -> Option { + self.can_nodes + .get(&node_id) + .and_then(|node| self.field_value_snapshot_from_node(node_id, &node, field_id)) + } + + pub fn field_value_snapshot_by_mapped_name( + &self, + mapped_name: &str, + ) -> Result> { + let (_, target) = self.resolve_mapping_by_name(mapped_name)?; + + Ok(self.field_value_snapshot_by_id(target.node_id, target.field_id)) + } + + pub(super) fn can_data_value_to_json(value: CanDataValue) -> serde_json::Value { + match value { + CanDataValue::Float32(v) => serde_json::json!(v), + CanDataValue::Int32(v) => serde_json::json!(v), + CanDataValue::Int16(v) => serde_json::json!(v), + CanDataValue::Int8(v) => serde_json::json!(v), + CanDataValue::UInt32(v) => serde_json::json!(v), + CanDataValue::UInt16(v) => serde_json::json!(v), + CanDataValue::UInt8(v) => serde_json::json!(v), + CanDataValue::Boolean(v) => serde_json::json!(v), + CanDataValue::Raw(items) => serde_json::json!(items), + } + } + + fn field_value_snapshot_from_node( + &self, + node_id: u8, + node: &CanNode, + field_id: u8, + ) -> Option { + let node_name = &node.registration_info.device_name; + let (field_type, field) = node + .telemetry_fields + .get(&field_id) + .map(|field| (mapping::FieldType::Telemetry, field)) + .or_else(|| { + node.parameter_fields + .get(&field_id) + .map(|field| (mapping::FieldType::Parameter, field)) + })?; + + let raw_value = node.values.get(&field_id).map(|value| value.1.clone())?; + let raw_json = Self::can_data_value_to_json(raw_value.clone()); + let mapping = self + .mapping + .get_mapping_for_raw(node_name, &field.name, field_type); + + let (mapped_name, name, value, unit, logical) = if let Some(mapping) = mapping { + let mapped = mapping.mapping_entry.mapped_value(&raw_value).ok(); + let value = mapped + .as_ref() + .map(|mapped| serde_json::json!(mapped.value)) + .unwrap_or_else(|| raw_json.clone()); + let unit = mapped + .as_ref() + .map(|mapped| mapped.unit.clone()) + .unwrap_or_default(); + let logical = mapped + .and_then(|mapped| mapping.mapping_entry.logical_value(mapped.value)) + .and_then(|logical| serde_json::to_value(logical.value).ok()) + .unwrap_or(serde_json::Value::Null); + + ( + Some(mapping.mapping_entry.name.clone()), + mapping.mapping_entry.name.clone(), + value, + unit, + logical, + ) + } else { + ( + None, + field.name.clone(), + raw_json.clone(), + String::new(), + serde_json::Value::Null, + ) + }; + + Some(FieldValueSnapshot { + node_id, + id: field_id, + node_name: node_name.clone(), + raw_name: field.name.clone(), + mapped_name, + name, + raw: raw_json, + value, + unit, + logical, + }) + } +} diff --git a/src/socket/PROTOCOL.md b/src/socket/PROTOCOL.md new file mode 100644 index 0000000..bec05fc --- /dev/null +++ b/src/socket/PROTOCOL.md @@ -0,0 +1,247 @@ +# Socket Protocol + +## Message Format + +Messages are sent over TCP. Each message starts with a 2-byte unsigned big-endian length header, followed by the JSON-encoded message body. + +Every message body is a JSON object: + +```json +{ + "type": "message_type", + "content": { + // message-specific data + } +} +``` + +## Field References + +Commands that target a field use one of these explicit references: + +Mapped field reference, resolved through the mapping: + +```json +{ + "type": "mapped", + "name": "tank_pressure" +} +``` + +Raw field reference, resolved by LiquidCAN device name and raw field name: + +```json +{ + "type": "raw", + "node_name": "ECU", + "field_name": "pressure_adc" +} +``` + +For raw references, `node_name` is the node `device_name` announced by the LiquidCAN protocol. + +## Messages From FerroFlow + +### Telemetry + +Sent at connection and on request. Contains all current cached telemetry values for all nodes. + +```json +{ + "type": "telemetry", + "content": { + "timestamp": 1710000000000, + "nodes": [ + { + "id": 5, + "name": "ECU", + "telemetry": [ + { + "id": 10, + "name": "tank_pressure", + "raw_name": "pressure_adc", + "mapped_name": "tank_pressure", + "raw": 198, + "value": 100.0, + "unit": "bar", + "logical": "High" + } + ] + } + ] + } +} +``` + +If a field has no mapping, `name` equals `raw_name`, `mapped_name` is `null`, `value` equals `raw`, `unit` is empty, and `logical` is `null`. + +### TelemetryDelta + +Sent when telemetry values change. Contains values that changed since the previous `telemetry` or `telemetry_delta` message sent on the socket. + +```json +{ + "type": "telemetry_delta", + "content": { + "prev_timestamp": 1710000000000, + "timestamp": 1710000000020, + "nodes": [ + { + "id": 5, + "name": "ECU", + "telemetry": [ + { + "id": 10, + "name": "tank_pressure", + "raw_name": "pressure_adc", + "mapped_name": "tank_pressure", + "raw": 199, + "value": 100.5, + "unit": "bar", + "logical": "High" + } + ] + } + ] + } +} +``` + +### Nodes + +Sent at connection, whenever the registered node list changes, and on request. + +```json +{ + "type": "nodes", + "content": { + "nodes": [ + { + "id": 5, + "name": "ECU", + "fields": [ + { + "id": 10, + "type": "telemetry", + "name": "tank_pressure", + "raw_name": "pressure_adc", + "mapped_name": "tank_pressure" + }, + { + "id": 20, + "type": "parameter", + "name": "valve_opening", + "raw_name": "valve_raw", + "mapped_name": "valve_opening" + } + ] + } + ] + } +} +``` + +### FieldGetResponse + +Sent in response to a `get_field` command once the value is available. + +```json +{ + "type": "field_get_response", + "content": { + "node_id": 5, + "node_name": "ECU", + "raw_name": "pressure_adc", + "mapped_name": "tank_pressure", + "name": "tank_pressure", + "raw": 198, + "value": 100.0, + "unit": "bar", + "logical": "High" + } +} +``` + +## Commands To FerroFlow + +### SetParameter + +Sets a parameter value. With a mapped reference, FerroFlow inverse-applies the mapping before sending the raw CAN value. With a raw reference, FerroFlow sends the JSON value converted directly to the registered CAN data type. + +```json +{ + "type": "set_parameter", + "content": { + "field": { + "type": "mapped", + "name": "valve_opening" + }, + "value": 60.0 + } +} +``` + +```json +{ + "type": "set_parameter", + "content": { + "field": { + "type": "raw", + "node_name": "ECU", + "field_name": "valve_raw" + }, + "value": 100 + } +} +``` + +### GetField + +Requests the current value of a field from a node. + +```json +{ + "type": "get_field", + "content": { + "field": { + "type": "mapped", + "name": "tank_pressure" + } + } +} +``` + +```json +{ + "type": "get_field", + "content": { + "field": { + "type": "raw", + "node_name": "ECU", + "field_name": "pressure_adc" + } + } +} +``` + +### GetNodes + +Requests a `nodes` message. + +```json +{ + "type": "get_nodes", + "content": {} +} +``` + +### GetTelemetry + +Requests a full `telemetry` message. + +```json +{ + "type": "get_telemetry", + "content": {} +} +``` diff --git a/src/socket/mod.rs b/src/socket/mod.rs index 31f14a7..3aef008 100644 --- a/src/socket/mod.rs +++ b/src/socket/mod.rs @@ -1 +1,547 @@ //! Code for managing socket connections to the frontend. + +use std::{ + io::{ErrorKind, Read, Write}, + net::{SocketAddr, TcpStream, ToSocketAddrs}, + sync::mpsc::{self, TryRecvError}, + thread, + time::Duration, +}; + +use anyhow::{Context, Result, bail}; +use chrono::Utc; +use serde::{Deserialize, Serialize}; + +use crate::{ + config::WebserverSocketConfig, + events::{self, Event, EventKind}, + nodes::{FieldValueSnapshot, NodeManager, NodeTelemetrySnapshot, mapping::FieldType}, +}; + +const SOCKET_POLL_PERIOD: Duration = Duration::from_millis(50); + +pub fn spawn_webserver_socket_worker<'a>( + config: WebserverSocketConfig, + node_manager: &'a NodeManager<'a>, + event_dispatcher: &'a events::EventDispatcher, + scope: &'a thread::Scope<'a, '_>, +) { + let (tx, rx) = mpsc::channel::(); + event_dispatcher.subscribe( + tx, + vec![ + EventKind::Shutdown, + EventKind::NodeFieldUpdated, + EventKind::NodeListUpdated, + ], + "Webserver socket thread", + ); + + scope.spawn(move || socket_worker(config, node_manager, rx)); +} + +fn socket_worker( + config: WebserverSocketConfig, + node_manager: &NodeManager<'_>, + rx: mpsc::Receiver, +) { + let reconnect_period = Duration::from_millis(config.reconnect_period_ms); + let address = match resolve_socket_addr(&config) { + Ok(address) => address, + Err(error) => { + eprintln!("Failed to resolve webserver socket address: {error:#}"); + return; + } + }; + + loop { + match TcpStream::connect(address) { + Ok(mut stream) => { + println!("Connected to webserver socket at {address}"); + if let Err(error) = stream.set_nonblocking(true) { + eprintln!("Failed to configure webserver socket as nonblocking: {error:#}"); + return; + } + + if let Err(error) = connected_socket_worker(&mut stream, node_manager, &rx) { + eprintln!("Error in webserver socket worker: {error:#}"); + } else { + println!("Webserver socket worker exiting gracefully."); + return; + } + } + Err(error) => { + eprintln!("Failed to connect to webserver socket at {address}: {error:#}"); + } + } + + match rx.recv_timeout(reconnect_period) { + Ok(Event::Shutdown) | Err(mpsc::RecvTimeoutError::Disconnected) => return, + Ok(_) | Err(mpsc::RecvTimeoutError::Timeout) => {} + } + } +} + +fn connected_socket_worker( + stream: &mut TcpStream, + node_manager: &NodeManager<'_>, + rx: &mpsc::Receiver, +) -> Result<()> { + let mut msg_buf = Vec::new(); + let mut last_telemetry_timestamp = Utc::now().timestamp_millis(); + + send_nodes_message(stream, node_manager)?; + send_telemetry_message(stream, node_manager, last_telemetry_timestamp)?; + + loop { + match rx.recv_timeout(SOCKET_POLL_PERIOD) { + Ok(event) => { + handle_event(stream, node_manager, event, &mut last_telemetry_timestamp)?; + } + Err(mpsc::RecvTimeoutError::Timeout) => {} + Err(mpsc::RecvTimeoutError::Disconnected) => return Ok(()), + } + + loop { + match rx.try_recv() { + Ok(Event::Shutdown) => return Ok(()), + Ok(event) => { + handle_event(stream, node_manager, event, &mut last_telemetry_timestamp)?; + } + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => return Ok(()), + } + } + + while let Some(msg_len) = read_message(stream, &mut msg_buf)? { + msg_buf.drain(..2); // remove length prefix + handle_command( + stream, + node_manager, + &msg_buf[..msg_len], + &mut last_telemetry_timestamp, + )?; + msg_buf.drain(..msg_len); + } + } +} + +fn handle_event( + stream: &mut TcpStream, + node_manager: &NodeManager<'_>, + event: Event, + last_telemetry_timestamp: &mut i64, +) -> Result<()> { + match event { + Event::NodeListUpdated => send_nodes_message(stream, node_manager), + Event::NodeFieldUpdated(field_log, source) => { + let Some(field) = node_manager + .field_value_snapshot_by_id(field_log.node_id as u8, field_log.field_id as u8) + else { + return Ok(()); + }; + + let node_id = field_log.node_id as u8; + match source { + events::NodeFieldUpdateSource::FieldGetRes => { + let response = FieldGetResponseContent::from_field(node_id, field); + send_message(stream, "field_get_response", response) + } + events::NodeFieldUpdateSource::ParameterSetConfirmation => { + // TODO + Ok(()) + } + events::NodeFieldUpdateSource::TelemetryUpdate => { + let previous_timestamp = *last_telemetry_timestamp; + let timestamp = Utc::now().timestamp_millis(); + *last_telemetry_timestamp = timestamp; + + let node = NodeTelemetrySnapshot { + id: node_id, + name: field.node_name.clone(), + telemetry: vec![field], + }; + send_message( + stream, + "telemetry_delta", + TelemetryDeltaContent { + prev_timestamp: previous_timestamp, + timestamp, + nodes: telemetry_nodes_from_snapshots(vec![node]), + }, + ) + } + } + } + _ => Ok(()), + } +} + +fn handle_command( + stream: &mut TcpStream, + node_manager: &NodeManager<'_>, + message: &[u8], + last_telemetry_timestamp: &mut i64, +) -> Result<()> { + let incoming: IncomingMessage = serde_json::from_slice(message) + .with_context(|| "failed to decode webserver command as JSON")?; + + match incoming.message_type.as_str() { + "set_parameter" => { + let command: SetParameterCommand = serde_json::from_value(incoming.content) + .with_context(|| "invalid set_parameter command content")?; + match command.field { + FieldReferenceCommand::Mapped { name } => { + let value = command + .value + .as_f64() + .with_context(|| format!("mapped parameter {name} requires a number"))?; + node_manager.set_mapped_value_by_mapped(&name, value) + } + FieldReferenceCommand::Raw { + node_name, + field_name, + } => node_manager.set_raw_value_by_raw(&node_name, &field_name, command.value), + } + } + "get_field" => { + let command: GetFieldCommand = serde_json::from_value(incoming.content) + .with_context(|| "invalid get_field command content")?; + request_field(node_manager, &command.field)?; + + Ok(()) + } + "get_nodes" => send_nodes_message(stream, node_manager), + "get_telemetry" => { + let timestamp = Utc::now().timestamp_millis(); + send_telemetry_message(stream, node_manager, timestamp)?; + *last_telemetry_timestamp = timestamp; + Ok(()) + } + other => bail!("unsupported command type {other}"), + } +} + +fn send_nodes_message(stream: &mut TcpStream, node_manager: &NodeManager<'_>) -> Result<()> { + let nodes = node_manager + .nodes_snapshot() + .into_iter() + .map(|node| NodeContent { + id: node.id, + name: node.name, + fields: node + .fields + .into_iter() + .map(|field| FieldContent { + id: field.id, + name: field.name, + raw_name: field.raw_name, + mapped_name: field.mapped_name, + kind: match field.kind { + FieldType::Telemetry => "telemetry", + FieldType::Parameter => "parameter", + }, + }) + .collect(), + }) + .collect(); + send_message(stream, "nodes", NodesContent { nodes }) +} + +fn request_field( + node_manager: &NodeManager<'_>, + field_reference: &FieldReferenceCommand, +) -> Result<()> { + match field_reference { + FieldReferenceCommand::Mapped { name } => node_manager.request_field_by_mapped(name), + FieldReferenceCommand::Raw { + node_name, + field_name, + } => node_manager.request_field_by_raw(node_name, field_name), + } +} + +fn send_telemetry_message( + stream: &mut TcpStream, + node_manager: &NodeManager<'_>, + timestamp: i64, +) -> Result<()> { + send_message( + stream, + "telemetry", + TelemetryContent { + timestamp, + nodes: telemetry_nodes_from_snapshots(node_manager.telemetry_snapshot()), + }, + ) +} + +fn telemetry_nodes_from_snapshots( + snapshots: Vec, +) -> Vec { + snapshots + .into_iter() + .map(|node| TelemetryNodeContent { + id: node.id, + name: node.name, + telemetry: node + .telemetry + .into_iter() + .map(TelemetryFieldContent::from) + .collect(), + }) + .collect() +} + +fn send_message( + stream: &mut TcpStream, + message_type: &'static str, + content: T, +) -> Result<()> { + let payload = serde_json::to_vec(&OutgoingMessage { + message_type, + content, + })?; + let len = u16::try_from(payload.len()) + .with_context(|| format!("socket message too large: {} bytes", payload.len()))?; + + stream.write_all(&len.to_be_bytes())?; + stream.write_all(&payload)?; + Ok(()) +} + +/// Attempts to read a single message from the socket. Returns true if a message was placed in the message buffer, false otherwise. +fn read_message(stream: &mut TcpStream, message_buffer: &mut Vec) -> Result> { + // Do we already have a full message in the buffer from a previous read? + if let Some(msg_len) = buffer_contains_full_message(message_buffer) { + return Ok(Some(msg_len)); + } + + // Read more data + let mut scratch = [0_u8; 4096]; + + loop { + match stream.read(&mut scratch) { + Ok(0) => break, // EOF, socket closed + Ok(bytes_read) => message_buffer.extend_from_slice(&scratch[..bytes_read]), + Err(error) if error.kind() == ErrorKind::Interrupted => continue, + Err(error) if matches!(error.kind(), ErrorKind::WouldBlock | ErrorKind::TimedOut) => { + break; + } + Err(error) => return Err(error.into()), + } + } + + // Do we have a full message now? + if let Some(msg_len) = buffer_contains_full_message(message_buffer) { + Ok(Some(msg_len)) + } else { + Ok(None) + } +} + +fn message_len_from_buffer(buffer: &[u8]) -> Option { + if buffer.len() < 2 { + None + } else { + Some(u16::from_be_bytes([buffer[0], buffer[1]]) as usize) + } +} + +fn buffer_contains_full_message(buffer: &[u8]) -> Option { + let len = message_len_from_buffer(buffer); + if let Some(len) = len + && buffer.len() >= len + 2 + { + Some(len) + } else { + None + } +} + +fn resolve_socket_addr(config: &WebserverSocketConfig) -> Result { + (config.host.as_str(), config.port) + .to_socket_addrs()? + .next() + .with_context(|| format!("{}:{} resolved to no addresses", config.host, config.port)) +} + +#[derive(Deserialize)] +struct IncomingMessage { + #[serde(rename = "type")] + message_type: String, + content: serde_json::Value, +} + +#[derive(Deserialize)] +struct SetParameterCommand { + field: FieldReferenceCommand, + value: serde_json::Value, +} + +#[derive(Deserialize)] +struct GetFieldCommand { + field: FieldReferenceCommand, +} + +#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq)] +#[serde(tag = "type", rename_all = "snake_case")] +enum FieldReferenceCommand { + Mapped { + name: String, + }, + Raw { + node_name: String, + field_name: String, + }, +} + +#[derive(Serialize)] +struct OutgoingMessage { + #[serde(rename = "type")] + message_type: &'static str, + content: T, +} + +#[derive(Serialize)] +struct TelemetryContent { + timestamp: i64, + nodes: Vec, +} + +#[derive(Serialize)] +struct TelemetryDeltaContent { + prev_timestamp: i64, + timestamp: i64, + nodes: Vec, +} + +#[derive(Serialize)] +struct TelemetryNodeContent { + id: u8, + name: String, + telemetry: Vec, +} + +#[derive(Serialize)] +struct TelemetryFieldContent { + id: u8, + name: String, + raw_name: String, + mapped_name: Option, + raw: serde_json::Value, + value: serde_json::Value, + unit: String, + logical: serde_json::Value, +} + +impl From for TelemetryFieldContent { + fn from(value: FieldValueSnapshot) -> Self { + Self { + id: value.id, + name: value.name, + raw_name: value.raw_name, + mapped_name: value.mapped_name, + raw: value.raw, + value: value.value, + unit: value.unit, + logical: value.logical, + } + } +} + +#[derive(Serialize)] +struct NodesContent { + nodes: Vec, +} + +#[derive(Serialize)] +struct NodeContent { + id: u8, + name: String, + fields: Vec, +} + +#[derive(Serialize)] +struct FieldContent { + id: u8, + name: String, + raw_name: String, + mapped_name: Option, + #[serde(rename = "type")] + kind: &'static str, +} + +#[derive(Serialize)] +struct FieldGetResponseContent { + node_id: u8, + node_name: String, + raw_name: String, + mapped_name: Option, + name: String, + raw: serde_json::Value, + value: serde_json::Value, + unit: String, + logical: serde_json::Value, +} + +impl FieldGetResponseContent { + fn from_field(node_id: u8, field: FieldValueSnapshot) -> Self { + Self { + node_id, + node_name: field.node_name, + raw_name: field.raw_name, + mapped_name: field.mapped_name, + name: field.name, + raw: field.raw, + value: field.value, + unit: field.unit, + logical: field.logical, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parses_mapped_field_command_reference() { + let command: GetFieldCommand = serde_json::from_value(serde_json::json!({ + "field": { + "type": "mapped", + "name": "tank_pressure" + } + })) + .expect("mapped command should parse"); + + assert_eq!( + command.field, + FieldReferenceCommand::Mapped { + name: "tank_pressure".to_string() + } + ); + } + + #[test] + fn parses_raw_field_command_reference() { + let command: SetParameterCommand = serde_json::from_value(serde_json::json!({ + "field": { + "type": "raw", + "node_name": "ECU", + "field_name": "valve_raw" + }, + "value": 42 + })) + .expect("raw command should parse"); + + assert_eq!( + command.field, + FieldReferenceCommand::Raw { + node_name: "ECU".to_string(), + field_name: "valve_raw".to_string() + } + ); + assert_eq!(command.value, serde_json::json!(42)); + } +} diff --git a/taplo.toml b/taplo.toml new file mode 100644 index 0000000..cc11e9b --- /dev/null +++ b/taplo.toml @@ -0,0 +1,5 @@ +[[rule]] +include = ["**/mapping.toml", "**/mapping/**/*.toml", "**/mappings/**/*.toml"] + +[rule.schema] +path = "./schemas/mapping.schema.json" diff --git a/tests/db_logging.rs b/tests/db_logging.rs index a91be77..ed8e515 100644 --- a/tests/db_logging.rs +++ b/tests/db_logging.rs @@ -28,13 +28,16 @@ fn logging_worker_persists_events_to_timescaledb() { db::spawn_logging_worker(database_url.clone(), &event_dispatcher, scope) .expect("logging worker should start"); - event_dispatcher.dispatch(events::Event::NodeFieldUpdated(db::FieldLog { - timestamp: Utc::now(), - node_id: 3, - field_id: 99, - field_name: "tank_pressure".into(), - field_value: json!(17.4), - })); + event_dispatcher.dispatch(events::Event::NodeFieldUpdated( + db::FieldLog { + timestamp: Utc::now(), + node_id: 3, + field_id: 99, + field_name: "tank_pressure".into(), + field_value: json!(17.4), + }, + events::NodeFieldUpdateSource::TelemetryUpdate, + )); event_dispatcher.dispatch(events::Event::Shutdown); }); diff --git a/tests/emulator.rs b/tests/emulator.rs index 334829b..bde8108 100644 --- a/tests/emulator.rs +++ b/tests/emulator.rs @@ -2,7 +2,8 @@ mod common; use crate::common::ShutdownGuard; use chrono::{DateTime, Utc}; -use ferro_flow::config::Config; +use ferro_flow::config::{Config, WebserverSocketConfig}; +use ferro_flow::nodes::mapping::Mapping; use ferro_flow::{events, nodes, run_with_dependencies}; use liquidcan::payloads::CanDataType; use std::{io::Write, time::Instant}; @@ -17,7 +18,7 @@ fn test_node_registration() { let emulator_config = ecuemulator_test_config_toml(&vcan_iface); let event_dispatcher = events::EventDispatcher::new(); - let node_manager = nodes::NodeManager::new(&event_dispatcher); + let node_manager = nodes::NodeManager::new(&event_dispatcher, Mapping::default()); let config = build_test_config(&vcan_iface); std::thread::scope(|s| { @@ -109,7 +110,7 @@ fn test_telemetry_group_updates() { let emulator_config = ecuemulator_test_config_toml(&vcan_iface); let event_dispatcher = events::EventDispatcher::new(); - let node_manager = nodes::NodeManager::new(&event_dispatcher); + let node_manager = nodes::NodeManager::new(&event_dispatcher, Mapping::default()); let config = build_test_config(&vcan_iface); println!("Starting application with test config: {:?}", config); @@ -185,6 +186,8 @@ fn build_test_config(can_iface: &str) -> Config { can_bus_interfaces: vec![can_iface.to_string()], heartbeat_period: 1, database_url: "".to_string(), + mapping_path: "".to_string(), + webserver_socket: WebserverSocketConfig::default(), } } diff --git a/tests/mapping/example1.toml b/tests/mapping/example1.toml new file mode 100644 index 0000000..9848d5b --- /dev/null +++ b/tests/mapping/example1.toml @@ -0,0 +1,35 @@ +[[mapping.node1]] +name = "Example Mapping 1" +type = "telemetry" +raw_field = "field1" +value.unit = "mAh" +value.slope = 0.5 +value.offset = 1.0 + +[[mapping.node1.logical]] +range = { min = 100 } +value = "High" + +[[mapping.node1.logical]] +range = { min = 50, max = 100 } +value = "Normal" + +[[mapping.node1.logical]] +range = { max = 50 } +value = "Low" + + +# alternatively + +[[mapping.node1]] +name = "Example Mapping 2" +type = "parameter" +raw_field = "field2" + +value = { slope = 0.5, offset = 1.0, unit = "mAh" } + +logical = [ + { range = { min = 100 }, value = "High" }, + { range = { min = 50, max = 100 }, value = "Normal" }, + { range = { max = 50 }, value = "Low" }, +] diff --git a/tests/mapping/split/engine.toml b/tests/mapping/split/engine.toml new file mode 100644 index 0000000..ace9a7a --- /dev/null +++ b/tests/mapping/split/engine.toml @@ -0,0 +1,10 @@ +[[mapping.EngineECU]] +name = "throttle_state" +type = "parameter" +raw_field = "throttle_raw" +value = { slope = 0.25, offset = 0.0, unit = "%" } +logical = [ + { range = { min = 100 }, value = "High" }, + { range = { min = 50, max = 100 }, value = "Normal" }, + { range = { max = 50 }, value = "Low" }, +] diff --git a/tests/mapping/split/fuel.toml b/tests/mapping/split/fuel.toml new file mode 100644 index 0000000..eb72155 --- /dev/null +++ b/tests/mapping/split/fuel.toml @@ -0,0 +1,5 @@ +[[mapping.FuelECU]] +name = "fuel_level" +type = "telemetry" +raw_field = "level_adc" +value = { slope = 0.5, offset = 1.0, unit = "mAh" }