From fc8adf2a5cdf9cc6276250703124051a7cb04f56 Mon Sep 17 00:00:00 2001 From: Kevin David Date: Sat, 27 May 2023 13:09:07 -0400 Subject: [PATCH 01/23] Feature/optional MQTT discovery Adds config options to enable MQTT Discovery as documented here: https://www.home-assistant.io/integrations/mqtt/#mqtt-discovery As discussed in https://github.com/QuantumEntangledAndy/neolink/pull/14#issuecomment-1561076262 Unfortinuately, the floodlight/spotlight doesn't show up in the `` returned by the camera: https://github.com/QuantumEntangledAndy/neolink/pull/14#issuecomment-1563728970 Open to other suggestions on how to detect or enable these options. --- Cargo.lock | 8 ++++ Cargo.toml | 2 + sample_config.toml | 4 ++ src/config.rs | 10 ++++ src/mqtt/mod.rs | 116 +++++++++++++++++++++++++++++++++++++++++++-- src/mqtt/mqttc.rs | 16 ++++++- 6 files changed, 151 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index be48a93f..080b54df 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1266,6 +1266,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "json" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "078e285eafdfb6c4b434e0d31e8cfcb5115b651496faca5749b88fafd4f23bfd" + [[package]] name = "lazy_static" version = "1.4.0" @@ -1421,8 +1427,10 @@ dependencies = [ "gstreamer-app", "gstreamer-rtsp", "gstreamer-rtsp-server", + "heck", "is_sorted", "itertools", + "json", "lazy_static", "log", "neolink_core", diff --git a/Cargo.toml b/Cargo.toml index 13297919..f159b4d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,8 +23,10 @@ gstreamer = "0.20.3" gstreamer-app = "0.20.0" gstreamer-rtsp = "0.20.0" gstreamer-rtsp-server = { version = "0.20.3", features = ["v1_16"] } +heck = "0.4.1" is_sorted = "0.1.1" itertools = "0.10.5" +json = "0.12.4" lazy_static = "1.4.0" log = { version = "0.4.17", features = [ "release_max_level_debug" ] } neolink_core = { path = "crates/core", version = "0.5.12" } diff --git a/sample_config.toml b/sample_config.toml index da37fb6f..1f8128f1 100644 --- a/sample_config.toml +++ b/sample_config.toml @@ -38,6 +38,10 @@ address = "192.168.1.187:9000" # mqtt.broker_addr = "192.168.1.122" # mqtt.port = 1883 # mqtt.credentials = ["mqtt_user", "mqtt_password"] +# MQTT Discovery: https://www.home-assistant.io/integrations/mqtt/#mqtt-discovery +# mqtt.discovery.topic = homeassistant # Uncomment to enable +# If using discovery, _ characters are replaced with spaces in the name and title case is applied +# mqtt.discovery.features = ["floodlight"] # Uncomment if this camera has a spotlight/floodlight # If you use a battery camera: **Instead** of an `address` supply the uid # as follows diff --git a/src/config.rs b/src/config.rs index 2cfd870b..842b7748 100644 --- a/src/config.rs +++ b/src/config.rs @@ -146,6 +146,16 @@ pub(crate) struct MqttConfig { #[serde(default)] pub(crate) client_auth: Option<(std::path::PathBuf, std::path::PathBuf)>, + + #[serde(default)] + pub(crate) discovery: Option, +} + +#[derive(Debug, Deserialize, Clone, Validate)] +pub(crate) struct MqttDiscoveryConfig { + pub(crate) topic: String, + + pub(crate) features: Vec, } fn validate_mqtt_config(config: &MqttConfig) -> Result<(), ValidationError> { diff --git a/src/mqtt/mod.rs b/src/mqtt/mod.rs index d0b00568..80eedaff 100644 --- a/src/mqtt/mod.rs +++ b/src/mqtt/mod.rs @@ -9,6 +9,7 @@ /// /// Control messages: /// +/// - `/control/floodlight [on|off]` Turns floodlight (if equipped) on/off /// - `/control/led [on|off]` Turns status LED on/off /// - `/control/pir [on|off]` Turns PIR on/off /// - `/control/ir [on|off|auto]` Turn IR lights on/off or automatically via light detection @@ -60,11 +61,13 @@ mod cmdline; mod event_cam; mod mqttc; -use crate::config::{CameraConfig, Config, MqttConfig}; +use crate::config::{CameraConfig, Config, MqttConfig, MqttDiscoveryConfig}; use anyhow::{anyhow, Context, Error, Result}; pub(crate) use cmdline::Opt; use event_cam::EventCam; pub(crate) use event_cam::{Direction, Messages}; +use heck::ToTitleCase; +use json::{array, object}; use log::*; use mqttc::{Mqtt, MqttReplyRef}; @@ -167,6 +170,10 @@ async fn listen_on_camera(cam_config: Arc, mqtt_config: &MqttConfi .with_context(|| { format!("Failed to post connect over MQTT for {}", camera_name) })?; + + if let Some(discovery_config) = &mqtt_config.discovery { + enable_discovery(discovery_config, &mqtt_sender_cam, &cam_config).await?; + } } Messages::FloodlightOn => { mqtt_sender_cam @@ -174,7 +181,7 @@ async fn listen_on_camera(cam_config: Arc, mqtt_config: &MqttConfi .await .with_context(|| { format!( - "Failed to publish gloodlight on over MQTT for {}", + "Failed to publish floodlight on over MQTT for {}", camera_name ) })?; @@ -185,7 +192,7 @@ async fn listen_on_camera(cam_config: Arc, mqtt_config: &MqttConfi .await .with_context(|| { format!( - "Failed to publish gloodlight off over MQTT for {}", + "Failed to publish floodlight off over MQTT for {}", camera_name ) })?; @@ -219,6 +226,109 @@ async fn listen_on_camera(cam_config: Arc, mqtt_config: &MqttConfi Ok(()) } +/// Enables MQTT discovery for a camera. See docs at https://www.home-assistant.io/integrations/mqtt/#mqtt-discovery +async fn enable_discovery( + discovery_config: &MqttDiscoveryConfig, + mqtt_sender: &MqttSender, + cam_config: &Arc, +) -> Result<()> { + debug!("Enabling MQTT discovery for {}", cam_config.name); + + let mut connections = array![]; + if let Some(addr) = &cam_config.camera_addr { + connections + .push(array!["camera_addr", addr.clone()]) + .expect("Failed to add camera_addr to connections"); + } + if let Some(uid) = &cam_config.camera_uid { + connections + .push(array!["camera_uid", uid.clone()]) + .expect("Failed to add camera_uid to connections"); + } + + if connections.is_empty() { + error!( + "No connections found for camera {}, either addr or UID must be supplied", + cam_config.name + ); + return Ok(()); + } + + let friendly_name = cam_config.name.replace("_", " ").to_title_case(); + let device = object! { + connections: connections, + name: friendly_name.clone(), + identifiers: array![format!("neolink_{}", cam_config.name)], + manufacturer: "Reolink", + model: "Neolink", + sw_version: env!("CARGO_PKG_VERSION"), + }; + + let availability = object! { + topic: format!("neolink/{}/status", cam_config.name), + payload_available: "connected", + }; + + for feature in &discovery_config.features { + match feature.as_str() { + "floodlight" => { + let discovery_prefix = format!("{}/light", discovery_config.topic); + + let config_data = object! { + // Common across all potential features + device: device.clone(), + availability: availability.clone(), + + // Identifiers + name: format!("{} Floodlight", friendly_name.clone()), + unique_id: format!("neolink_{}_floodlight", cam_config.name), + // Match native home assistant integration: https://github.com/home-assistant/core/blob/dev/homeassistant/components/reolink/light.py#L49 + icon: "mdi:spotlight-beam", + + // State + state_topic: format!("neolink/{}/status/floodlight", cam_config.name), + state_value_template: "{{ value_json.state }}", + + // Control + command_topic: format!("neolink/{}/control/floodlight", cam_config.name), + // Lowercase payloads to match neolink convention + payload_on: "on", + payload_off: "off", + }; + + // Each feature needs to be individually registered + mqtt_sender + .send_message_with_root_topic( + &discovery_prefix, + "config", + &config_data.dump(), + true, + ) + .await + .with_context(|| { + format!( + "Failed to publish auto-discover data on over MQTT for {}", + cam_config.name + ) + })?; + } + _ => { + error!( + "Unsupported MQTT feature {} for {}", + feature, cam_config.name + ); + } + } + } + + info!( + "Enabled MQTT discovery for {} with friendly name {}", + cam_config.name, friendly_name + ); + + Ok(()) +} + async fn handle_mqtt_message( msg: &MqttReply, event_cam_sender: EventCamSender, diff --git a/src/mqtt/mqttc.rs b/src/mqtt/mqttc.rs index bbfedefb..6dea40d1 100644 --- a/src/mqtt/mqttc.rs +++ b/src/mqtt/mqttc.rs @@ -50,15 +50,16 @@ pub(crate) struct MqttSender { } impl MqttSender { - pub async fn send_message( + pub async fn send_message_with_root_topic( &self, + root_topic: &str, sub_topic: &str, message: &str, retain: bool, ) -> Result<(), ClientError> { self.client .publish( - format!("neolink/{}/{}", self.name, sub_topic), + format!("{}/{}/{}", root_topic, self.name, sub_topic), QoS::AtLeastOnce, retain, message, @@ -67,6 +68,17 @@ impl MqttSender { Ok(()) } + pub async fn send_message( + &self, + sub_topic: &str, + message: &str, + retain: bool, + ) -> Result<(), ClientError> { + self.send_message_with_root_topic("neolink", sub_topic, message, retain) + .await?; + Ok(()) + } + async fn subscribe(&self) -> Result<(), ClientError> { self.client .subscribe(format!("neolink/{}/#", self.name), QoS::AtMostOnce) From fd3fffc0a11991cdf640e58ba0d3554e8eb8b5f4 Mon Sep 17 00:00:00 2001 From: Kevin David Date: Mon, 29 May 2023 13:00:26 -0400 Subject: [PATCH 02/23] Attempting to reduce `.clone()` usage --- src/mqtt/mod.rs | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/src/mqtt/mod.rs b/src/mqtt/mod.rs index 80eedaff..6c5a7cab 100644 --- a/src/mqtt/mod.rs +++ b/src/mqtt/mod.rs @@ -67,7 +67,7 @@ pub(crate) use cmdline::Opt; use event_cam::EventCam; pub(crate) use event_cam::{Direction, Messages}; use heck::ToTitleCase; -use json::{array, object}; +use json::{array, object, JsonValue}; use log::*; use mqttc::{Mqtt, MqttReplyRef}; @@ -234,15 +234,15 @@ async fn enable_discovery( ) -> Result<()> { debug!("Enabling MQTT discovery for {}", cam_config.name); - let mut connections = array![]; + let mut connections: JsonValue = array![]; if let Some(addr) = &cam_config.camera_addr { connections - .push(array!["camera_addr", addr.clone()]) + .push(array!["camera_addr", addr.as_str()]) .expect("Failed to add camera_addr to connections"); } if let Some(uid) = &cam_config.camera_uid { connections - .push(array!["camera_uid", uid.clone()]) + .push(array!["camera_uid", uid.as_str()]) .expect("Failed to add camera_uid to connections"); } @@ -255,19 +255,19 @@ async fn enable_discovery( } let friendly_name = cam_config.name.replace("_", " ").to_title_case(); - let device = object! { + let device = Arc::new(object! { connections: connections, - name: friendly_name.clone(), + name: friendly_name.as_str(), identifiers: array![format!("neolink_{}", cam_config.name)], manufacturer: "Reolink", model: "Neolink", sw_version: env!("CARGO_PKG_VERSION"), - }; + }); - let availability = object! { + let availability = Arc::new(object! { topic: format!("neolink/{}/status", cam_config.name), payload_available: "connected", - }; + }); for feature in &discovery_config.features { match feature.as_str() { @@ -276,11 +276,11 @@ async fn enable_discovery( let config_data = object! { // Common across all potential features - device: device.clone(), - availability: availability.clone(), + device: Arc::try_unwrap(device.clone()).unwrap(), + availability: Arc::try_unwrap(availability.clone()).unwrap(), // Identifiers - name: format!("{} Floodlight", friendly_name.clone()), + name: format!("{} Floodlight", friendly_name.as_str()), unique_id: format!("neolink_{}_floodlight", cam_config.name), // Match native home assistant integration: https://github.com/home-assistant/core/blob/dev/homeassistant/components/reolink/light.py#L49 icon: "mdi:spotlight-beam", @@ -323,7 +323,8 @@ async fn enable_discovery( info!( "Enabled MQTT discovery for {} with friendly name {}", - cam_config.name, friendly_name + cam_config.name, + friendly_name.as_str() ); Ok(()) From 2d378074019f8926edfe9a81a68c027b19cec13d Mon Sep 17 00:00:00 2001 From: Kevin David Date: Sun, 4 Jun 2023 12:44:20 -0400 Subject: [PATCH 03/23] Don't get clever trying to reuse device/availability --- src/mqtt/mod.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/mqtt/mod.rs b/src/mqtt/mod.rs index 6c5a7cab..0e64bdb5 100644 --- a/src/mqtt/mod.rs +++ b/src/mqtt/mod.rs @@ -255,19 +255,19 @@ async fn enable_discovery( } let friendly_name = cam_config.name.replace("_", " ").to_title_case(); - let device = Arc::new(object! { + let device = object! { connections: connections, name: friendly_name.as_str(), identifiers: array![format!("neolink_{}", cam_config.name)], manufacturer: "Reolink", model: "Neolink", sw_version: env!("CARGO_PKG_VERSION"), - }); + }; - let availability = Arc::new(object! { + let availability = object! { topic: format!("neolink/{}/status", cam_config.name), payload_available: "connected", - }); + }; for feature in &discovery_config.features { match feature.as_str() { @@ -276,8 +276,8 @@ async fn enable_discovery( let config_data = object! { // Common across all potential features - device: Arc::try_unwrap(device.clone()).unwrap(), - availability: Arc::try_unwrap(availability.clone()).unwrap(), + device: device.clone(), + availability: availability.clone(), // Identifiers name: format!("{} Floodlight", friendly_name.as_str()), From b5136ed9130e87ea7c6a91d59244d855f922935e Mon Sep 17 00:00:00 2001 From: Kevin David Date: Sun, 4 Jun 2023 12:51:03 -0400 Subject: [PATCH 04/23] Document MQTT Discovery in README.md --- README.md | 15 +++++++++++++++ sample_config.toml | 2 +- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 8cde7cf4..c2f3816b 100644 --- a/README.md +++ b/README.md @@ -208,6 +208,21 @@ Query Messages: - `/query/battery` Request that the camera reports its battery level - `/query/pir` Request that the camera reports its pir status +#### MQTT Discovery + +[MQTT Discovery](https://www.home-assistant.io/integrations/mqtt/#mqtt-discovery) is partially supported. +Currently, discovery is opt-in and camera features must be manually specified. + +```toml +[cameras.mqtt] + # + [cameras.mqtt.discovery] + topic = "homeassistant" + features = ["floodlight"] +``` + +See the sample config file for more details. + ### Pause To use the pause feature you will need to adjust your config file as such: diff --git a/sample_config.toml b/sample_config.toml index 1f8128f1..f8f2b468 100644 --- a/sample_config.toml +++ b/sample_config.toml @@ -39,7 +39,7 @@ address = "192.168.1.187:9000" # mqtt.port = 1883 # mqtt.credentials = ["mqtt_user", "mqtt_password"] # MQTT Discovery: https://www.home-assistant.io/integrations/mqtt/#mqtt-discovery -# mqtt.discovery.topic = homeassistant # Uncomment to enable +# mqtt.discovery.topic = "homeassistant" # Uncomment to enable # If using discovery, _ characters are replaced with spaces in the name and title case is applied # mqtt.discovery.features = ["floodlight"] # Uncomment if this camera has a spotlight/floodlight From 48f385e73acf64a5169bbbfd27260eb776297296 Mon Sep 17 00:00:00 2001 From: QuantumEntangledAndy Date: Tue, 6 Jun 2023 15:55:04 +0700 Subject: [PATCH 05/23] Add camera preview auto discovery --- Cargo.lock | 8 +--- Cargo.toml | 2 +- src/mqtt/mod.rs | 110 ++---------------------------------------------- 3 files changed, 5 insertions(+), 115 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 080b54df..91dd8607 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1266,12 +1266,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "json" -version = "0.12.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "078e285eafdfb6c4b434e0d31e8cfcb5115b651496faca5749b88fafd4f23bfd" - [[package]] name = "lazy_static" version = "1.4.0" @@ -1430,13 +1424,13 @@ dependencies = [ "heck", "is_sorted", "itertools", - "json", "lazy_static", "log", "neolink_core", "regex", "rumqttc", "serde", + "serde_json", "time", "tokio", "tokio-stream", diff --git a/Cargo.toml b/Cargo.toml index f159b4d1..f789f015 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,13 +26,13 @@ gstreamer-rtsp-server = { version = "0.20.3", features = ["v1_16"] } heck = "0.4.1" is_sorted = "0.1.1" itertools = "0.10.5" -json = "0.12.4" lazy_static = "1.4.0" log = { version = "0.4.17", features = [ "release_max_level_debug" ] } neolink_core = { path = "crates/core", version = "0.5.12" } regex = "1.7.3" rumqttc = "0.20.0" serde = { version = "1.0.160", features = ["derive"] } +serde_json = "1.0.96" time = "0.3.20" tokio = { version = "1.27.0", features = ["rt-multi-thread", "macros", "io-util", "tracing"] } tokio-stream = "0.1.12" diff --git a/src/mqtt/mod.rs b/src/mqtt/mod.rs index 0e64bdb5..1ebd924d 100644 --- a/src/mqtt/mod.rs +++ b/src/mqtt/mod.rs @@ -58,22 +58,22 @@ use std::sync::Arc; use tokio::time::{sleep, Duration}; mod cmdline; +mod discovery; mod event_cam; mod mqttc; -use crate::config::{CameraConfig, Config, MqttConfig, MqttDiscoveryConfig}; +use crate::config::{CameraConfig, Config, MqttConfig}; use anyhow::{anyhow, Context, Error, Result}; pub(crate) use cmdline::Opt; use event_cam::EventCam; pub(crate) use event_cam::{Direction, Messages}; -use heck::ToTitleCase; -use json::{array, object, JsonValue}; use log::*; use mqttc::{Mqtt, MqttReplyRef}; use self::{ event_cam::EventCamSender, mqttc::{MqttReply, MqttSender}, + discovery::enable_discovery, }; /// Entry point for the mqtt subcommand @@ -226,110 +226,6 @@ async fn listen_on_camera(cam_config: Arc, mqtt_config: &MqttConfi Ok(()) } -/// Enables MQTT discovery for a camera. See docs at https://www.home-assistant.io/integrations/mqtt/#mqtt-discovery -async fn enable_discovery( - discovery_config: &MqttDiscoveryConfig, - mqtt_sender: &MqttSender, - cam_config: &Arc, -) -> Result<()> { - debug!("Enabling MQTT discovery for {}", cam_config.name); - - let mut connections: JsonValue = array![]; - if let Some(addr) = &cam_config.camera_addr { - connections - .push(array!["camera_addr", addr.as_str()]) - .expect("Failed to add camera_addr to connections"); - } - if let Some(uid) = &cam_config.camera_uid { - connections - .push(array!["camera_uid", uid.as_str()]) - .expect("Failed to add camera_uid to connections"); - } - - if connections.is_empty() { - error!( - "No connections found for camera {}, either addr or UID must be supplied", - cam_config.name - ); - return Ok(()); - } - - let friendly_name = cam_config.name.replace("_", " ").to_title_case(); - let device = object! { - connections: connections, - name: friendly_name.as_str(), - identifiers: array![format!("neolink_{}", cam_config.name)], - manufacturer: "Reolink", - model: "Neolink", - sw_version: env!("CARGO_PKG_VERSION"), - }; - - let availability = object! { - topic: format!("neolink/{}/status", cam_config.name), - payload_available: "connected", - }; - - for feature in &discovery_config.features { - match feature.as_str() { - "floodlight" => { - let discovery_prefix = format!("{}/light", discovery_config.topic); - - let config_data = object! { - // Common across all potential features - device: device.clone(), - availability: availability.clone(), - - // Identifiers - name: format!("{} Floodlight", friendly_name.as_str()), - unique_id: format!("neolink_{}_floodlight", cam_config.name), - // Match native home assistant integration: https://github.com/home-assistant/core/blob/dev/homeassistant/components/reolink/light.py#L49 - icon: "mdi:spotlight-beam", - - // State - state_topic: format!("neolink/{}/status/floodlight", cam_config.name), - state_value_template: "{{ value_json.state }}", - - // Control - command_topic: format!("neolink/{}/control/floodlight", cam_config.name), - // Lowercase payloads to match neolink convention - payload_on: "on", - payload_off: "off", - }; - - // Each feature needs to be individually registered - mqtt_sender - .send_message_with_root_topic( - &discovery_prefix, - "config", - &config_data.dump(), - true, - ) - .await - .with_context(|| { - format!( - "Failed to publish auto-discover data on over MQTT for {}", - cam_config.name - ) - })?; - } - _ => { - error!( - "Unsupported MQTT feature {} for {}", - feature, cam_config.name - ); - } - } - } - - info!( - "Enabled MQTT discovery for {} with friendly name {}", - cam_config.name, - friendly_name.as_str() - ); - - Ok(()) -} - async fn handle_mqtt_message( msg: &MqttReply, event_cam_sender: EventCamSender, From 10956e8c8e02e38e7e263346edaa374c48da3161 Mon Sep 17 00:00:00 2001 From: QuantumEntangledAndy Date: Wed, 7 Jun 2023 13:03:58 +0700 Subject: [PATCH 06/23] Add camera preview into mqtt via snap shot command --- Cargo.lock | 7 ++++--- Cargo.toml | 1 + crates/core/src/bc/model.rs | 2 ++ crates/core/src/bc/xml.rs | 35 ++++++++++++++++++++++++++++++++- crates/core/src/bc_protocol.rs | 1 + src/mqtt/event_cam.rs | 36 +++++++++++++++++++++++++++++++++- src/mqtt/mod.rs | 15 +++++++++++--- src/mqtt/mqttc.rs | 2 ++ 8 files changed, 91 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 91dd8607..a34b50a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -198,9 +198,9 @@ checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" [[package]] name = "base64" -version = "0.21.0" +version = "0.21.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a" +checksum = "604178f6c5c21f02dc555784810edfb88d34ac2c73b2eae109655649ee73ce3d" [[package]] name = "bitflags" @@ -1411,6 +1411,7 @@ name = "neolink" version = "0.5.12" dependencies = [ "anyhow", + "base64 0.21.2", "byte-slice-cast", "clap", "console-subscriber", @@ -1919,7 +1920,7 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d194b56d58803a43635bdc398cd17e383d6f71f9182b9a192c127ca42494a59b" dependencies = [ - "base64 0.21.0", + "base64 0.21.2", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index f789f015..498eb136 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ members = [ [dependencies] anyhow = "1.0.70" +base64 = "0.21.2" byte-slice-cast = "1.2.2" clap = { version = "4.2.2", features = ["derive", "cargo"] } console-subscriber = "0.1.8" diff --git a/crates/core/src/bc/model.rs b/crates/core/src/bc/model.rs index 8a52f592..f7d69969 100644 --- a/crates/core/src/bc/model.rs +++ b/crates/core/src/bc/model.rs @@ -33,6 +33,8 @@ pub const MSG_ID_VERSION: u32 = 80; pub const MSG_ID_PING: u32 = 93; /// General system info messages have this ID pub const MSG_ID_GET_GENERAL: u32 = 104; +/// Snapshot to get a jpeg image +pub const MSG_ID_SNAP: u32 = 109; /// Used to get the abilities of a user pub const MSG_ID_ABILITY_INFO: u32 = 151; /// Setting general system info (clock mostly) messages have this ID diff --git a/crates/core/src/bc/xml.rs b/crates/core/src/bc/xml.rs index 3cda05c8..adc790e0 100644 --- a/crates/core/src/bc/xml.rs +++ b/crates/core/src/bc/xml.rs @@ -97,6 +97,9 @@ pub struct BcXml { /// Recieved on request for a link type #[yaserde(rename = "LinkType")] pub link_type: Option, + /// Recieved AND send for the snap message + #[yaserde(rename = "Snap")] + pub snap: Option, } impl BcXml { @@ -700,7 +703,37 @@ pub struct AbilityInfoSubModule { pub struct LinkType { #[yaserde(rename = "type")] /// Type of connection known values `"LAN"` - link_type: String, + pub link_type: String, +} + +/// The Link Type contains the type of connection present +#[derive(PartialEq, Eq, Default, Debug, YaDeserialize, YaSerialize)] +pub struct Snap { + #[yaserde(rename = "channelId")] + /// The channel id to get the snapshot from + pub channel_id: u8, + /// Unknown, observed values: 0 + /// value is only set on request + #[yaserde(rename = "logicChannel")] + pub logic_channel: Option, + /// Time of snapshot, zero when requesting + pub time: u32, + /// Request a full frame, observed values: 0 + /// value is only set on request + #[yaserde(rename = "fullFrame")] + pub full_frame: Option, + /// Stream name, observed values: `main`, `sub` + /// value is only set on request + #[yaserde(rename = "streamType")] + pub stream_type: Option, + /// File name, usually of the form `01_20230518140240.jpg` + /// value is only set on recieve + #[yaserde(rename = "fileName")] + pub file_name: Option, + /// Size in bytes of the picture + /// value is only set on recieve + #[yaserde(rename = "pictureSize")] + pub picture_size: Option, } /// Convience function to return the xml version used throughout the library diff --git a/crates/core/src/bc_protocol.rs b/crates/core/src/bc_protocol.rs index 4b3b889c..f838bfc4 100644 --- a/crates/core/src/bc_protocol.rs +++ b/crates/core/src/bc_protocol.rs @@ -27,6 +27,7 @@ mod pirstate; mod ptz; mod reboot; mod resolution; +mod snap; mod stream; mod talk; mod time; diff --git a/src/mqtt/event_cam.rs b/src/mqtt/event_cam.rs index 6fc652b5..70e57366 100644 --- a/src/mqtt/event_cam.rs +++ b/src/mqtt/event_cam.rs @@ -13,7 +13,7 @@ use tokio::{ time::{interval, sleep, Duration}, }; -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Clone)] pub(crate) enum Messages { Login, MotionStop, @@ -32,6 +32,7 @@ pub(crate) enum Messages { PIRQuery, Ptz(Direction), Preset(i8), + Snap(Vec), } #[derive(Debug, Copy, Clone)] @@ -210,6 +211,11 @@ impl EventCamThread { camera: arc_cam.clone(), }; + let mut snap_thread = SnapThread { + tx: self.tx.clone(), + camera: arc_cam.clone(), + }; + let mut keepalive_thread = KeepaliveThread { camera: arc_cam.clone(), }; @@ -256,6 +262,18 @@ impl EventCamThread { Ok(()) } }, + val = async { + info!("{}: Updating Preview", camera_config.name); + snap_thread.run().await + } => { + if let Err(e) = val { + error!("Snap thread aborted: {:?}", e); + Err(e) + } else { + debug!("Normal finish on Snap thread"); + Ok(()) + } + }, val = async { info!("{}: Setting up camera actions", camera_config.name); message_handler.listen().await @@ -329,6 +347,22 @@ impl FloodlightThread { } } +struct SnapThread { + tx: Sender, + camera: Arc, +} + +impl SnapThread { + async fn run(&mut self) -> Result<()> { + let mut inter = interval(Duration::from_millis(500)); + loop { + inter.tick().await; + let snapshot = self.camera.get_snapshot().await?; + self.tx.send(Messages::Snap(snapshot)).await?; + } + } +} + struct KeepaliveThread { camera: Arc, } diff --git a/src/mqtt/mod.rs b/src/mqtt/mod.rs index 1ebd924d..4f8db096 100644 --- a/src/mqtt/mod.rs +++ b/src/mqtt/mod.rs @@ -1,3 +1,4 @@ +use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _}; /// /// # Neolink MQTT /// @@ -71,9 +72,9 @@ use log::*; use mqttc::{Mqtt, MqttReplyRef}; use self::{ + discovery::enable_discovery, event_cam::EventCamSender, mqttc::{MqttReply, MqttSender}, - discovery::enable_discovery, }; /// Entry point for the mqtt subcommand @@ -135,7 +136,8 @@ async fn listen_on_camera(cam_config: Arc, mqtt_config: &MqttConfi tokio::select! { v = async { // Normal poll operations - while let Ok(msg) = mqtt.poll().await { + loop { + let msg = mqtt.poll().await?; tokio::task::yield_now().await; // Put the reply on it's own async thread so we can safely sleep // and wait for it to reply in it's own time @@ -152,7 +154,6 @@ async fn listen_on_camera(cam_config: Arc, mqtt_config: &MqttConfi } }); } - Ok(()) } => v, // Wait on any error from any of the error channels and if we get it we abort v = error_recv.recv() => v.map(Err).unwrap_or_else(|| Err(anyhow!("Listen on camera error channel closed"))), @@ -213,6 +214,14 @@ async fn listen_on_camera(cam_config: Arc, mqtt_config: &MqttConfi format!("Failed to publish motion start for {}", camera_name) })?; } + Messages::Snap(data) => { + mqtt_sender_cam + .send_message("status/preview", BASE64.encode(data).as_str(), true) + .await + .with_context(|| { + format!("Failed to publish preview over MQTT for {}", camera_name) + })?; + } _ => {} } } diff --git a/src/mqtt/mqttc.rs b/src/mqtt/mqttc.rs index 6dea40d1..6f499549 100644 --- a/src/mqtt/mqttc.rs +++ b/src/mqtt/mqttc.rs @@ -168,6 +168,8 @@ impl Mqtt { &config.broker_addr, config.port, ); + let max_size = 100 * (1024 * 1024); + mqttoptions.set_max_packet_size(max_size, max_size); // Use TLS if ca path is set if let Some(ca_path) = &config.ca { From b221fe69614c8158a48d1582d0bb7274932e9d61 Mon Sep 17 00:00:00 2001 From: QuantumEntangledAndy Date: Wed, 7 Jun 2023 13:04:34 +0700 Subject: [PATCH 07/23] Actually add the new .rs --- src/mqtt/discovery.rs | 235 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 235 insertions(+) create mode 100644 src/mqtt/discovery.rs diff --git a/src/mqtt/discovery.rs b/src/mqtt/discovery.rs new file mode 100644 index 00000000..d775aa85 --- /dev/null +++ b/src/mqtt/discovery.rs @@ -0,0 +1,235 @@ +//! Enable and configures home assistant MQTTT discovery +//! +//! https://www.home-assistant.io/integrations/mqtt/#mqtt-discovery +//! +use anyhow::{Context, Result}; +use heck::ToTitleCase; +use log::*; +use std::sync::Arc; + +use super::mqttc::MqttSender; +use crate::config::{CameraConfig, MqttDiscoveryConfig}; +use serde::{Serialize, Serializer}; + +#[derive(Debug, Clone)] +struct DiscoveryConnection { + connection_type: String, + connection_id: String, +} + +impl Serialize for DiscoveryConnection { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + vec![&self.connection_type, &self.connection_id].serialize(serializer) + } +} + +#[derive(Serialize, Debug, Clone)] +struct DiscoveryDevice { + name: String, + connections: Vec, + identifiers: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + manufacturer: Option, + #[serde(skip_serializing_if = "Option::is_none")] + model: Option, + #[serde(skip_serializing_if = "Option::is_none")] + sw_version: Option, +} + +#[derive(Serialize, Debug, Clone)] +struct DiscoveryAvaliablity { + topic: String, + #[serde(skip_serializing_if = "Option::is_none")] + payload_available: Option, + #[serde(skip_serializing_if = "Option::is_none")] + payload_not_available: Option, +} + +#[derive(Serialize, Debug)] +struct DiscoveryLight { + name: String, + unique_id: String, + #[serde(skip_serializing_if = "Option::is_none")] + icon: Option, + #[serde(skip_serializing_if = "Option::is_none")] + state_topic: Option, + #[serde(skip_serializing_if = "Option::is_none")] + state_value_template: Option, + #[serde(skip_serializing_if = "Option::is_none")] + command_topic: Option, + payload_on: String, + payload_off: String, + device: DiscoveryDevice, + availability: DiscoveryAvaliablity, +} + +#[derive(Serialize, Debug)] +#[allow(dead_code)] +enum Encoding { + None, + #[serde(rename = "b64")] + Base64, +} + +impl Encoding { + fn is_none(&self) -> bool { + matches!(self, Self::None) + } +} + +#[derive(Serialize, Debug)] +struct DiscoveryCamera { + name: String, + unique_id: String, + #[serde(skip_serializing_if = "Option::is_none")] + icon: Option, + topic: String, + device: DiscoveryDevice, + availability: DiscoveryAvaliablity, + #[serde(skip_serializing_if = "Encoding::is_none")] + image_encoding: Encoding, +} + +/// Enables MQTT discovery for a camera. See docs at https://www.home-assistant.io/integrations/mqtt/#mqtt-discovery +pub(crate) async fn enable_discovery( + discovery_config: &MqttDiscoveryConfig, + mqtt_sender: &MqttSender, + cam_config: &Arc, +) -> Result<()> { + debug!("Enabling MQTT discovery for {}", cam_config.name); + + let mut connections = vec![]; + if let Some(addr) = &cam_config.camera_addr { + connections.push(DiscoveryConnection { + connection_type: "camera_addr".to_string(), + connection_id: addr.clone(), + }); + } + if let Some(uid) = &cam_config.camera_uid { + connections.push(DiscoveryConnection { + connection_type: "camera_uid".to_string(), + connection_id: uid.clone(), + }); + } + + if connections.is_empty() { + error!( + "No connections found for camera {}, either addr or UID must be supplied", + cam_config.name + ); + return Ok(()); + } + + let friendly_name = cam_config.name.replace('_', " ").to_title_case(); + let device = DiscoveryDevice { + name: friendly_name.clone(), + connections, + identifiers: vec![format!("neolink_{}", cam_config.name)], + manufacturer: Some("Reolink".to_string()), + model: Some("Neolink".to_string()), + sw_version: Some(env!("CARGO_PKG_VERSION").to_string()), + }; + + let availability = DiscoveryAvaliablity { + topic: format!("neolink/{}/status", cam_config.name), + payload_available: Some("connected".to_string()), + payload_not_available: None, + }; + + for feature in &discovery_config.features { + match feature.as_str() { + "floodlight" => { + let config_data = DiscoveryLight { + // Common across all potential features + device: device.clone(), + availability: availability.clone(), + + // Identifiers + name: format!("{} Floodlight", friendly_name.as_str()), + unique_id: format!("neolink_{}_floodlight", cam_config.name), + // Match native home assistant integration: https://github.com/home-assistant/core/blob/dev/homeassistant/components/reolink/light.py#L49 + icon: Some("mdi:spotlight-beam".to_string()), + + // State + state_topic: Some(format!("neolink/{}/status/floodlight", cam_config.name)), + state_value_template: Some("{{ value_json.state }}".to_string()), + + // Control + command_topic: Some(format!("neolink/{}/control/floodlight", cam_config.name)), + // Lowercase payloads to match neolink convention + payload_on: "on".to_string(), + payload_off: "off".to_string(), + }; + + // Each feature needs to be individually registered + mqtt_sender + .send_message_with_root_topic( + &format!("{}/light", discovery_config.topic), + "config", + &serde_json::to_string(&config_data).with_context(|| { + "Cound not serialise discovery light config into json" + })?, + true, + ) + .await + .with_context(|| { + format!( + "Failed to publish floodlight auto-discover data on over MQTT for {}", + cam_config.name + ) + })?; + } + "camera" => { + let config_data = DiscoveryCamera { + // Common across all potential features + device: device.clone(), + availability: availability.clone(), + + // Identifiers + name: format!("{} Camera", friendly_name.as_str()), + unique_id: format!("neolink_{}_camera", cam_config.name), + icon: Some("mdi:camera-iris".to_string()), + + // Camera specific + topic: format!("neolink/{}/status/preview", cam_config.name), + image_encoding: Encoding::Base64, + }; + + // Each feature needs to be individually registered + mqtt_sender + .send_message_with_root_topic( + &format!("{}/camera", discovery_config.topic), + "config", + &serde_json::to_string(&config_data).with_context(|| { + "Cound not serialise discovery camera config into json" + })?, + true, + ) + .await + .with_context(|| { + format!( + "Failed to publish camera auto-discover data on over MQTT for {}", + cam_config.name + ) + })?; + } + _ => { + error!( + "Unsupported MQTT feature {} for {}", + feature, cam_config.name + ); + } + } + } + + info!( + "Enabled MQTT discovery for {} with friendly name {}", + cam_config.name, + friendly_name.as_str() + ); + + Ok(()) +} From c2e613d68bc102ed7a2d5a57b4c2b578b4269dae Mon Sep 17 00:00:00 2001 From: QuantumEntangledAndy Date: Wed, 7 Jun 2023 13:06:01 +0700 Subject: [PATCH 08/23] fmt --- src/mqtt/discovery.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mqtt/discovery.rs b/src/mqtt/discovery.rs index d775aa85..00599715 100644 --- a/src/mqtt/discovery.rs +++ b/src/mqtt/discovery.rs @@ -192,7 +192,7 @@ pub(crate) async fn enable_discovery( name: format!("{} Camera", friendly_name.as_str()), unique_id: format!("neolink_{}_camera", cam_config.name), icon: Some("mdi:camera-iris".to_string()), - + // Camera specific topic: format!("neolink/{}/status/preview", cam_config.name), image_encoding: Encoding::Base64, From 2b3ae0e522b2277b7cacd4f666722c4414c5bc77 Mon Sep 17 00:00:00 2001 From: QuantumEntangledAndy Date: Wed, 7 Jun 2023 13:08:38 +0700 Subject: [PATCH 09/23] Add snap.rs --- crates/core/src/bc_protocol/snap.rs | 79 +++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 crates/core/src/bc_protocol/snap.rs diff --git a/crates/core/src/bc_protocol/snap.rs b/crates/core/src/bc_protocol/snap.rs new file mode 100644 index 00000000..45855c74 --- /dev/null +++ b/crates/core/src/bc_protocol/snap.rs @@ -0,0 +1,79 @@ +use futures::{StreamExt, TryStreamExt}; + +use super::{BcCamera, Error, Result}; +use crate::bc::{model::*, xml::*}; + +impl BcCamera { + /// Get the snapshot image + pub async fn get_snapshot(&self) -> Result> { + let connection = self.get_connection(); + let msg_num = self.new_message_num(); + let mut sub_get = connection.subscribe(msg_num).await?; + let get = Bc { + meta: BcMeta { + msg_id: MSG_ID_SNAP, + channel_id: self.channel_id, + msg_num, + response_code: 0, + stream_type: 0, + class: 0x6414, + }, + body: BcBody::ModernMsg(ModernMsg { + extension: Some(Extension { + channel_id: Some(self.channel_id), + ..Default::default() + }), + payload: Some(BcPayloads::BcXml(BcXml { + snap: Some(Snap { + channel_id: self.channel_id, + logic_channel: Some(self.channel_id), + time: 0, + full_frame: Some(0), + stream_type: Some("main".to_string()), + ..Default::default() + }), + ..Default::default() + })), + }), + }; + + sub_get.send(get).await?; + let msg = sub_get.recv().await?; + if msg.meta.response_code != 200 { + return Err(Error::CameraServiceUnavaliable); + } + + if let BcBody::ModernMsg(ModernMsg { + payload: + Some(BcPayloads::BcXml(BcXml { + snap: + Some(Snap { + file_name: Some(filename), + picture_size: Some(expected_size), + .. + }), + .. + })), + .. + }) = msg.body + { + log::trace!("Got snap {} with size {}", filename, expected_size); + let expected_size = expected_size as usize; + + let binary_stream = sub_get.payload_stream(); + let result: Vec<_> = binary_stream + .map_ok(|i| tokio_stream::iter(i).map(Result::Ok)) + .try_flatten() + .take(expected_size) + .try_collect() + .await?; + log::trace!("Got whole of the snap: {}", result.len()); + Ok(result) + } else { + Err(Error::UnintelligibleReply { + reply: std::sync::Arc::new(Box::new(msg)), + why: "Expected Snap xml but it was not recieved", + }) + } + } +} From dba2f243649226299e11b10fca44d74a0daf35ba Mon Sep 17 00:00:00 2001 From: QuantumEntangledAndy Date: Wed, 7 Jun 2023 15:41:10 +0700 Subject: [PATCH 10/23] Add all the mqtt controls to auto discovery --- README.md | 1 + src/mqtt/discovery.rs | 279 +++++++++++++++++++++++++++++++++++++++++- 2 files changed, 275 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index c2f3816b..2d2dcb7f 100644 --- a/README.md +++ b/README.md @@ -202,6 +202,7 @@ Status Messages: of the battery status - `/status/pir` Sent in reply to a `/query/pir` an XML encoded version of the pir status +- `/status/motion` Contains the motion detection alarm status. `on` for motion and `off` for still Query Messages: diff --git a/src/mqtt/discovery.rs b/src/mqtt/discovery.rs index 00599715..8264b97f 100644 --- a/src/mqtt/discovery.rs +++ b/src/mqtt/discovery.rs @@ -54,6 +54,9 @@ struct DiscoveryLight { unique_id: String, #[serde(skip_serializing_if = "Option::is_none")] icon: Option, + device: DiscoveryDevice, + availability: DiscoveryAvaliablity, + // Light specific #[serde(skip_serializing_if = "Option::is_none")] state_topic: Option, #[serde(skip_serializing_if = "Option::is_none")] @@ -62,8 +65,6 @@ struct DiscoveryLight { command_topic: Option, payload_on: String, payload_off: String, - device: DiscoveryDevice, - availability: DiscoveryAvaliablity, } #[derive(Serialize, Debug)] @@ -86,13 +87,82 @@ struct DiscoveryCamera { unique_id: String, #[serde(skip_serializing_if = "Option::is_none")] icon: Option, - topic: String, device: DiscoveryDevice, availability: DiscoveryAvaliablity, + // Camera specific + topic: String, #[serde(skip_serializing_if = "Encoding::is_none")] image_encoding: Encoding, } +#[derive(Serialize, Debug)] +struct DiscoverySwitch { + name: String, + unique_id: String, + #[serde(skip_serializing_if = "Option::is_none")] + icon: Option, + device: DiscoveryDevice, + availability: DiscoveryAvaliablity, + // Switch specific + // - Control + command_topic: String, + payload_off: String, + payload_on: String, + // - State + #[serde(skip_serializing_if = "Option::is_none")] + state_topic: Option, + #[serde(skip_serializing_if = "Option::is_none")] + state_off: Option, + #[serde(skip_serializing_if = "Option::is_none")] + state_on: Option, +} + +#[derive(Serialize, Debug)] +struct DiscoverySelect { + name: String, + unique_id: String, + #[serde(skip_serializing_if = "Option::is_none")] + icon: Option, + device: DiscoveryDevice, + availability: DiscoveryAvaliablity, + // Switch specific + // - Control + command_topic: String, + options: Vec, + // - State + #[serde(skip_serializing_if = "Option::is_none")] + state_topic: Option, +} + +#[derive(Serialize, Debug)] +struct DiscoveryBinarySensor { + name: String, + unique_id: String, + #[serde(skip_serializing_if = "Option::is_none")] + icon: Option, + device: DiscoveryDevice, + availability: DiscoveryAvaliablity, + // BinarySensor specific + payload_off: String, + payload_on: String, + // - State + state_topic: String, +} + +#[derive(Serialize, Debug)] +struct DiscoveryButton { + name: String, + unique_id: String, + #[serde(skip_serializing_if = "Option::is_none")] + icon: Option, + device: DiscoveryDevice, + availability: DiscoveryAvaliablity, + // Button specific + command_topic: String, + #[serde(skip_serializing_if = "Option::is_none")] + payload_press: Option, +} + /// Enables MQTT discovery for a camera. See docs at https://www.home-assistant.io/integrations/mqtt/#mqtt-discovery pub(crate) async fn enable_discovery( discovery_config: &MqttDiscoveryConfig, @@ -167,7 +237,10 @@ pub(crate) async fn enable_discovery( // Each feature needs to be individually registered mqtt_sender .send_message_with_root_topic( - &format!("{}/light", discovery_config.topic), + &format!( + "{}/light/{}", + discovery_config.topic, &config_data.unique_id + ), "config", &serde_json::to_string(&config_data).with_context(|| { "Cound not serialise discovery light config into json" @@ -201,7 +274,10 @@ pub(crate) async fn enable_discovery( // Each feature needs to be individually registered mqtt_sender .send_message_with_root_topic( - &format!("{}/camera", discovery_config.topic), + &format!( + "{}/camera/{}", + discovery_config.topic, &config_data.unique_id + ), "config", &serde_json::to_string(&config_data).with_context(|| { "Cound not serialise discovery camera config into json" @@ -216,6 +292,199 @@ pub(crate) async fn enable_discovery( ) })?; } + "led" => { + let config_data = DiscoverySwitch { + // Common across all potential features + device: device.clone(), + availability: availability.clone(), + + // Identifiers + name: format!("{} LED", friendly_name.as_str()), + unique_id: format!("neolink_{}_led", cam_config.name), + icon: Some("mdi:led-on".to_string()), + + // Switch specific + command_topic: format!("neolink/{}/control/led", cam_config.name), + payload_off: "off".to_string(), + payload_on: "on".to_string(), + state_topic: None, + state_off: None, + state_on: None, + }; + + // Each feature needs to be individually registered + mqtt_sender + .send_message_with_root_topic( + &format!( + "{}/switch/{}", + discovery_config.topic, &config_data.unique_id + ), + "config", + &serde_json::to_string(&config_data).with_context(|| { + "Cound not serialise discovery led config into json" + })?, + true, + ) + .await + .with_context(|| { + format!( + "Failed to publish led auto-discover data on over MQTT for {}", + cam_config.name + ) + })?; + } + "ir" => { + let config_data = DiscoverySelect { + // Common across all potential features + device: device.clone(), + availability: availability.clone(), + + // Identifiers + name: format!("{} IR", friendly_name.as_str()), + unique_id: format!("neolink_{}_ir", cam_config.name), + icon: Some("mdi:lightbulb-night".to_string()), + + // Switch specific + command_topic: format!("neolink/{}/control/ir", cam_config.name), + options: vec!["on".to_string(), "off".to_string(), "auto".to_string()], + state_topic: None, + }; + + // Each feature needs to be individually registered + mqtt_sender + .send_message_with_root_topic( + &format!( + "{}/select/{}", + discovery_config.topic, &config_data.unique_id + ), + "config", + &serde_json::to_string(&config_data).with_context(|| { + "Cound not serialise discovery led config into json" + })?, + true, + ) + .await + .with_context(|| { + format!( + "Failed to publish led auto-discover data on over MQTT for {}", + cam_config.name + ) + })?; + } + "motion" => { + let config_data = DiscoveryBinarySensor { + // Common across all potential features + device: device.clone(), + availability: availability.clone(), + + // Identifiers + name: format!("{} MD", friendly_name.as_str()), + unique_id: format!("neolink_{}_md", cam_config.name), + icon: Some("mdi:motion-sensor".to_string()), + + // Switch specific + state_topic: format!("neolink/{}/status/motion", cam_config.name), + payload_off: "off".to_string(), + payload_on: "on".to_string(), + }; + + // Each feature needs to be individually registered + mqtt_sender + .send_message_with_root_topic( + &format!( + "{}/binary_sensor/{}", + discovery_config.topic, &config_data.unique_id + ), + "config", + &serde_json::to_string(&config_data).with_context(|| { + "Cound not serialise discovery motion config into json" + })?, + true, + ) + .await + .with_context(|| { + format!( + "Failed to publish motion auto-discover data on over MQTT for {}", + cam_config.name + ) + })?; + } + "reboot" => { + let config_data = DiscoveryButton { + // Common across all potential features + device: device.clone(), + availability: availability.clone(), + + // Identifiers + name: format!("{} Reboot", friendly_name.as_str()), + unique_id: format!("neolink_{}_reboot", cam_config.name), + icon: Some("mdi:restart".to_string()), + + // Switch specific + command_topic: format!("neolink/{}/control/reboot", cam_config.name), + payload_press: None, + }; + + // Each feature needs to be individually registered + mqtt_sender + .send_message_with_root_topic( + &format!( + "{}/button/{}", + discovery_config.topic, &config_data.unique_id + ), + "config", + &serde_json::to_string(&config_data).with_context(|| { + "Cound not serialise discovery reboot config into json" + })?, + true, + ) + .await + .with_context(|| { + format!( + "Failed to publish reboot auto-discover data on over MQTT for {}", + cam_config.name + ) + })?; + } + "pt" => { + for dir in ["left", "right", "up", "down"] { + let config_data = DiscoveryButton { + // Common across all potential features + device: device.clone(), + availability: availability.clone(), + + // Identifiers + name: format!("{} Pan {}", friendly_name.as_str(), dir), + unique_id: format!("neolink_{}_pan_{}", cam_config.name, dir), + icon: Some(format!("mdi:pan-{}", dir)), + + // Switch specific + command_topic: format!("neolink/{}/control/ptz", cam_config.name), + payload_press: Some(dir.to_string()), + }; + + // Each feature needs to be individually registered + mqtt_sender + .send_message_with_root_topic( + &format!( + "{}/button/{}", + discovery_config.topic, &config_data.unique_id + ), + "config", + &serde_json::to_string(&config_data).with_context(|| { + "Cound not serialise discovery pt config into json" + })?, + true, + ) + .await + .with_context(|| { + format!( + "Failed to publish pt auto-discover data on over MQTT for {}", + cam_config.name + ) + })?; + } + } _ => { error!( "Unsupported MQTT feature {} for {}", From 00f30b8972a2abedf35291291d5bc5386cd500f2 Mon Sep 17 00:00:00 2001 From: QuantumEntangledAndy Date: Wed, 7 Jun 2023 15:57:07 +0700 Subject: [PATCH 11/23] Use enum for config --- src/config.rs | 4 +++- src/mqtt/discovery.rs | 42 +++++++++++++++++++++++++++--------------- src/mqtt/mod.rs | 1 + 3 files changed, 31 insertions(+), 16 deletions(-) diff --git a/src/config.rs b/src/config.rs index d57fadde..87de87a4 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,8 +1,10 @@ +use crate::mqtt::Discoveries; use lazy_static::lazy_static; use neolink_core::bc_protocol::{DiscoveryMethods, PrintFormat}; use regex::Regex; use serde::Deserialize; use std::clone::Clone; +use std::collections::HashSet; use validator::{Validate, ValidationError}; use validator_derive::Validate; @@ -155,7 +157,7 @@ pub(crate) struct MqttConfig { pub(crate) struct MqttDiscoveryConfig { pub(crate) topic: String, - pub(crate) features: Vec, + pub(crate) features: HashSet, } fn validate_mqtt_config(config: &MqttConfig) -> Result<(), ValidationError> { diff --git a/src/mqtt/discovery.rs b/src/mqtt/discovery.rs index 8264b97f..9cad0711 100644 --- a/src/mqtt/discovery.rs +++ b/src/mqtt/discovery.rs @@ -9,7 +9,25 @@ use std::sync::Arc; use super::mqttc::MqttSender; use crate::config::{CameraConfig, MqttDiscoveryConfig}; -use serde::{Serialize, Serializer}; +use serde::{Deserialize, Serialize, Serializer}; + +#[derive(Debug, Deserialize, Clone, PartialEq, Eq, Copy, Hash)] +pub(crate) enum Discoveries { + #[serde(alias = "floodlight", alias = "light")] + Floodlight, + #[serde(alias = "camera")] + Camera, + #[serde(alias = "motion", alias = "md", alias = "pir")] + Motion, + #[serde(alias = "led")] + Led, + #[serde(alias = "ir")] + Ir, + #[serde(alias = "reboot")] + Reboot, + #[serde(alias = "pt")] + Pt, +} #[derive(Debug, Clone)] struct DiscoveryConnection { @@ -210,8 +228,8 @@ pub(crate) async fn enable_discovery( }; for feature in &discovery_config.features { - match feature.as_str() { - "floodlight" => { + match feature { + Discoveries::Floodlight => { let config_data = DiscoveryLight { // Common across all potential features device: device.clone(), @@ -255,7 +273,7 @@ pub(crate) async fn enable_discovery( ) })?; } - "camera" => { + Discoveries::Camera => { let config_data = DiscoveryCamera { // Common across all potential features device: device.clone(), @@ -292,7 +310,7 @@ pub(crate) async fn enable_discovery( ) })?; } - "led" => { + Discoveries::Led => { let config_data = DiscoverySwitch { // Common across all potential features device: device.clone(), @@ -333,7 +351,7 @@ pub(crate) async fn enable_discovery( ) })?; } - "ir" => { + Discoveries::Ir => { let config_data = DiscoverySelect { // Common across all potential features device: device.clone(), @@ -371,7 +389,7 @@ pub(crate) async fn enable_discovery( ) })?; } - "motion" => { + Discoveries::Motion => { let config_data = DiscoveryBinarySensor { // Common across all potential features device: device.clone(), @@ -409,7 +427,7 @@ pub(crate) async fn enable_discovery( ) })?; } - "reboot" => { + Discoveries::Reboot => { let config_data = DiscoveryButton { // Common across all potential features device: device.clone(), @@ -446,7 +464,7 @@ pub(crate) async fn enable_discovery( ) })?; } - "pt" => { + Discoveries::Pt => { for dir in ["left", "right", "up", "down"] { let config_data = DiscoveryButton { // Common across all potential features @@ -485,12 +503,6 @@ pub(crate) async fn enable_discovery( })?; } } - _ => { - error!( - "Unsupported MQTT feature {} for {}", - feature, cam_config.name - ); - } } } diff --git a/src/mqtt/mod.rs b/src/mqtt/mod.rs index 4f8db096..ad806ef8 100644 --- a/src/mqtt/mod.rs +++ b/src/mqtt/mod.rs @@ -68,6 +68,7 @@ use anyhow::{anyhow, Context, Error, Result}; pub(crate) use cmdline::Opt; use event_cam::EventCam; pub(crate) use event_cam::{Direction, Messages}; +pub(crate) use discovery::Discoveries; use log::*; use mqttc::{Mqtt, MqttReplyRef}; From 9eb892ae270ef0701fd335433dd348fdae5b7b5c Mon Sep 17 00:00:00 2001 From: QuantumEntangledAndy Date: Wed, 7 Jun 2023 15:59:31 +0700 Subject: [PATCH 12/23] fmt --- src/mqtt/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mqtt/mod.rs b/src/mqtt/mod.rs index ad806ef8..1c5a4af7 100644 --- a/src/mqtt/mod.rs +++ b/src/mqtt/mod.rs @@ -66,9 +66,9 @@ mod mqttc; use crate::config::{CameraConfig, Config, MqttConfig}; use anyhow::{anyhow, Context, Error, Result}; pub(crate) use cmdline::Opt; +pub(crate) use discovery::Discoveries; use event_cam::EventCam; pub(crate) use event_cam::{Direction, Messages}; -pub(crate) use discovery::Discoveries; use log::*; use mqttc::{Mqtt, MqttReplyRef}; From 3467951c5ca38835b373fd841e03a62e0fbb7b50 Mon Sep 17 00:00:00 2001 From: QuantumEntangledAndy Date: Wed, 7 Jun 2023 17:36:42 +0700 Subject: [PATCH 13/23] Handle message that don't reply on success --- .../core/src/bc_protocol/floodlight_status.rs | 25 +++++++++----- crates/core/src/bc_protocol/ledstate.rs | 27 +++++++++------ crates/core/src/bc_protocol/pirstate.rs | 33 +++++++++++-------- 3 files changed, 53 insertions(+), 32 deletions(-) diff --git a/crates/core/src/bc_protocol/floodlight_status.rs b/crates/core/src/bc_protocol/floodlight_status.rs index d682a40a..27f87ef3 100644 --- a/crates/core/src/bc_protocol/floodlight_status.rs +++ b/crates/core/src/bc_protocol/floodlight_status.rs @@ -77,18 +77,25 @@ impl BcCamera { }; sub_set.send(get).await?; - let msg = sub_set.recv().await?; - if let BcMeta { - response_code: 200, .. - } = msg.meta + if let Ok(reply) = + tokio::time::timeout(tokio::time::Duration::from_micros(500), sub_set.recv()).await { - Ok(()) + let msg = reply?; + if let BcMeta { + response_code: 200, .. + } = msg.meta + { + Ok(()) + } else { + Err(Error::UnintelligibleReply { + reply: std::sync::Arc::new(Box::new(msg)), + why: "The camera did not accept the Floodlight manual state", + }) + } } else { - Err(Error::UnintelligibleReply { - reply: std::sync::Arc::new(Box::new(msg)), - why: "The camera did not accept the Floodlight manual state", - }) + // Some cameras seem to just not send a reply on success, so after 500ms we return Ok + Ok(()) } } } diff --git a/crates/core/src/bc_protocol/ledstate.rs b/crates/core/src/bc_protocol/ledstate.rs index 472a8eaa..7acca2e5 100644 --- a/crates/core/src/bc_protocol/ledstate.rs +++ b/crates/core/src/bc_protocol/ledstate.rs @@ -83,18 +83,25 @@ impl BcCamera { }; sub_set.send(get).await?; - let msg = sub_set.recv().await?; - - if let BcMeta { - response_code: 200, .. - } = msg.meta + if let Ok(reply) = + tokio::time::timeout(tokio::time::Duration::from_micros(500), sub_set.recv()).await { - Ok(()) + let msg = reply?; + + if let BcMeta { + response_code: 200, .. + } = msg.meta + { + Ok(()) + } else { + Err(Error::UnintelligibleReply { + reply: std::sync::Arc::new(Box::new(msg)), + why: "The camera did not except the LEDState xml", + }) + } } else { - Err(Error::UnintelligibleReply { - reply: std::sync::Arc::new(Box::new(msg)), - why: "The camera did not except the LEDState xml", - }) + // Some cameras seem to just not send a reply on success, so after 500ms we return Ok + Ok(()) } } diff --git a/crates/core/src/bc_protocol/pirstate.rs b/crates/core/src/bc_protocol/pirstate.rs index aa340e86..bbd223b6 100644 --- a/crates/core/src/bc_protocol/pirstate.rs +++ b/crates/core/src/bc_protocol/pirstate.rs @@ -94,21 +94,28 @@ impl BcCamera { }; sub_set.send(get).await?; - let msg = sub_set.recv().await?; - if msg.meta.response_code != 200 { - return Err(Error::CameraServiceUnavaliable); - } - - if let BcMeta { - response_code: 200, .. - } = msg.meta + if let Ok(reply) = + tokio::time::timeout(tokio::time::Duration::from_micros(500), sub_set.recv()).await { - Ok(()) + let msg = reply?; + if msg.meta.response_code != 200 { + return Err(Error::CameraServiceUnavaliable); + } + + if let BcMeta { + response_code: 200, .. + } = msg.meta + { + Ok(()) + } else { + Err(Error::UnintelligibleReply { + reply: std::sync::Arc::new(Box::new(msg)), + why: "The camera did not except the RfAlarmCfg xml", + }) + } } else { - Err(Error::UnintelligibleReply { - reply: std::sync::Arc::new(Box::new(msg)), - why: "The camera did not except the RfAlarmCfg xml", - }) + // Some cameras seem to just not send a reply on success, so after 500ms we return Ok + Ok(()) } } From c30725bc091c1dd951b9f79b16cb6fcdc57ad2d3 Mon Sep 17 00:00:00 2001 From: QuantumEntangledAndy Date: Wed, 7 Jun 2023 17:56:22 +0700 Subject: [PATCH 14/23] Update readme --- README.md | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 2d2dcb7f..13790d2c 100644 --- a/README.md +++ b/README.md @@ -154,6 +154,8 @@ Cellular cameras should select `"cellular"` which only enables `map` and discovery = "cellular" ``` +See the sample config file for more details. + ### MQTT To use mqtt you will to adjust your config file as such: @@ -222,7 +224,16 @@ Currently, discovery is opt-in and camera features must be manually specified. features = ["floodlight"] ``` -See the sample config file for more details. +Avaliable features are: + +- `floodlight`: This adds a light control to home assistant +- `camera`: This adds a camera preview to home assistant. It is only updated every 0.5s and cannot be much more than that since it is updated over mqtt not over RTSP +- `led`: This adds a switch to chage the LED status light on/off to home assistant +- `ir`: This adds a selection switch to chage the IR light on/off/auto to home assistant +- `motion`: This adds a motion detection binary sensor to home assistant +- `reboot`: This adds a reboot button to home assistant +- `pt`: This adds a selection of buttons to control the pan and tilt of the camera + ### Pause From 56476bc1624e13c2d4320ea597f8aed349da2dac Mon Sep 17 00:00:00 2001 From: QuantumEntangledAndy Date: Wed, 7 Jun 2023 18:43:39 +0700 Subject: [PATCH 15/23] Add simple battery level --- src/mqtt/event_cam.rs | 34 ++++++++++++++++++++++++++++++++++ src/mqtt/mod.rs | 8 ++++++++ 2 files changed, 42 insertions(+) diff --git a/src/mqtt/event_cam.rs b/src/mqtt/event_cam.rs index 70e57366..2d4fbb0f 100644 --- a/src/mqtt/event_cam.rs +++ b/src/mqtt/event_cam.rs @@ -33,6 +33,7 @@ pub(crate) enum Messages { Ptz(Direction), Preset(i8), Snap(Vec), + BatteryLevel(u32), } #[derive(Debug, Copy, Clone)] @@ -215,6 +216,11 @@ impl EventCamThread { tx: self.tx.clone(), camera: arc_cam.clone(), }; + + let mut battery_thread = BatteryThread { + tx: self.tx.clone(), + camera: arc_cam.clone(), + }; let mut keepalive_thread = KeepaliveThread { camera: arc_cam.clone(), @@ -274,6 +280,18 @@ impl EventCamThread { Ok(()) } }, + val = async { + info!("{}: Updating Battery Level", camera_config.name); + battery_thread.run().await + } => { + if let Err(e) = val { + error!("Battery thread aborted: {:?}", e); + Err(e) + } else { + debug!("Normal finish on Battery thread"); + Ok(()) + } + }, val = async { info!("{}: Setting up camera actions", camera_config.name); message_handler.listen().await @@ -363,6 +381,22 @@ impl SnapThread { } } +struct BatteryLevelThread { + tx: Sender, + camera: Arc, +} + +impl BatteryLevelThread { + async fn run(&mut self) -> Result<()> { + let mut inter = interval(Duration::from_millis(500)); + loop { + inter.tick().await; + let battery = self.camera.battery_info().await?; + self.tx.send(Messages::BatteryLevel(battery.battery_percent)).await?; + } + } +} + struct KeepaliveThread { camera: Arc, } diff --git a/src/mqtt/mod.rs b/src/mqtt/mod.rs index 1c5a4af7..4c48b289 100644 --- a/src/mqtt/mod.rs +++ b/src/mqtt/mod.rs @@ -223,6 +223,14 @@ async fn listen_on_camera(cam_config: Arc, mqtt_config: &MqttConfi format!("Failed to publish preview over MQTT for {}", camera_name) })?; } + Messages::BatteryLevel(data) => { + mqtt_sender_cam + .send_message("status/battery_level", format!("{}", data).as_str(), true) + .await + .with_context(|| { + format!("Failed to publish battery level over MQTT for {}", camera_name) + })?; + } _ => {} } } From a449666a8ca8474c5ac015db1a495c98493d587b Mon Sep 17 00:00:00 2001 From: QuantumEntangledAndy Date: Wed, 7 Jun 2023 20:57:29 +0700 Subject: [PATCH 16/23] Add battery sensor to discovery --- src/mqtt/discovery.rs | 54 +++++++++++++++++++++++++++++++++++++++++++ src/mqtt/event_cam.rs | 44 +++++++++++++++++++++++++++-------- 2 files changed, 89 insertions(+), 9 deletions(-) diff --git a/src/mqtt/discovery.rs b/src/mqtt/discovery.rs index 9cad0711..171b0ea8 100644 --- a/src/mqtt/discovery.rs +++ b/src/mqtt/discovery.rs @@ -27,6 +27,8 @@ pub(crate) enum Discoveries { Reboot, #[serde(alias = "pt")] Pt, + #[serde(alias = "battery", alias = "power")] + Battery, } #[derive(Debug, Clone)] @@ -181,6 +183,20 @@ struct DiscoveryButton { payload_press: Option, } +#[derive(Serialize, Debug)] +struct DiscoverySensor { + name: String, + unique_id: String, + #[serde(skip_serializing_if = "Option::is_none")] + icon: Option, + device: DiscoveryDevice, + availability: DiscoveryAvaliablity, + // Button specific + state_topic: String, + state_class: String, + unit_of_measurement: String, +} + /// Enables MQTT discovery for a camera. See docs at https://www.home-assistant.io/integrations/mqtt/#mqtt-discovery pub(crate) async fn enable_discovery( discovery_config: &MqttDiscoveryConfig, @@ -503,6 +519,44 @@ pub(crate) async fn enable_discovery( })?; } } + Discoveries::Battery => { + let config_data = DiscoverySensor { + // Common across all potential features + device: device.clone(), + availability: availability.clone(), + + // Identifiers + name: format!("{} Battery", friendly_name.as_str()), + unique_id: format!("neolink_{}_battery", cam_config.name), + icon: Some("mdi:battery".to_string()), + + // Camera specific + state_topic: format!("neolink/{}/status/battery_level", cam_config.name), + state_class: "measurement".to_string(), + unit_of_measurement: "%".to_string(), + }; + + // Each feature needs to be individually registered + mqtt_sender + .send_message_with_root_topic( + &format!( + "{}/sensor/{}", + discovery_config.topic, &config_data.unique_id + ), + "config", + &serde_json::to_string(&config_data).with_context(|| { + "Cound not serialise discovery battery config into json" + })?, + true, + ) + .await + .with_context(|| { + format!( + "Failed to publish battery auto-discover data on over MQTT for {}", + cam_config.name + ) + })?; + } } } diff --git a/src/mqtt/event_cam.rs b/src/mqtt/event_cam.rs index 2d4fbb0f..47ac6014 100644 --- a/src/mqtt/event_cam.rs +++ b/src/mqtt/event_cam.rs @@ -216,8 +216,8 @@ impl EventCamThread { tx: self.tx.clone(), camera: arc_cam.clone(), }; - - let mut battery_thread = BatteryThread { + + let mut battery_thread = BatteryLevelThread { tx: self.tx.clone(), camera: arc_cam.clone(), }; @@ -372,10 +372,22 @@ struct SnapThread { impl SnapThread { async fn run(&mut self) -> Result<()> { - let mut inter = interval(Duration::from_millis(500)); + let mut tries = 0; + let base_duration = Duration::from_millis(500); loop { - inter.tick().await; - let snapshot = self.camera.get_snapshot().await?; + tokio::time::sleep(base_duration.saturating_mul(tries)).await; + let snapshot = match self.camera.get_snapshot().await { + Ok(info) => { + tries = 1; + info + } + Err(neolink_core::Error::UnintelligibleReply { .. }) => { + // Try again later + tries += 1; + continue; + } + Err(e) => return Err(e.into()), + }; self.tx.send(Messages::Snap(snapshot)).await?; } } @@ -388,11 +400,25 @@ struct BatteryLevelThread { impl BatteryLevelThread { async fn run(&mut self) -> Result<()> { - let mut inter = interval(Duration::from_millis(500)); + let mut tries = 0; + let base_duration = Duration::from_millis(500); loop { - inter.tick().await; - let battery = self.camera.battery_info().await?; - self.tx.send(Messages::BatteryLevel(battery.battery_percent)).await?; + tokio::time::sleep(base_duration.saturating_mul(tries)).await; + let battery = match self.camera.battery_info().await { + Ok(info) => { + tries = 1; + info + } + Err(neolink_core::Error::UnintelligibleReply { .. }) => { + // Try again later + tries += 1; + continue; + } + Err(e) => return Err(e.into()), + }; + self.tx + .send(Messages::BatteryLevel(battery.battery_percent)) + .await?; } } } From 83dc6cc90ba644917e36720c1eac69ad6f0c8cc6 Mon Sep 17 00:00:00 2001 From: QuantumEntangledAndy Date: Wed, 7 Jun 2023 20:58:46 +0700 Subject: [PATCH 17/23] Add battery sensor to discovery README --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 13790d2c..8dcf9492 100644 --- a/README.md +++ b/README.md @@ -202,6 +202,7 @@ Status Messages: - `/status disconnected` Sent when the camera goes offline - `/status/battery` Sent in reply to a `/query/battery` an XML encoded version of the battery status +- `/status/battery_level` A simple % value of current battery level - `/status/pir` Sent in reply to a `/query/pir` an XML encoded version of the pir status - `/status/motion` Contains the motion detection alarm status. `on` for motion and `off` for still @@ -233,6 +234,7 @@ Avaliable features are: - `motion`: This adds a motion detection binary sensor to home assistant - `reboot`: This adds a reboot button to home assistant - `pt`: This adds a selection of buttons to control the pan and tilt of the camera +- `battery`: This adds a battery level sensor to home assistant ### Pause From c7bd5650496ef83e1873afaeb61cc670cb1618c1 Mon Sep 17 00:00:00 2001 From: QuantumEntangledAndy Date: Thu, 8 Jun 2023 10:13:42 +0700 Subject: [PATCH 18/23] Handle off byte endian and handle ServieUnavalible in mqtt --- crates/core/src/bc/de.rs | 2 +- crates/core/src/bc/model.rs | 6 +++++- src/mqtt/event_cam.rs | 9 ++++++++- 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/crates/core/src/bc/de.rs b/crates/core/src/bc/de.rs index 2029f6af..eb22a3ef 100644 --- a/crates/core/src/bc/de.rs +++ b/crates/core/src/bc/de.rs @@ -198,7 +198,7 @@ fn bc_modern_msg<'a>( fn bc_header(buf: &[u8]) -> IResult<&[u8], BcHeader> { let (buf, _magic) = - error_context("Magic invalid", verify(le_u32, |x| *x == MAGIC_HEADER))(buf)?; + error_context("Magic invalid", verify(le_u32, |x| *x == MAGIC_HEADER || *x == MAGIC_HEADER_REV))(buf)?; let (buf, msg_id) = error_context("MsgID missing", le_u32)(buf)?; let (buf, body_len) = error_context("BodyLen missing", le_u32)(buf)?; let (buf, channel_id) = error_context("ChannelID missing", le_u8)(buf)?; diff --git a/crates/core/src/bc/model.rs b/crates/core/src/bc/model.rs index f7d69969..54b5cb6c 100644 --- a/crates/core/src/bc/model.rs +++ b/crates/core/src/bc/model.rs @@ -3,7 +3,11 @@ use crate::Credentials; pub use super::xml::{BcPayloads, BcXml, Extension}; use std::collections::HashSet; -pub(super) const MAGIC_HEADER: u32 = 0xabcdef0; +pub(super) const MAGIC_HEADER: u32 = 0x0abcdef0; +/// Sometimes will get the BE magic header even though all other numbers are LE? +/// Seems to happens with certain messages like snap that produce jpegs, so perhaps it +/// it is meant to be a hint as to the endianess of the binary payload +pub(super) const MAGIC_HEADER_REV: u32 = 0x0fedcba0; /// Login messages have this ID pub const MSG_ID_LOGIN: u32 = 1; diff --git a/src/mqtt/event_cam.rs b/src/mqtt/event_cam.rs index 47ac6014..7e34d3a0 100644 --- a/src/mqtt/event_cam.rs +++ b/src/mqtt/event_cam.rs @@ -381,11 +381,15 @@ impl SnapThread { tries = 1; info } - Err(neolink_core::Error::UnintelligibleReply { .. }) => { + Err(neolink_core::Error::UnintelligibleReply { reply, why }) => { + log::debug!("Reply: {:?}, why: {:?}", reply, why); // Try again later tries += 1; continue; } + Err(neolink_core::Error::CameraServiceUnavaliable) => { + futures::future::pending().await + } Err(e) => return Err(e.into()), }; self.tx.send(Messages::Snap(snapshot)).await?; @@ -414,6 +418,9 @@ impl BatteryLevelThread { tries += 1; continue; } + Err(neolink_core::Error::CameraServiceUnavaliable) => { + futures::future::pending().await + } Err(e) => return Err(e.into()), }; self.tx From f7b5e89ba8c521ad41618b5241dde9425f79a9d7 Mon Sep 17 00:00:00 2001 From: QuantumEntangledAndy Date: Thu, 8 Jun 2023 12:52:53 +0700 Subject: [PATCH 19/23] Handle messages and IDs to better replicate offical messaging system --- crates/core/src/bc/de.rs | 6 +- crates/core/src/bc_protocol/abilityinfo.rs | 2 +- crates/core/src/bc_protocol/battery.rs | 2 +- .../core/src/bc_protocol/connection/bcconn.rs | 156 +++++++++++------- .../core/src/bc_protocol/connection/bcsub.rs | 17 +- .../src/bc_protocol/connection/discovery.rs | 2 +- crates/core/src/bc_protocol/errors.rs | 4 +- .../core/src/bc_protocol/floodlight_status.rs | 4 +- crates/core/src/bc_protocol/ledstate.rs | 4 +- crates/core/src/bc_protocol/login.rs | 2 +- crates/core/src/bc_protocol/logout.rs | 2 +- crates/core/src/bc_protocol/motion.rs | 6 +- crates/core/src/bc_protocol/ping.rs | 2 +- crates/core/src/bc_protocol/pirstate.rs | 6 +- crates/core/src/bc_protocol/ptz.rs | 8 +- crates/core/src/bc_protocol/reboot.rs | 2 +- crates/core/src/bc_protocol/snap.rs | 95 +++++++++-- crates/core/src/bc_protocol/stream.rs | 9 +- crates/core/src/bc_protocol/talk.rs | 12 +- crates/core/src/bc_protocol/time.rs | 4 +- crates/core/src/bc_protocol/version.rs | 2 +- src/mqtt/event_cam.rs | 4 +- src/mqtt/mod.rs | 5 +- 23 files changed, 243 insertions(+), 113 deletions(-) diff --git a/crates/core/src/bc/de.rs b/crates/core/src/bc/de.rs index eb22a3ef..4bf895be 100644 --- a/crates/core/src/bc/de.rs +++ b/crates/core/src/bc/de.rs @@ -197,8 +197,10 @@ fn bc_modern_msg<'a>( } fn bc_header(buf: &[u8]) -> IResult<&[u8], BcHeader> { - let (buf, _magic) = - error_context("Magic invalid", verify(le_u32, |x| *x == MAGIC_HEADER || *x == MAGIC_HEADER_REV))(buf)?; + let (buf, _magic) = error_context( + "Magic invalid", + verify(le_u32, |x| *x == MAGIC_HEADER || *x == MAGIC_HEADER_REV), + )(buf)?; let (buf, msg_id) = error_context("MsgID missing", le_u32)(buf)?; let (buf, body_len) = error_context("BodyLen missing", le_u32)(buf)?; let (buf, channel_id) = error_context("ChannelID missing", le_u8)(buf)?; diff --git a/crates/core/src/bc_protocol/abilityinfo.rs b/crates/core/src/bc_protocol/abilityinfo.rs index 6a7b99c4..b1d21235 100644 --- a/crates/core/src/bc_protocol/abilityinfo.rs +++ b/crates/core/src/bc_protocol/abilityinfo.rs @@ -7,7 +7,7 @@ impl BcCamera { pub async fn get_abilityinfo(&self) -> Result { let connection = self.get_connection(); let msg_num = self.new_message_num(); - let mut sub_get = connection.subscribe(msg_num).await?; + let mut sub_get = connection.subscribe(MSG_ID_ABILITY_INFO, msg_num).await?; let get = Bc { meta: BcMeta { msg_id: MSG_ID_ABILITY_INFO, diff --git a/crates/core/src/bc_protocol/battery.rs b/crates/core/src/bc_protocol/battery.rs index ec4a91de..dbcc9ea7 100644 --- a/crates/core/src/bc_protocol/battery.rs +++ b/crates/core/src/bc_protocol/battery.rs @@ -83,7 +83,7 @@ impl BcCamera { let connection = self.get_connection(); let msg_num = self.new_message_num(); - let mut sub = connection.subscribe(msg_num).await?; + let mut sub = connection.subscribe(MSG_ID_BATTERY_INFO, msg_num).await?; let msg = Bc { meta: BcMeta { diff --git a/crates/core/src/bc_protocol/connection/bcconn.rs b/crates/core/src/bc_protocol/connection/bcconn.rs index bae0408b..2746cb61 100644 --- a/crates/core/src/bc_protocol/connection/bcconn.rs +++ b/crates/core/src/bc_protocol/connection/bcconn.rs @@ -18,8 +18,8 @@ type MsgHandler = dyn 'static + Send + Sync + for<'a> Fn(&'a Bc) -> BoxFuture<'a #[derive(Default)] struct Subscriber { - /// Subscribers based on their Num - num: BTreeMap>>, + /// Subscribers based on their ID and their num + num: BTreeMap, Sender>>>, /// Subscribers based on their ID id: BTreeMap>, } @@ -85,12 +85,12 @@ impl BcConnection { Ok(()) } - pub async fn subscribe(&self, msg_num: u16) -> Result { + pub async fn subscribe(&self, msg_id: u32, msg_num: u16) -> Result { let (tx, rx) = channel(100); self.poll_commander - .send(PollCommand::AddSubscriber(msg_num, tx)) + .send(PollCommand::AddSubscriber(msg_id, Some(msg_num), tx)) .await?; - Ok(BcSubscription::new(rx, msg_num as u32, self)) + Ok(BcSubscription::new(rx, Some(msg_num as u32), self)) } /// Some messages are initiated by the camera. This creates a handler for them @@ -115,6 +115,21 @@ impl BcConnection { Ok(()) } + /// Some times we want to wait for a reply on a new message ID + /// to do this we wait for the next packet with a certain ID + /// grab it's message ID and then subscribe to that ID + /// + /// The command Snap that grabs a jpeg payload is an example of this + /// + /// This function creates a temporary handle to grab this single message + pub async fn subscribe_to_id(&self, msg_id: u32) -> Result { + let (tx, rx) = channel(100); + self.poll_commander + .send(PollCommand::AddSubscriber(msg_id, None, tx)) + .await?; + Ok(BcSubscription::new(rx, None, self)) + } + pub(crate) async fn join(&self) -> Result<()> { let mut locked_threads = self.rx_thread.write().await; while let Some(res) = locked_threads.join_next().await { @@ -138,7 +153,7 @@ enum PollCommand { Bc(Box>), AddHandler(u32, Arc), RemoveHandler(u32), - AddSubscriber(u16, Sender>), + AddSubscriber(u32, Option, Sender>), } struct Poller { @@ -152,66 +167,79 @@ impl Poller { while let Some(command) = self.reciever.next().await { yield_now().await; match command { - PollCommand::Bc(boxed_response) => match *boxed_response { - Ok(response) => { - let msg_num = response.meta.msg_num; - let msg_id = response.meta.msg_id; - - let mut remove_it_num = false; - match ( - self.subscribers.id.get(&msg_id), - self.subscribers.num.get(&msg_num), - ) { - (Some(occ), _) => { - if let Some(reply) = occ(&response).await { - assert!(reply.meta.msg_num == response.meta.msg_num); - self.sink.send(Ok(reply)).await?; + PollCommand::Bc(boxed_response) => { + match *boxed_response { + Ok(response) => { + let msg_num = response.meta.msg_num; + let msg_id = response.meta.msg_id; + + match ( + self.subscribers.id.get(&msg_id), + self.subscribers.num.get_mut(&msg_id), + ) { + (Some(occ), _) => { + if let Some(reply) = occ(&response).await { + assert!(reply.meta.msg_num == response.meta.msg_num); + self.sink.send(Ok(reply)).await?; + } } - } - (None, Some(occ)) => { - if occ.capacity() == 0 { - warn!("Reaching limit of channel"); - warn!( - "Remaining: {} of {} message space for {} (ID: {})", - occ.capacity(), - occ.max_capacity(), - &msg_num, - &msg_id - ); - } else { - trace!( - "Remaining: {} of {} message space for {} (ID: {})", - occ.capacity(), - occ.max_capacity(), - &msg_num, - &msg_id - ); + (None, Some(occ)) => { + let sender = + if let Some(sender) = occ.get(&Some(msg_num)).cloned() { + Some(sender) + } else if let Some(sender) = occ.get(&None).cloned() { + // Upgrade a None to a known MsgID + occ.remove(&None); + occ.insert(Some(msg_num), sender.clone()); + Some(sender) + } else { + None + }; + if let Some(sender) = sender { + if sender.capacity() == 0 { + warn!("Reaching limit of channel"); + warn!( + "Remaining: {} of {} message space for {} (ID: {})", + sender.capacity(), + sender.max_capacity(), + &msg_num, + &msg_id + ); + } else { + trace!( + "Remaining: {} of {} message space for {} (ID: {})", + sender.capacity(), + sender.max_capacity(), + &msg_num, + &msg_id + ); + } + if sender.send(Ok(response)).await.is_err() { + occ.remove(&Some(msg_num)); + } + } } - if occ.send(Ok(response)).await.is_err() { - remove_it_num = true; + (None, None) => { + debug!( + "Ignoring uninteresting message id {} (number: {})", + msg_id, msg_num + ); + trace!("Contents: {:?}", response); } } - (None, None) => { - debug!( - "Ignoring uninteresting message id {} (number: {})", - msg_id, msg_num - ); - trace!("Contents: {:?}", response); - } - } - if remove_it_num { - self.subscribers.num.remove(&msg_num); } - } - Err(e) => { - for sub in self.subscribers.num.values() { - let _ = sub.send(Err(e.clone())).await; + Err(e) => { + for sub in self.subscribers.num.values() { + for sender in sub.values() { + let _ = sender.send(Err(e.clone())).await; + } + } + self.subscribers.num.clear(); + self.subscribers.id.clear(); + return Err(e); } - self.subscribers.num.clear(); - self.subscribers.id.clear(); - return Err(e); } - }, + } PollCommand::AddHandler(msg_id, handler) => { match self.subscribers.id.entry(msg_id) { Entry::Vacant(vac_entry) => { @@ -225,8 +253,14 @@ impl Poller { PollCommand::RemoveHandler(msg_id) => { self.subscribers.id.remove(&msg_id); } - PollCommand::AddSubscriber(msg_num, tx) => { - match self.subscribers.num.entry(msg_num) { + PollCommand::AddSubscriber(msg_id, msg_num, tx) => { + match self + .subscribers + .num + .entry(msg_id) + .or_default() + .entry(msg_num) + { Entry::Vacant(vac_entry) => { vac_entry.insert(tx); } diff --git a/crates/core/src/bc_protocol/connection/bcsub.rs b/crates/core/src/bc_protocol/connection/bcsub.rs index 02c89aca..0ec83d7b 100644 --- a/crates/core/src/bc_protocol/connection/bcsub.rs +++ b/crates/core/src/bc_protocol/connection/bcsub.rs @@ -12,7 +12,7 @@ use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt}; pub struct BcSubscription<'a> { rx: ReceiverStream>, - msg_num: u32, + msg_num: Option, conn: &'a BcConnection, } @@ -89,7 +89,7 @@ pub type BcMediaStream<'b> = FramedRead impl<'a> BcSubscription<'a> { pub fn new( rx: Receiver>, - msg_num: u32, + msg_num: Option, conn: &'a BcConnection, ) -> BcSubscription<'a> { BcSubscription { @@ -100,7 +100,11 @@ impl<'a> BcSubscription<'a> { } pub async fn send(&self, bc: Bc) -> Result<()> { - assert!(bc.meta.msg_num as u32 == self.msg_num); + if let Some(msg_num) = self.msg_num { + assert!(bc.meta.msg_num as u32 == msg_num); + } else { + log::debug!("Sending message before msg_num has been aquired"); + } self.conn.send(bc).await?; Ok(()) } @@ -108,7 +112,12 @@ impl<'a> BcSubscription<'a> { pub async fn recv(&mut self) -> Result { let bc = self.rx.next().await.ok_or(Error::DroppedSubscriber)?; if let Ok(bc) = &bc { - assert!(bc.meta.msg_num as u32 == self.msg_num); + if let Some(msg_num) = self.msg_num { + assert!(bc.meta.msg_num as u32 == msg_num); + } else { + // Leaning number now + self.msg_num = Some(bc.meta.msg_num as u32); + } } bc } diff --git a/crates/core/src/bc_protocol/connection/discovery.rs b/crates/core/src/bc_protocol/connection/discovery.rs index 741a7e84..0c6433ce 100644 --- a/crates/core/src/bc_protocol/connection/discovery.rs +++ b/crates/core/src/bc_protocol/connection/discovery.rs @@ -175,7 +175,7 @@ impl Discoverer { Ok(rx) } else { Err(Error::SimultaneousSubscription { - msg_num: (tid as u16), + msg_num: Some(tid as u16), }) } } diff --git a/crates/core/src/bc_protocol/errors.rs b/crates/core/src/bc_protocol/errors.rs index 5346f7f7..fc797343 100644 --- a/crates/core/src/bc_protocol/errors.rs +++ b/crates/core/src/bc_protocol/errors.rs @@ -96,10 +96,10 @@ pub enum Error { GenError(#[error(source)] std::sync::Arc), /// Raised when a connection is subscrbed to more than once for msg_num - #[error(display = "Simultaneous subscription, {}", _0)] + #[error(display = "Simultaneous subscription, {:?}", _0)] SimultaneousSubscription { /// The message number that was subscribed to - msg_num: u16, + msg_num: Option, }, /// Raised when a connection is subscrbed to more than once for msg_id diff --git a/crates/core/src/bc_protocol/floodlight_status.rs b/crates/core/src/bc_protocol/floodlight_status.rs index 27f87ef3..4705df07 100644 --- a/crates/core/src/bc_protocol/floodlight_status.rs +++ b/crates/core/src/bc_protocol/floodlight_status.rs @@ -45,7 +45,9 @@ impl BcCamera { let connection = self.get_connection(); let msg_num = self.new_message_num(); - let mut sub_set = connection.subscribe(msg_num).await?; + let mut sub_set = connection + .subscribe(MSG_ID_FLOODLIGHT_MANUAL, msg_num) + .await?; let get = Bc { meta: BcMeta { diff --git a/crates/core/src/bc_protocol/ledstate.rs b/crates/core/src/bc_protocol/ledstate.rs index 7acca2e5..f1c33c8f 100644 --- a/crates/core/src/bc_protocol/ledstate.rs +++ b/crates/core/src/bc_protocol/ledstate.rs @@ -7,7 +7,7 @@ impl BcCamera { self.has_ability_ro("ledState").await?; let connection = self.get_connection(); let msg_num = self.new_message_num(); - let mut sub_get = connection.subscribe(msg_num).await?; + let mut sub_get = connection.subscribe(MSG_ID_GET_LED_STATUS, msg_num).await?; let get = Bc { meta: BcMeta { msg_id: MSG_ID_GET_LED_STATUS, @@ -56,7 +56,7 @@ impl BcCamera { let connection = self.get_connection(); let msg_num = self.new_message_num(); - let mut sub_set = connection.subscribe(msg_num).await?; + let mut sub_set = connection.subscribe(MSG_ID_SET_LED_STATUS, msg_num).await?; // led_version is a field recieved from the camera but not sent // we set to None to ensure we don't send it to the camera diff --git a/crates/core/src/bc_protocol/login.rs b/crates/core/src/bc_protocol/login.rs index ea9a10aa..96d8e56f 100644 --- a/crates/core/src/bc_protocol/login.rs +++ b/crates/core/src/bc_protocol/login.rs @@ -37,7 +37,7 @@ impl BcCamera { let credentials = self.get_credentials(); let connection = self.get_connection(); let msg_num = self.new_message_num(); - let mut sub_login = connection.subscribe(msg_num).await?; + let mut sub_login = connection.subscribe(MSG_ID_LOGIN, msg_num).await?; // Login flow is: Send legacy login message, expect back a modern message with Encryption // details. Then, re-send the login as a modern login message. Expect back a device info diff --git a/crates/core/src/bc_protocol/logout.rs b/crates/core/src/bc_protocol/logout.rs index 2f80f8a7..4b580d34 100644 --- a/crates/core/src/bc_protocol/logout.rs +++ b/crates/core/src/bc_protocol/logout.rs @@ -9,7 +9,7 @@ impl BcCamera { let credentials = self.get_credentials(); let connection = self.get_connection(); let msg_num = self.new_message_num(); - let sub_logout = connection.subscribe(msg_num).await?; + let sub_logout = connection.subscribe(MSG_ID_LOGOUT, msg_num).await?; let username = credentials.username.clone(); let password = credentials.password.as_ref().cloned().unwrap_or_default(); diff --git a/crates/core/src/bc_protocol/motion.rs b/crates/core/src/bc_protocol/motion.rs index a8fae78e..f61f2146 100644 --- a/crates/core/src/bc_protocol/motion.rs +++ b/crates/core/src/bc_protocol/motion.rs @@ -167,7 +167,7 @@ impl BcCamera { let connection = self.get_connection(); let msg_num = self.new_message_num(); - let mut sub = connection.subscribe(msg_num).await?; + let mut sub = connection.subscribe(MSG_ID_MOTION_REQUEST, msg_num).await?; let msg = Bc { meta: BcMeta { msg_id: MSG_ID_MOTION_REQUEST, @@ -202,7 +202,7 @@ impl BcCamera { /// This returns a data structure which can be used to /// query motion events pub async fn listen_on_motion(&self) -> Result { - let msg_num = self.start_motion_query().await?; + self.start_motion_query().await?; let connection = self.get_connection(); @@ -213,7 +213,7 @@ impl BcCamera { let mut set = JoinSet::new(); let channel_id = self.channel_id; set.spawn(async move { - let mut sub = connection.subscribe(msg_num).await?; + let mut sub = connection.subscribe_to_id(MSG_ID_MOTION).await?; loop { tokio::task::yield_now().await; diff --git a/crates/core/src/bc_protocol/ping.rs b/crates/core/src/bc_protocol/ping.rs index 93f388c7..64265364 100644 --- a/crates/core/src/bc_protocol/ping.rs +++ b/crates/core/src/bc_protocol/ping.rs @@ -7,7 +7,7 @@ impl BcCamera { pub async fn ping(&self) -> Result<()> { let connection = self.get_connection(); let msg_num = self.new_message_num(); - let mut sub_ping = connection.subscribe(msg_num).await?; + let mut sub_ping = connection.subscribe(MSG_ID_PING, msg_num).await?; let ping = Bc { meta: BcMeta { diff --git a/crates/core/src/bc_protocol/pirstate.rs b/crates/core/src/bc_protocol/pirstate.rs index bbd223b6..83cd2550 100644 --- a/crates/core/src/bc_protocol/pirstate.rs +++ b/crates/core/src/bc_protocol/pirstate.rs @@ -12,7 +12,7 @@ impl BcCamera { loop { retry_interval.tick().await; let msg_num = self.new_message_num(); - let mut sub_get = connection.subscribe(msg_num).await?; + let mut sub_get = connection.subscribe(MSG_ID_GET_PIR_ALARM, msg_num).await?; let get = Bc { meta: BcMeta { msg_id: MSG_ID_GET_PIR_ALARM, @@ -70,7 +70,9 @@ impl BcCamera { self.has_ability_rw("rfAlarm").await?; let connection = self.get_connection(); let msg_num = self.new_message_num(); - let mut sub_set = connection.subscribe(msg_num).await?; + let mut sub_set = connection + .subscribe(MSG_ID_START_PIR_ALARM, msg_num) + .await?; let get = Bc { meta: BcMeta { diff --git a/crates/core/src/bc_protocol/ptz.rs b/crates/core/src/bc_protocol/ptz.rs index a90fb6e6..15869342 100644 --- a/crates/core/src/bc_protocol/ptz.rs +++ b/crates/core/src/bc_protocol/ptz.rs @@ -25,7 +25,7 @@ impl BcCamera { self.has_ability_rw("control").await?; let connection = self.get_connection(); let msg_num = self.new_message_num(); - let mut sub_set = connection.subscribe(msg_num).await?; + let mut sub_set = connection.subscribe(MSG_ID_PTZ_CONTROL, msg_num).await?; let direction_str = match direction { Direction::Up => "up", @@ -88,7 +88,7 @@ impl BcCamera { self.has_ability_rw("control").await?; let connection = self.get_connection(); let msg_num = self.new_message_num(); - let mut sub_set = connection.subscribe(msg_num).await?; + let mut sub_set = connection.subscribe(MSG_ID_GET_PTZ_PRESET, msg_num).await?; let send = Bc { meta: BcMeta { @@ -136,7 +136,9 @@ impl BcCamera { self.has_ability_rw("control").await?; let connection = self.get_connection(); let msg_num = self.new_message_num(); - let mut sub_set = connection.subscribe(msg_num).await?; + let mut sub_set = connection + .subscribe(MSG_ID_PTZ_CONTROL_PRESET, msg_num) + .await?; let command = if name.is_some() { "setPos" } else { "toPos" }; let send = Bc { diff --git a/crates/core/src/bc_protocol/reboot.rs b/crates/core/src/bc_protocol/reboot.rs index 4142679d..6e95c0c6 100644 --- a/crates/core/src/bc_protocol/reboot.rs +++ b/crates/core/src/bc_protocol/reboot.rs @@ -7,7 +7,7 @@ impl BcCamera { self.has_ability_rw("reboot").await?; let connection = self.get_connection(); let msg_num = self.new_message_num(); - let mut sub = connection.subscribe(msg_num).await?; + let mut sub = connection.subscribe(MSG_ID_REBOOT, msg_num).await?; let msg = Bc { meta: BcMeta { diff --git a/crates/core/src/bc_protocol/snap.rs b/crates/core/src/bc_protocol/snap.rs index 45855c74..2a698f19 100644 --- a/crates/core/src/bc_protocol/snap.rs +++ b/crates/core/src/bc_protocol/snap.rs @@ -1,4 +1,4 @@ -use futures::{StreamExt, TryStreamExt}; +// use futures::{StreamExt, TryStreamExt}; use super::{BcCamera, Error, Result}; use crate::bc::{model::*, xml::*}; @@ -8,7 +8,7 @@ impl BcCamera { pub async fn get_snapshot(&self) -> Result> { let connection = self.get_connection(); let msg_num = self.new_message_num(); - let mut sub_get = connection.subscribe(msg_num).await?; + let mut sub_get = connection.subscribe(MSG_ID_SNAP, msg_num).await?; let get = Bc { meta: BcMeta { msg_id: MSG_ID_SNAP, @@ -57,17 +57,90 @@ impl BcCamera { .. }) = msg.body { - log::trace!("Got snap {} with size {}", filename, expected_size); + log::debug!("Got snap XML {} with size {}", filename, expected_size); + // Messages are now sent on ID 109 but not with the same message ID + // preumably because the camera considers it to be a new message rather + // than a reply + // + // This means we need to listen for the next 109 grab the message num and + // subscribe to it. This is what `subscribe_to_next` is for + let mut sub_get = connection.subscribe_to_id(MSG_ID_SNAP).await?; let expected_size = expected_size as usize; - let binary_stream = sub_get.payload_stream(); - let result: Vec<_> = binary_stream - .map_ok(|i| tokio_stream::iter(i).map(Result::Ok)) - .try_flatten() - .take(expected_size) - .try_collect() - .await?; - log::trace!("Got whole of the snap: {}", result.len()); + let mut result: Vec<_> = vec![]; + log::debug!("Waiting for packets on {}", msg_num); + let mut msg = sub_get.recv().await?; + + while msg.meta.response_code == 200 { + // sends 200 while more is to come + // 201 when finished + + if let BcBody::ModernMsg(ModernMsg { + extension: + Some(Extension { + binary_data: Some(1), + .. + }), + payload: Some(BcPayloads::Binary(data)), + }) = msg.body + { + result.extend_from_slice(&data); + } else { + return Err(Error::UnintelligibleReply { + reply: std::sync::Arc::new(Box::new(msg)), + why: "Expected binary data but got something else", + }); + } + log::debug!( + "Got packet size is now {} of {}", + result.len(), + expected_size + ); + msg = sub_get.recv().await?; + } + + if msg.meta.response_code == 201 { + // 201 means all binary data sent + if let BcBody::ModernMsg(ModernMsg { + extension: + Some(Extension { + binary_data: Some(1), + .. + }), + payload, + }) = msg.body + { + if let Some(BcPayloads::Binary(data)) = payload { + // Add last data if present (may be zero if preveious packet contained it) + result.extend_from_slice(&data); + } + log::debug!( + "Got all packets size is now {} of {}", + result.len(), + expected_size + ); + if result.len() != expected_size { + log::warn!("Snap did not recieve expected number of byes"); + } + } else { + return Err(Error::UnintelligibleReply { + reply: std::sync::Arc::new(Box::new(msg)), + why: "Expected binary data but got something else", + }); + } + } else { + // anything else is an error + return Err(Error::CameraServiceUnavaliable); + } + + // let binary_stream = sub_get.payload_stream(); + // let result: Vec<_>= binary_stream + // .map_ok(|i| tokio_stream::iter(i).map(Result::Ok)) + // .try_flatten() + // .take(expected_size) + // .try_collect() + // .await?; + log::debug!("Snapshot recieved: {} of {}", result.len(), expected_size); Ok(result) } else { Err(Error::UnintelligibleReply { diff --git a/crates/core/src/bc_protocol/stream.rs b/crates/core/src/bc_protocol/stream.rs index 226c3091..8dd312c5 100644 --- a/crates/core/src/bc_protocol/stream.rs +++ b/crates/core/src/bc_protocol/stream.rs @@ -111,7 +111,7 @@ impl BcCamera { let handle = task::spawn(async move { tokio::task::yield_now().await; - let mut sub_video = connection.subscribe(msg_num).await?; + let mut sub_video = connection.subscribe(MSG_ID_VIDEO, msg_num).await?; // On an E1 and swann cameras: // - mainStream always has a value of 0 @@ -224,13 +224,14 @@ impl BcCamera { }, ); // debug!("Stream: Send Stop"); - sub_video.send(stop_video).await?; + let mut sub_stop = connection.subscribe(MSG_ID_VIDEO_STOP, msg_num).await?; + sub_stop.send(stop_video).await?; // debug!("Stream: Sent Stop"); tokio::select! { v = async { loop { - let msg = sub_video.recv().await?; + let msg = sub_stop.recv().await?; if let BcMeta { response_code: 200, msg_id: MSG_ID_VIDEO_STOP, @@ -268,7 +269,7 @@ impl BcCamera { } let connection = self.get_connection(); let msg_num = self.new_message_num(); - let mut sub_video = connection.subscribe(msg_num).await?; + let mut sub_video = connection.subscribe(MSG_ID_VIDEO_STOP, msg_num).await?; // On an E1 and swann cameras: // - mainStream always has a value of 0 diff --git a/crates/core/src/bc_protocol/talk.rs b/crates/core/src/bc_protocol/talk.rs index f4e08c28..df787ab6 100644 --- a/crates/core/src/bc_protocol/talk.rs +++ b/crates/core/src/bc_protocol/talk.rs @@ -17,7 +17,7 @@ impl BcCamera { let connection = self.get_connection(); let msg_num = self.new_message_num(); - let mut sub = connection.subscribe(msg_num).await?; + let mut sub = connection.subscribe(MSG_ID_TALKRESET, msg_num).await?; let msg = Bc { meta: BcMeta { @@ -60,7 +60,7 @@ impl BcCamera { pub async fn talk_ability(&self) -> Result { let connection = self.get_connection(); let msg_num = self.new_message_num(); - let mut sub_get = connection.subscribe(msg_num).await?; + let mut sub_get = connection.subscribe(MSG_ID_TALKABILITY, msg_num).await?; let get = Bc { meta: BcMeta { msg_id: MSG_ID_TALKABILITY, @@ -120,7 +120,7 @@ impl BcCamera { let connection = self.get_connection(); let msg_num = self.new_message_num(); - let mut sub = connection.subscribe(msg_num).await?; + let mut sub = connection.subscribe(MSG_ID_TALKCONFIG, msg_num).await?; if &talk_config.audio_config.audio_type != "adpcm" { return Err(Error::UnknownTalkEncoding); @@ -182,7 +182,7 @@ impl BcCamera { let full_block_size = block_size + 4; // Block size + predictor state let msg_num = self.new_message_num(); - let sub = connection.subscribe(msg_num).await?; + let sub = connection.subscribe(MSG_ID_TALK, msg_num).await?; const BLOCK_PER_PAYLOAD: usize = 4; const BLOCK_HEADER_SIZE: usize = 4; @@ -261,7 +261,7 @@ impl BcCamera { let connection = self.get_connection(); let msg_num = self.new_message_num(); - let mut sub = connection.subscribe(msg_num).await?; + let mut sub = connection.subscribe(MSG_ID_TALKCONFIG, msg_num).await?; if &talk_config.audio_config.audio_type != "adpcm" { return Err(Error::UnknownTalkEncoding); @@ -323,7 +323,7 @@ impl BcCamera { let full_block_size = block_size + 4; // Block size + predictor state let msg_num = self.new_message_num(); - let sub = connection.subscribe(msg_num).await?; + let sub = connection.subscribe(MSG_ID_TALK, msg_num).await?; const BLOCK_PER_PAYLOAD: usize = 1; const BLOCK_HEADER_SIZE: usize = 4; diff --git a/crates/core/src/bc_protocol/time.rs b/crates/core/src/bc_protocol/time.rs index 593fd717..91aec3ec 100644 --- a/crates/core/src/bc_protocol/time.rs +++ b/crates/core/src/bc_protocol/time.rs @@ -17,7 +17,7 @@ impl BcCamera { self.has_ability_ro("general").await?; let connection = self.get_connection(); let msg_num = self.new_message_num(); - let mut sub_get_general = connection.subscribe(msg_num).await?; + let mut sub_get_general = connection.subscribe(MSG_ID_GET_GENERAL, msg_num).await?; let get = Bc { meta: BcMeta { msg_id: MSG_ID_GET_GENERAL, @@ -102,7 +102,7 @@ impl BcCamera { self.has_ability_rw("general").await?; let connection = self.get_connection(); let msg_num = self.new_message_num(); - let mut sub_set_general = connection.subscribe(msg_num).await?; + let mut sub_set_general = connection.subscribe(MSG_ID_SET_GENERAL, msg_num).await?; let set = Bc::new_from_xml( BcMeta { msg_id: MSG_ID_SET_GENERAL, diff --git a/crates/core/src/bc_protocol/version.rs b/crates/core/src/bc_protocol/version.rs index b562f5a6..df7779b6 100644 --- a/crates/core/src/bc_protocol/version.rs +++ b/crates/core/src/bc_protocol/version.rs @@ -7,7 +7,7 @@ impl BcCamera { self.has_ability_ro("version").await?; let connection = self.get_connection(); let msg_num = self.new_message_num(); - let mut sub_version = connection.subscribe(msg_num).await?; + let mut sub_version = connection.subscribe(MSG_ID_VERSION, msg_num).await?; let version = Bc { meta: BcMeta { diff --git a/src/mqtt/event_cam.rs b/src/mqtt/event_cam.rs index 7e34d3a0..98a195ea 100644 --- a/src/mqtt/event_cam.rs +++ b/src/mqtt/event_cam.rs @@ -388,6 +388,7 @@ impl SnapThread { continue; } Err(neolink_core::Error::CameraServiceUnavaliable) => { + log::debug!("Snap not supported"); futures::future::pending().await } Err(e) => return Err(e.into()), @@ -405,7 +406,7 @@ struct BatteryLevelThread { impl BatteryLevelThread { async fn run(&mut self) -> Result<()> { let mut tries = 0; - let base_duration = Duration::from_millis(500); + let base_duration = Duration::from_secs(15); loop { tokio::time::sleep(base_duration.saturating_mul(tries)).await; let battery = match self.camera.battery_info().await { @@ -419,6 +420,7 @@ impl BatteryLevelThread { continue; } Err(neolink_core::Error::CameraServiceUnavaliable) => { + log::debug!("Battery not supported"); futures::future::pending().await } Err(e) => return Err(e.into()), diff --git a/src/mqtt/mod.rs b/src/mqtt/mod.rs index 4c48b289..3d180635 100644 --- a/src/mqtt/mod.rs +++ b/src/mqtt/mod.rs @@ -228,7 +228,10 @@ async fn listen_on_camera(cam_config: Arc, mqtt_config: &MqttConfi .send_message("status/battery_level", format!("{}", data).as_str(), true) .await .with_context(|| { - format!("Failed to publish battery level over MQTT for {}", camera_name) + format!( + "Failed to publish battery level over MQTT for {}", + camera_name + ) })?; } _ => {} From fbbfddaa371e7953bfca02bb1804809f1353542f Mon Sep 17 00:00:00 2001 From: QuantumEntangledAndy Date: Thu, 8 Jun 2023 13:22:54 +0700 Subject: [PATCH 20/23] Handle closing and upgrade message numbers and IDs more cleanly --- .../core/src/bc_protocol/connection/bcconn.rs | 39 +++++++++++++------ crates/core/src/bc_protocol/snap.rs | 10 ++--- 2 files changed, 33 insertions(+), 16 deletions(-) diff --git a/crates/core/src/bc_protocol/connection/bcconn.rs b/crates/core/src/bc_protocol/connection/bcconn.rs index 2746cb61..5e0f71ae 100644 --- a/crates/core/src/bc_protocol/connection/bcconn.rs +++ b/crates/core/src/bc_protocol/connection/bcconn.rs @@ -184,17 +184,28 @@ impl Poller { } } (None, Some(occ)) => { - let sender = - if let Some(sender) = occ.get(&Some(msg_num)).cloned() { - Some(sender) - } else if let Some(sender) = occ.get(&None).cloned() { - // Upgrade a None to a known MsgID - occ.remove(&None); - occ.insert(Some(msg_num), sender.clone()); - Some(sender) - } else { - None - }; + let sender = if let Some(sender) = + occ.get(&Some(msg_num)).filter(|a| !a.is_closed()).cloned() + { + // Connection with id exists and is not closed + Some(sender) + } else if let Some(sender) = occ.get(&None).cloned() { + // Upgrade a None to a known MsgID + occ.remove(&None); + occ.insert(Some(msg_num), sender.clone()); + Some(sender) + } else if occ + .get(&Some(msg_num)) + .map(|a| a.is_closed()) + .unwrap_or(false) + { + // Connection is closed and there is no None to replace it + // Remove it for cleanup and report no sender + occ.remove(&Some(msg_num)); + None + } else { + None + }; if let Some(sender) = sender { if sender.capacity() == 0 { warn!("Reaching limit of channel"); @@ -217,6 +228,12 @@ impl Poller { if sender.send(Ok(response)).await.is_err() { occ.remove(&Some(msg_num)); } + } else { + debug!( + "Ignoring uninteresting message id {} (number: {})", + msg_id, msg_num + ); + trace!("Contents: {:?}", response); } } (None, None) => { diff --git a/crates/core/src/bc_protocol/snap.rs b/crates/core/src/bc_protocol/snap.rs index 2a698f19..a1d23e16 100644 --- a/crates/core/src/bc_protocol/snap.rs +++ b/crates/core/src/bc_protocol/snap.rs @@ -57,7 +57,7 @@ impl BcCamera { .. }) = msg.body { - log::debug!("Got snap XML {} with size {}", filename, expected_size); + log::trace!("Got snap XML {} with size {}", filename, expected_size); // Messages are now sent on ID 109 but not with the same message ID // preumably because the camera considers it to be a new message rather // than a reply @@ -68,7 +68,7 @@ impl BcCamera { let expected_size = expected_size as usize; let mut result: Vec<_> = vec![]; - log::debug!("Waiting for packets on {}", msg_num); + log::trace!("Waiting for packets on {}", msg_num); let mut msg = sub_get.recv().await?; while msg.meta.response_code == 200 { @@ -91,7 +91,7 @@ impl BcCamera { why: "Expected binary data but got something else", }); } - log::debug!( + log::trace!( "Got packet size is now {} of {}", result.len(), expected_size @@ -114,7 +114,7 @@ impl BcCamera { // Add last data if present (may be zero if preveious packet contained it) result.extend_from_slice(&data); } - log::debug!( + log::trace!( "Got all packets size is now {} of {}", result.len(), expected_size @@ -140,7 +140,7 @@ impl BcCamera { // .take(expected_size) // .try_collect() // .await?; - log::debug!("Snapshot recieved: {} of {}", result.len(), expected_size); + log::trace!("Snapshot recieved: {} of {}", result.len(), expected_size); Ok(result) } else { Err(Error::UnintelligibleReply { From 4f0169ef8f9f717f5c9046b5ddfde8b22c94cede Mon Sep 17 00:00:00 2001 From: QuantumEntangledAndy Date: Thu, 8 Jun 2023 13:40:58 +0700 Subject: [PATCH 21/23] fmt --- crates/core/src/bc_protocol/ptz.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/core/src/bc_protocol/ptz.rs b/crates/core/src/bc_protocol/ptz.rs index 62a6bfd1..ccbcf930 100644 --- a/crates/core/src/bc_protocol/ptz.rs +++ b/crates/core/src/bc_protocol/ptz.rs @@ -193,7 +193,9 @@ impl BcCamera { self.has_ability_rw("control").await?; let connection = self.get_connection(); let msg_num = self.new_message_num(); - let mut sub_set = connection.subscribe(MSG_ID_PTZ_CONTROL_PRESET, msg_num).await?; + let mut sub_set = connection + .subscribe(MSG_ID_PTZ_CONTROL_PRESET, msg_num) + .await?; let preset = Preset { id: preset_id, From c6505952d5a6e64e86c7ac32b1a6edc6db66c80c Mon Sep 17 00:00:00 2001 From: QuantumEntangledAndy Date: Thu, 8 Jun 2023 16:42:20 +0700 Subject: [PATCH 22/23] Add config options for mqtt threads --- README.md | 23 +++++++++++++++++++++++ src/config.rs | 15 +++++++++++++++ src/mqtt/event_cam.rs | 10 +++++----- 3 files changed, 43 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index c3ea4de8..89ab8a82 100644 --- a/README.md +++ b/README.md @@ -209,6 +209,7 @@ Status Messages: - `/status/motion` Contains the motion detection alarm status. `on` for motion and `off` for still - `/status/ptz/preset` Sent in reply to a `/query/ptz/preset` an XML encoded version of the PTZ presets +- `/status/preview` a base64 encoded camera image updated every 0.5s Query Messages: @@ -216,6 +217,28 @@ Query Messages: - `/query/pir` Request that the camera reports its pir status - `/query/ptz/preset` Request that the camera reports its PTZ presets +### MQTT Disable Features + +Certain features like preview and motion detection may not be desired +you can disable them by them with the following config options. +Disabling these may help to conserve battery + +```toml +enable_motion = false # motion detection + # (limited battery drain since it + # is a passive listening connection) + +enable_pings = false # keep alive pings that keep the camera connected + +enable_light = false # flood lights only avaliable on some camera + # (limited battery drain since it + # is a passive listening connection) + +enable_battery = false # battery updates in `/status/battery_level` + +enable_preview = false # preview image in `/status/preview` +``` + #### MQTT Discovery [MQTT Discovery](https://www.home-assistant.io/integrations/mqtt/#mqtt-discovery) is partially supported. diff --git a/src/config.rs b/src/config.rs index 87de87a4..2b0bf128 100644 --- a/src/config.rs +++ b/src/config.rs @@ -148,6 +148,17 @@ pub(crate) struct MqttConfig { #[serde(default)] pub(crate) client_auth: Option<(std::path::PathBuf, std::path::PathBuf)>, + + #[serde(default = "default_true")] + pub(crate) enable_motion: bool, + #[serde(default = "default_true")] + pub(crate) enable_pings: bool, + #[serde(default = "default_true")] + pub(crate) enable_light: bool, + #[serde(default = "default_true")] + pub(crate) enable_battery: bool, + #[serde(default = "default_true")] + pub(crate) enable_preview: bool, #[serde(default)] pub(crate) discovery: Option, @@ -170,6 +181,10 @@ fn validate_mqtt_config(config: &MqttConfig) -> Result<(), ValidationError> { } } +const fn default_true() -> bool { + true +} + fn default_mqtt() -> Option { None } diff --git a/src/mqtt/event_cam.rs b/src/mqtt/event_cam.rs index 15c28adc..197db838 100644 --- a/src/mqtt/event_cam.rs +++ b/src/mqtt/event_cam.rs @@ -237,7 +237,7 @@ impl EventCamThread { val = async { info!("{}: Listening to Camera Motion", camera_config.name); motion_thread.run().await - } => { + }, if camera_config.mqtt.as_ref().expect("Should have an mqtt config at this point").enable_motion => { if let Err(e) = val { error!("Motion thread aborted: {:?}", e); Err(e) @@ -249,7 +249,7 @@ impl EventCamThread { val = async { debug!("{}: Starting Pings", camera_config.name); keepalive_thread.run().await - } => { + }, if camera_config.mqtt.as_ref().expect("Should have an mqtt config at this point").enable_pings => { if let Err(e) = val { debug!("Ping thread aborted: {:?}", e); Err(e) @@ -261,7 +261,7 @@ impl EventCamThread { val = async { info!("{}: Listening to FloodLight Status", camera_config.name); flight_thread.run().await - } => { + }, if camera_config.mqtt.as_ref().expect("Should have an mqtt config at this point").enable_light => { if let Err(e) = val { error!("FloodLight thread aborted: {:?}", e); Err(e) @@ -273,7 +273,7 @@ impl EventCamThread { val = async { info!("{}: Updating Preview", camera_config.name); snap_thread.run().await - } => { + }, if camera_config.mqtt.as_ref().expect("Should have an mqtt config at this point").enable_preview => { if let Err(e) = val { error!("Snap thread aborted: {:?}", e); Err(e) @@ -285,7 +285,7 @@ impl EventCamThread { val = async { info!("{}: Updating Battery Level", camera_config.name); battery_thread.run().await - } => { + }, if camera_config.mqtt.as_ref().expect("Should have an mqtt config at this point").enable_battery => { if let Err(e) = val { error!("Battery thread aborted: {:?}", e); Err(e) From f73dbacd2674585b9e77ea173a63a16bb8cbb8cb Mon Sep 17 00:00:00 2001 From: QuantumEntangledAndy Date: Thu, 8 Jun 2023 16:42:41 +0700 Subject: [PATCH 23/23] fmt --- src/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/config.rs b/src/config.rs index 2b0bf128..075b87df 100644 --- a/src/config.rs +++ b/src/config.rs @@ -148,7 +148,7 @@ pub(crate) struct MqttConfig { #[serde(default)] pub(crate) client_auth: Option<(std::path::PathBuf, std::path::PathBuf)>, - + #[serde(default = "default_true")] pub(crate) enable_motion: bool, #[serde(default = "default_true")]