diff --git a/backend/src/endpoints.rs b/backend/src/endpoints.rs index 091980b..5ac86a1 100644 --- a/backend/src/endpoints.rs +++ b/backend/src/endpoints.rs @@ -53,7 +53,7 @@ pub async fn things_update( let payload = payload.into_inner(); service - .update(&Id { application, thing }, payload, &OPTS) + .update(&Id { application, thing }, &payload, &OPTS) .await?; Ok(HttpResponse::NoContent().json(json!({}))) @@ -67,7 +67,7 @@ pub async fn things_patch( let payload = payload.into_inner(); service - .update(&path.into_inner(), JsonPatchUpdater(payload), &OPTS) + .update(&path.into_inner(), &JsonPatchUpdater(payload), &OPTS) .await?; Ok(HttpResponse::NoContent().json(json!({}))) @@ -81,7 +81,7 @@ pub async fn things_merge( let payload = payload.into_inner(); service - .update(&path.into_inner(), JsonMergeUpdater(payload), &OPTS) + .update(&path.into_inner(), &JsonMergeUpdater(payload), &OPTS) .await?; Ok(HttpResponse::NoContent().json(json!({}))) @@ -97,7 +97,7 @@ pub async fn things_update_reported_state Result { let payload = payload.into_inner(); - service.update(&path.into_inner(), payload, &OPTS).await?; + service.update(&path.into_inner(), &payload, &OPTS).await?; Ok(HttpResponse::NoContent().json(json!({}))) } @@ -200,7 +200,8 @@ pub async fn things_delete( service: web::Data>, path: web::Path, ) -> Result { - service.delete(&path.into_inner()).await?; + // FIXME: allow adding preconditions + service.delete(&path.into_inner(), None).await?; Ok(HttpResponse::NoContent().json(json!({}))) } diff --git a/core/src/command/mod.rs b/core/src/command/mod.rs index fb31238..977e497 100644 --- a/core/src/command/mod.rs +++ b/core/src/command/mod.rs @@ -15,7 +15,7 @@ pub struct Command { #[async_trait] pub trait CommandSink: Sized + Send + Sync + 'static { - type Error: std::error::Error; + type Error: std::error::Error + Send + Sync; type Config: Clone + Debug + DeserializeOwned; fn from_config(spawner: &mut dyn Spawner, config: Self::Config) -> anyhow::Result; diff --git a/core/src/lib.rs b/core/src/lib.rs index 5166a7b..656cf69 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -17,3 +17,39 @@ pub mod version; pub mod waker; pub use self::serde::is_default; +use crate::model::Thing; + +#[derive(Clone, Debug, Default, PartialEq, Eq)] +pub struct Preconditions<'o> { + /// Required resource version. + pub resource_version: Option<&'o str>, + /// Required resource UID. + pub uid: Option<&'o str>, +} + +impl<'o> From<&'o Thing> for Preconditions<'o> { + fn from(thing: &'o Thing) -> Self { + Self { + resource_version: thing.metadata.resource_version.as_deref(), + uid: thing.metadata.uid.as_deref(), + } + } +} + +impl Preconditions<'_> { + pub fn matches(&self, thing: &Thing) -> bool { + if let Some(resource_version) = self.resource_version { + if Some(resource_version) != thing.metadata.resource_version.as_deref() { + return false; + } + } + + if let Some(uid) = self.uid { + if Some(uid) != thing.metadata.uid.as_deref() { + return false; + } + } + + return true; + } +} diff --git a/core/src/machine/mod.rs b/core/src/machine/mod.rs index 2859708..db7e007 100644 --- a/core/src/machine/mod.rs +++ b/core/src/machine/mod.rs @@ -1,19 +1,16 @@ mod deno; mod desired; - -use crate::command::Command; -use crate::machine::deno::{DenoOptions, Json}; -use crate::machine::desired::{CommandBuilder, Context, DesiredReconciler, FeatureContext}; -use crate::model::{ - Changed, Code, DesiredFeatureMethod, DesiredFeatureReconciliation, DesiredMode, JsonSchema, - Metadata, Reconciliation, Schema, SyntheticType, Thing, ThingState, Timer, WakerExt, - WakerReason, +mod recon; + +use crate::machine::recon::Reconciler; +use crate::{ + command::Command, + machine::deno::{DenoOptions, Json}, + model::{Code, JsonSchema, Metadata, Schema, Thing, ThingState}, + processor::Message, }; -use crate::processor::Message; use anyhow::anyhow; -use chrono::{DateTime, Duration, Utc}; use deno_core::url::Url; -use indexmap::IndexMap; use jsonschema::{Draft, JSONSchema, SchemaResolver, SchemaResolverError}; use lazy_static::lazy_static; use prometheus::{register_histogram, Histogram}; @@ -25,10 +22,16 @@ lazy_static! { register_histogram!("timer_delay", "Amount of time by which timers are delayed").unwrap(); } +#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)] +pub struct OutboxMessage { + pub thing: String, + pub message: Message, +} + #[derive(Debug, thiserror::Error)] pub enum Error { #[error("Mutator: {0}")] - Mutator(Box), + Mutator(Box), #[error("Reconciler: {0}")] Reconcile(#[source] anyhow::Error), #[error("Validation failed: {0}")] @@ -48,6 +51,11 @@ pub struct Outcome { pub commands: Vec, } +pub struct DeletionOutcome { + pub thing: Thing, + pub outbox: Vec, +} + impl Machine { pub fn new(thing: Thing) -> Self { Self { thing } @@ -64,9 +72,6 @@ impl Machine { .update(|_| async { Ok::<_, Infallible>(new_thing) }) .await?; - // validate the outcome - Self::validate(&outcome.new_thing)?; - // done Ok(outcome) } @@ -76,7 +81,7 @@ impl Machine { where F: FnOnce(Thing) -> Fut, Fut: Future>, - E: std::error::Error + 'static, + E: std::error::Error + Send + Sync + 'static, { // capture immutable or internal metadata let Metadata { @@ -84,6 +89,7 @@ impl Machine { application, uid, creation_timestamp, + deletion_timestamp, generation, resource_version, annotations: _, @@ -126,6 +132,7 @@ impl Machine { application, uid, creation_timestamp, + deletion_timestamp, generation, resource_version, ..new_thing.metadata @@ -142,6 +149,65 @@ impl Machine { }) } + pub async fn delete(thing: Thing) -> Result { + let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(1); + + let thing = Arc::new(thing); + let mut deletion_outbox = vec![]; + + for (name, deleting) in &thing.reconciliation.deleting { + match &deleting.code { + Code::JavaScript(script) => { + // We align with the names the other scripts + #[derive(Clone, Debug, serde::Serialize)] + #[serde(rename_all = "camelCase")] + struct Input { + current_state: Arc, + new_state: Arc, + + outbox: Vec, + logs: Vec, + } + #[derive(Clone, Default, Debug, serde::Deserialize)] + #[serde(rename_all = "camelCase")] + struct Output { + #[serde(default)] + outbox: Vec, + #[serde(default)] + logs: Vec, + } + + let exec = deno::Execution::new( + format!("delete-{}", name), + script, + DenoOptions { deadline }, + ) + .run::<_, Json, ()>(Input { + current_state: thing.clone(), + new_state: thing.clone(), + outbox: vec![], + logs: vec![], + }) + .await + .map_err(Error::Reconcile)?; + + // we can only ignore logs, as either we succeed (and would delete the logs) + // or we fail, and don't store them. + let Output { + outbox, + logs: _logs, + } = exec.output.0; + deletion_outbox.extend(outbox) + } + } + } + + Ok(DeletionOutcome { + thing: (*thing).clone(), + outbox: deletion_outbox, + }) + } + fn validate(new_thing: &Thing) -> Result<(), Error> { match &new_thing.schema { Some(Schema::Json(schema)) => match schema { @@ -174,510 +240,6 @@ impl Machine { } } -#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)] -pub struct OutboxMessage { - pub thing: String, - pub message: Message, -} - -pub struct Reconciler { - deadline: tokio::time::Instant, - current_thing: Arc, - new_thing: Thing, - outbox: Vec, - commands: Vec, -} - -impl Reconciler { - pub fn new(current_thing: Arc, new_thing: Thing) -> Self { - let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(1); - - Self { - current_thing, - new_thing, - deadline, - outbox: Default::default(), - commands: Default::default(), - } - } - - pub async fn run(mut self) -> Result { - // cleanup first - self.cleanup(); - - // detect reported state changes - self.sync_reported_state(); - - // synthetics - self.generate_synthetics().await?; - - // run code - let Reconciliation { changed, timers } = self.new_thing.reconciliation.clone(); - self.reconcile_changed(changed).await?; - self.reconcile_timers(timers).await?; - - // reconcile desired state - self.reconcile_desired_state().await?; - - Ok(Outcome { - new_thing: self.new_thing, - outbox: self.outbox, - commands: self.commands, - }) - } - - fn cleanup(&mut self) { - // clear reconcile waker - self.new_thing.clear_wakeup(WakerReason::Reconcile); - - // clear old logs first, otherwise logging of state will continuously grow - // FIXME: remove when we only send a view of the state to the reconcile code - for (_, v) in &mut self.new_thing.reconciliation.changed { - v.last_log.clear(); - } - } - - fn sync_reported_state(&mut self) { - // we ensure that all reported values which changed from the previous value get an updated - // last_update timestamp - for (k, next) in &mut self.new_thing.reported_state { - if let Some(previous) = self.current_thing.reported_state.get(k) { - if previous.value != next.value { - next.last_update = Utc::now(); - } - } - } - } - - async fn generate_synthetics(&mut self) -> Result<(), Error> { - let now = Utc::now(); - - let new_state = Arc::new(self.new_thing.clone()); - - for (name, mut syn) in &mut self.new_thing.synthetic_state { - let value = - Self::run_synthetic(name, &syn.r#type, new_state.clone(), self.deadline).await?; - if syn.value != value { - syn.value = value; - syn.last_update = now; - } - } - - Ok(()) - } - - /// sync the state with the reported and expected state - fn sync_desired_state(&mut self) -> Result<(), Error> { - let mut waker = self.new_thing.waker(); - - for (name, mut desired) in &mut self.new_thing.desired_state { - // update the last change timestamp - - // find the current value - let reported_value = self - .new_thing - .synthetic_state - .get(name) - .map(|state| &state.value) - .or_else(|| { - self.new_thing - .reported_state - .get(name) - .map(|state| &state.value) - }) - .unwrap_or(&Value::Null); - let desired_value = &desired.value; - - // check if there is a change from the previous state - if let Some(previous) = self.current_thing.desired_state.get(name) { - if previous.value != desired.value || previous.valid_until != desired.valid_until { - // desired value changed, start reconciling again - desired.reconciliation = - DesiredFeatureReconciliation::Reconciling { last_attempt: None }; - desired.last_update = Utc::now(); - } - } - - if matches!(desired.method, DesiredFeatureMethod::Manual) { - continue; - } - - match (&desired.reconciliation, &desired.mode) { - // Mode is disabled, and we already are ... - (DesiredFeatureReconciliation::Disabled { .. }, DesiredMode::Disabled) => { - // ... do nothing - } - - // Mode is disabled, but we are not... - (_, DesiredMode::Disabled) => { - // ... mark disabled - desired.reconciliation = - DesiredFeatureReconciliation::Disabled { when: Utc::now() }; - } - - // Mode is not disabled, but we are are - (DesiredFeatureReconciliation::Disabled { .. }, _) => { - if reported_value != desired_value { - // not the same - if desired.valid_until.map(|u| u > Utc::now()).unwrap_or(true) { - // the value is still valid, back to reconciling - desired.reconciliation = - DesiredFeatureReconciliation::Reconciling { last_attempt: None }; - } else { - // the value is no longer valid - desired.reconciliation = DesiredFeatureReconciliation::Failed { - when: Utc::now(), - reason: Some( - "Activated reconciliation with expired value".to_string(), - ), - }; - } - } else { - // equals => means success - desired.reconciliation = - DesiredFeatureReconciliation::Succeeded { when: Utc::now() } - } - } - - // Mode is "keep sync", and we succeeded - (DesiredFeatureReconciliation::Succeeded { .. }, DesiredMode::Sync) => { - // if we should keep it in sync, check values and if the value is still valid - if reported_value != desired_value - && desired.valid_until.map(|u| u > Utc::now()).unwrap_or(true) - { - // if not, back to reconciling - desired.reconciliation = - DesiredFeatureReconciliation::Reconciling { last_attempt: None }; - - if let Some(valid_until) = desired.valid_until { - // and set waker - waker.wakeup_at(valid_until, WakerReason::Reconcile); - } - } - } - - // succeeded and not (sync), or failed - (DesiredFeatureReconciliation::Succeeded { .. }, _) - | (DesiredFeatureReconciliation::Failed { .. }, _) => { - // we do nothing - } - - // we are reconciling - (DesiredFeatureReconciliation::Reconciling { .. }, _) => { - if reported_value == desired_value { - // value changed to expected value -> success - desired.reconciliation = - DesiredFeatureReconciliation::Succeeded { when: Utc::now() }; - } else if let Some(valid_until) = desired.valid_until { - // value did not change to expected value, and expired -> failure - if valid_until < Utc::now() { - desired.reconciliation = DesiredFeatureReconciliation::Failed { - when: Utc::now(), - reason: None, - }; - } else { - // otherwise, start waker - waker.wakeup_at(valid_until, WakerReason::Reconcile); - } - } - // else -> keep going - } - } - } - - // set possible waker - - self.new_thing.set_waker(waker); - - // done - - Ok(()) - } - - async fn reconcile_desired_state(&mut self) -> Result<(), Error> { - // sync first - self.sync_desired_state()?; - - // get the current waker - let mut waker = self.new_thing.waker(); - let new_thing = Arc::new(self.new_thing.clone()); - - let mut commands = CommandBuilder::default(); - - let mut context = Context { - new_thing, - deadline: self.deadline, - waker: &mut waker, - commands: &mut commands, - }; - - // process next - for (name, desired) in &mut self.new_thing.desired_state { - let value = desired.value.clone(); - - match &mut desired.reconciliation { - DesiredFeatureReconciliation::Disabled { .. } - | DesiredFeatureReconciliation::Succeeded { .. } - | DesiredFeatureReconciliation::Failed { .. } => { - // we do nothing - } - DesiredFeatureReconciliation::Reconciling { last_attempt } => { - match &desired.method { - DesiredFeatureMethod::Manual | DesiredFeatureMethod::External => { - // we do nothing - } - - DesiredFeatureMethod::Command(command) => command - .reconcile( - &mut context, - FeatureContext { - name, - last_attempt, - value, - }, - ) - .await - .map_err(|err| Error::Reconcile(anyhow!(err)))?, - - DesiredFeatureMethod::Code(code) => code - .reconcile( - &mut context, - FeatureContext { - name, - last_attempt, - value, - }, - ) - .await - .map_err(Error::Reconcile)?, - } - } - } - } - - self.commands.extend( - commands - .into_commands(&self.new_thing.metadata.application) - .map_err(|err| Error::Reconcile(anyhow!(err)))?, - ); - - // set waker, possibly changed - self.new_thing.set_waker(waker); - - // done - Ok(()) - } - - async fn reconcile_changed(&mut self, changed: IndexMap) -> Result<(), Error> { - for (name, mut changed) in changed { - let ExecutionResult { logs } = self - .run_code(format!("changed-{}", name), &changed.code) - .await?; - - changed.last_log = logs; - self.new_thing.reconciliation.changed.insert(name, changed); - } - - Ok(()) - } - - async fn reconcile_timers(&mut self, timers: IndexMap) -> Result<(), Error> { - for (name, mut timer) in timers { - let due = match timer.stopped { - true => { - // timer is stopped, just keep it stopped - timer.last_started = None; - None - } - false => { - let last_started = match timer.last_started { - None => { - let now = Utc::now(); - timer.last_started = Some(now); - now - } - Some(last_started) => last_started, - }; - - // now check if the timer is due - match (timer.last_run, timer.initial_delay) { - (Some(last_run), _) => { - // timer already ran, check if it is due again - Some(Self::find_next_run_from( - last_started, - timer.period, - last_run, - )) - } - (None, None) => { - // timer never ran, and there is no delay, run now - Some(Utc::now()) - } - (None, Some(initial_delay)) => { - // timer never ran, check it the first run is due - Some( - last_started - + Duration::from_std(initial_delay) - .unwrap_or_else(|_| Duration::max_value()), - ) - } - } - } - }; - - if let Some(due) = due { - let diff = Utc::now() - due; - - let next_run = if diff >= Duration::zero() { - log::debug!("Late by: {diff}"); - TIMER_DELAY.observe(diff.num_milliseconds() as f64); - - let now = Utc::now(); - - self.run_code(format!("timer-{}", name), &timer.code) - .await?; - - let next_run = - Self::find_next_run(timer.last_started.unwrap_or(now), timer.period); - - log::info!("Next run: {next_run}"); - - timer.last_run = Some(now); - - next_run - } else { - due - }; - - self.new_thing.wakeup_at(next_run, WakerReason::Reconcile); - } - - self.new_thing.reconciliation.timers.insert(name, timer); - } - - Ok(()) - } - - async fn run_code(&mut self, name: String, code: &Code) -> Result { - match code { - Code::JavaScript(script) => { - #[derive(serde::Serialize)] - #[serde(rename_all = "camelCase")] - struct Input { - current_state: Arc, - new_state: Thing, - // the following items are scooped off by the output, but we need to initialize - // them to present, but empty values for the scripts. - outbox: Vec, - logs: Vec, - } - - #[derive(Clone, Default, Debug, serde::Deserialize)] - #[serde(rename_all = "camelCase")] - pub struct Output { - #[serde(default)] - new_state: Option, - #[serde(default)] - outbox: Vec, - #[serde(default)] - logs: Vec, - #[serde(default, with = "deno::duration")] - waker: Option, - } - - let opts = DenoOptions { - deadline: self.deadline, - }; - let deno = deno::Execution::new(name, script, opts); - let out = deno - .run::<_, Json, ()>(Input { - current_state: self.current_thing.clone(), - new_state: self.new_thing.clone(), - outbox: vec![], - logs: vec![], - }) - .await - .map_err(Error::Reconcile)?; - - // FIXME: record error (if any) - - let Output { - new_state, - waker, - outbox, - logs, - } = out.output.0; - - let mut new_state = new_state.unwrap_or_else(|| self.new_thing.clone()); - - // schedule the waker, in the new state - if let Some(duration) = waker { - new_state.wakeup(duration, WakerReason::Reconcile); - } - // set the new state - self.new_thing = new_state; - // extend outbox - self.outbox.extend(outbox); - - // done - Ok(ExecutionResult { logs }) - } - } - } - - async fn run_synthetic( - name: &str, - r#type: &SyntheticType, - new_state: Arc, - deadline: tokio::time::Instant, - ) -> Result { - match r#type { - SyntheticType::JavaScript(script) => { - #[derive(serde::Serialize)] - #[serde(rename_all = "camelCase")] - struct Input { - new_state: Arc, - } - - let opts = DenoOptions { deadline }; - let deno = deno::Execution::new(name, script, opts); - let out = deno - .run::<_, (), Value>(Input { new_state }) - .await - .map_err(Error::Reconcile)?; - - Ok(out.return_value) - } - SyntheticType::Alias(alias) => match new_state.reported_state.get(alias) { - Some(value) => Ok(value.value.clone()), - None => Ok(Value::Null), - }, - } - } - - fn find_next_run_from( - last_started: DateTime, - period: std::time::Duration, - now: DateTime, - ) -> DateTime { - let period_ms = period.as_millis().clamp(0, u32::MAX as u128) as u32; - let diff = (now - last_started).num_milliseconds(); - - if diff < 0 { - return Utc::now(); - } - - let diff = diff.clamp(0, u32::MAX as i64) as u32; - let periods = (diff / period_ms) + 1; - - last_started + Duration::milliseconds((periods * period_ms) as i64) - } - - fn find_next_run(last_started: DateTime, period: std::time::Duration) -> DateTime { - Self::find_next_run_from(last_started, period, Utc::now()) - } -} - pub struct ExecutionResult { pub logs: Vec, } @@ -696,7 +258,6 @@ mod test { use crate::model::{Metadata, ReportedFeature}; use chrono::{DateTime, TimeZone, Utc}; use std::collections::BTreeMap; - use std::time::Duration; #[tokio::test] async fn test_create() { @@ -795,6 +356,7 @@ mod test { application: "default".to_string(), uid: Some(UID.to_string()), creation_timestamp: Some(creation_timestamp()), + deletion_timestamp: None, generation: Some(1), resource_version: Some("1".to_string()), annotations: Default::default(), @@ -813,29 +375,4 @@ mod test { internal: Default::default(), } } - - #[test] - fn test_next() { - assert_next((0, 0, 0), (0, 1, 0), 1, (0, 1, 1)); - assert_next((0, 0, 0), (0, 1, 2), 10, (0, 1, 10)); - assert_next((0, 0, 0), (0, 0, 1), 1, (0, 0, 2)); - } - - fn assert_next( - started: (u32, u32, u32), - now: (u32, u32, u32), - period: u64, - expected: (u32, u32, u32), - ) { - let day = Utc.ymd(2022, 1, 1); - - assert_eq!( - Reconciler::find_next_run_from( - day.and_hms(started.0, started.1, started.2), - Duration::from_secs(period), - day.and_hms(now.0, now.1, now.2) - ), - day.and_hms(expected.0, expected.1, expected.2) - ); - } } diff --git a/core/src/machine/recon.rs b/core/src/machine/recon.rs new file mode 100644 index 0000000..b0308a5 --- /dev/null +++ b/core/src/machine/recon.rs @@ -0,0 +1,561 @@ +use crate::{ + command::Command, + machine::{ + deno::{self, DenoOptions, Json}, + desired::{CommandBuilder, Context, DesiredReconciler, FeatureContext}, + Error, ExecutionResult, OutboxMessage, Outcome, TIMER_DELAY, + }, + model::{ + Changed, Code, DesiredFeatureMethod, DesiredFeatureReconciliation, DesiredMode, + Reconciliation, SyntheticType, Thing, Timer, WakerExt, WakerReason, + }, +}; +use anyhow::anyhow; +use chrono::{DateTime, Duration, Utc}; +use indexmap::IndexMap; +use serde_json::Value; +use std::sync::Arc; + +pub struct Reconciler { + deadline: tokio::time::Instant, + current_thing: Arc, + new_thing: Thing, + outbox: Vec, + commands: Vec, +} + +impl Reconciler { + pub fn new(current_thing: Arc, new_thing: Thing) -> Self { + let deadline = tokio::time::Instant::now() + tokio::time::Duration::from_secs(1); + + Self { + current_thing, + new_thing, + deadline, + outbox: Default::default(), + commands: Default::default(), + } + } + + pub async fn run(mut self) -> Result { + // cleanup first + self.cleanup(); + + // synthetics + self.generate_synthetics().await?; + + // run code + let Reconciliation { + changed, + timers, + deleting: _, + } = self.new_thing.reconciliation.clone(); + // reconcile changed and timers, but not deleting, as we don't delete + self.reconcile_changed(changed).await?; + self.reconcile_timers(timers).await?; + + // reconcile desired state + self.reconcile_desired_state().await?; + + // detect reported state changes + self.sync_reported_state(); + + Ok(Outcome { + new_thing: self.new_thing, + outbox: self.outbox, + commands: self.commands, + }) + } + + fn cleanup(&mut self) { + // clear reconcile waker + self.new_thing.clear_wakeup(WakerReason::Reconcile); + + // clear old logs first, otherwise logging of state will continuously grow + // FIXME: remove when we only send a view of the state to the reconcile code + for (_, v) in &mut self.new_thing.reconciliation.changed { + v.last_log.clear(); + } + } + + /// Synchronize the reported state changes with the previous state + /// + /// In case a value changed, the timestamp will be set to "now", otherwise the timestamp + /// will be copied from the previous value. + fn sync_reported_state(&mut self) { + let now = Utc::now(); + + for (k, next) in &mut self.new_thing.reported_state { + if let Some(previous) = self.current_thing.reported_state.get(k) { + if previous.value != next.value { + // set to now + next.last_update = now; + } else { + // restore timestamp + next.last_update = previous.last_update; + } + } + } + } + + async fn generate_synthetics(&mut self) -> Result<(), Error> { + let now = Utc::now(); + + let new_state = Arc::new(self.new_thing.clone()); + + for (name, mut syn) in &mut self.new_thing.synthetic_state { + let value = + Self::run_synthetic(name, &syn.r#type, new_state.clone(), self.deadline).await?; + if syn.value != value { + syn.value = value; + syn.last_update = now; + } + } + + Ok(()) + } + + /// sync the state with the reported and expected state + fn sync_desired_state(&mut self) -> Result<(), Error> { + let mut waker = self.new_thing.waker(); + + for (name, mut desired) in &mut self.new_thing.desired_state { + // update the last change timestamp + + // find the current value + let reported_value = self + .new_thing + .synthetic_state + .get(name) + .map(|state| &state.value) + .or_else(|| { + self.new_thing + .reported_state + .get(name) + .map(|state| &state.value) + }) + .unwrap_or(&Value::Null); + let desired_value = &desired.value; + + // check if there is a change from the previous state + if let Some(previous) = self.current_thing.desired_state.get(name) { + if previous.value != desired.value || previous.valid_until != desired.valid_until { + // desired value changed, start reconciling again + desired.reconciliation = + DesiredFeatureReconciliation::Reconciling { last_attempt: None }; + desired.last_update = Utc::now(); + } + } + + if matches!(desired.method, DesiredFeatureMethod::Manual) { + continue; + } + + match (&desired.reconciliation, &desired.mode) { + // Mode is disabled, and we already are ... + (DesiredFeatureReconciliation::Disabled { .. }, DesiredMode::Disabled) => { + // ... do nothing + } + + // Mode is disabled, but we are not... + (_, DesiredMode::Disabled) => { + // ... mark disabled + desired.reconciliation = + DesiredFeatureReconciliation::Disabled { when: Utc::now() }; + } + + // Mode is not disabled, but we are are + (DesiredFeatureReconciliation::Disabled { .. }, _) => { + if reported_value != desired_value { + // not the same + if desired.valid_until.map(|u| u > Utc::now()).unwrap_or(true) { + // the value is still valid, back to reconciling + desired.reconciliation = + DesiredFeatureReconciliation::Reconciling { last_attempt: None }; + } else { + // the value is no longer valid + desired.reconciliation = DesiredFeatureReconciliation::Failed { + when: Utc::now(), + reason: Some( + "Activated reconciliation with expired value".to_string(), + ), + }; + } + } else { + // equals => means success + desired.reconciliation = + DesiredFeatureReconciliation::Succeeded { when: Utc::now() } + } + } + + // Mode is "keep sync", and we succeeded + (DesiredFeatureReconciliation::Succeeded { .. }, DesiredMode::Sync) => { + // if we should keep it in sync, check values and if the value is still valid + if reported_value != desired_value + && desired.valid_until.map(|u| u > Utc::now()).unwrap_or(true) + { + // if not, back to reconciling + desired.reconciliation = + DesiredFeatureReconciliation::Reconciling { last_attempt: None }; + + if let Some(valid_until) = desired.valid_until { + // and set waker + waker.wakeup_at(valid_until, WakerReason::Reconcile); + } + } + } + + // succeeded and not (sync), or failed + (DesiredFeatureReconciliation::Succeeded { .. }, _) + | (DesiredFeatureReconciliation::Failed { .. }, _) => { + // we do nothing + } + + // we are reconciling + (DesiredFeatureReconciliation::Reconciling { .. }, _) => { + if reported_value == desired_value { + // value changed to expected value -> success + desired.reconciliation = + DesiredFeatureReconciliation::Succeeded { when: Utc::now() }; + } else if let Some(valid_until) = desired.valid_until { + // value did not change to expected value, and expired -> failure + if valid_until < Utc::now() { + desired.reconciliation = DesiredFeatureReconciliation::Failed { + when: Utc::now(), + reason: None, + }; + } else { + // otherwise, start waker + waker.wakeup_at(valid_until, WakerReason::Reconcile); + } + } + // else -> keep going + } + } + } + + // set possible waker + + self.new_thing.set_waker(waker); + + // done + + Ok(()) + } + + async fn reconcile_desired_state(&mut self) -> Result<(), Error> { + // sync first + self.sync_desired_state()?; + + // get the current waker + let mut waker = self.new_thing.waker(); + let new_thing = Arc::new(self.new_thing.clone()); + + let mut commands = CommandBuilder::default(); + + let mut context = Context { + new_thing, + deadline: self.deadline, + waker: &mut waker, + commands: &mut commands, + }; + + // process next + for (name, desired) in &mut self.new_thing.desired_state { + let value = desired.value.clone(); + + match &mut desired.reconciliation { + DesiredFeatureReconciliation::Disabled { .. } + | DesiredFeatureReconciliation::Succeeded { .. } + | DesiredFeatureReconciliation::Failed { .. } => { + // we do nothing + } + DesiredFeatureReconciliation::Reconciling { last_attempt } => { + match &desired.method { + DesiredFeatureMethod::Manual | DesiredFeatureMethod::External => { + // we do nothing + } + + DesiredFeatureMethod::Command(command) => command + .reconcile( + &mut context, + FeatureContext { + name, + last_attempt, + value, + }, + ) + .await + .map_err(|err| Error::Reconcile(anyhow!(err)))?, + + DesiredFeatureMethod::Code(code) => code + .reconcile( + &mut context, + FeatureContext { + name, + last_attempt, + value, + }, + ) + .await + .map_err(Error::Reconcile)?, + } + } + } + } + + self.commands.extend( + commands + .into_commands(&self.new_thing.metadata.application) + .map_err(|err| Error::Reconcile(anyhow!(err)))?, + ); + + // set waker, possibly changed + self.new_thing.set_waker(waker); + + // done + Ok(()) + } + + async fn reconcile_changed(&mut self, changed: IndexMap) -> Result<(), Error> { + for (name, mut changed) in changed { + let ExecutionResult { logs } = self + .run_code(format!("changed-{}", name), &changed.code) + .await?; + + changed.last_log = logs; + self.new_thing.reconciliation.changed.insert(name, changed); + } + + Ok(()) + } + + async fn reconcile_timers(&mut self, timers: IndexMap) -> Result<(), Error> { + for (name, mut timer) in timers { + let due = match timer.stopped { + true => { + // timer is stopped, just keep it stopped + timer.last_started = None; + None + } + false => { + let last_started = match timer.last_started { + None => { + let now = Utc::now(); + timer.last_started = Some(now); + now + } + Some(last_started) => last_started, + }; + + // now check if the timer is due + match (timer.last_run, timer.initial_delay) { + (Some(last_run), _) => { + // timer already ran, check if it is due again + Some(Self::find_next_run_from( + last_started, + timer.period, + last_run, + )) + } + (None, None) => { + // timer never ran, and there is no delay, run now + Some(Utc::now()) + } + (None, Some(initial_delay)) => { + // timer never ran, check it the first run is due + Some( + last_started + + Duration::from_std(initial_delay) + .unwrap_or_else(|_| Duration::max_value()), + ) + } + } + } + }; + + if let Some(due) = due { + let diff = Utc::now() - due; + + let next_run = if diff >= Duration::zero() { + log::debug!("Late by: {diff}"); + TIMER_DELAY.observe(diff.num_milliseconds() as f64); + + let now = Utc::now(); + + self.run_code(format!("timer-{}", name), &timer.code) + .await?; + + let next_run = + Self::find_next_run(timer.last_started.unwrap_or(now), timer.period); + + log::info!("Next run: {next_run}"); + + timer.last_run = Some(now); + + next_run + } else { + due + }; + + self.new_thing.wakeup_at(next_run, WakerReason::Reconcile); + } + + self.new_thing.reconciliation.timers.insert(name, timer); + } + + Ok(()) + } + + async fn run_code(&mut self, name: String, code: &Code) -> Result { + match code { + Code::JavaScript(script) => { + #[derive(serde::Serialize)] + #[serde(rename_all = "camelCase")] + struct Input { + current_state: Arc, + new_state: Thing, + // the following items are scooped off by the output, but we need to initialize + // them to present, but empty values for the scripts. + outbox: Vec, + logs: Vec, + } + + #[derive(Clone, Default, Debug, serde::Deserialize)] + #[serde(rename_all = "camelCase")] + pub struct Output { + #[serde(default)] + new_state: Option, + #[serde(default)] + outbox: Vec, + #[serde(default)] + logs: Vec, + #[serde(default, with = "deno::duration")] + waker: Option, + } + + let opts = DenoOptions { + deadline: self.deadline, + }; + let deno = deno::Execution::new(name, script, opts); + let out = deno + .run::<_, Json, ()>(Input { + current_state: self.current_thing.clone(), + new_state: self.new_thing.clone(), + outbox: vec![], + logs: vec![], + }) + .await + .map_err(Error::Reconcile)?; + + // FIXME: record error (if any) + + let Output { + new_state, + waker, + outbox, + logs, + } = out.output.0; + + let mut new_state = new_state.unwrap_or_else(|| self.new_thing.clone()); + + // schedule the waker, in the new state + if let Some(duration) = waker { + new_state.wakeup(duration, WakerReason::Reconcile); + } + // set the new state + self.new_thing = new_state; + // extend outbox + self.outbox.extend(outbox); + + // done + Ok(ExecutionResult { logs }) + } + } + } + + async fn run_synthetic( + name: &str, + r#type: &SyntheticType, + new_state: Arc, + deadline: tokio::time::Instant, + ) -> Result { + match r#type { + SyntheticType::JavaScript(script) => { + #[derive(serde::Serialize)] + #[serde(rename_all = "camelCase")] + struct Input { + new_state: Arc, + } + + let opts = DenoOptions { deadline }; + let deno = deno::Execution::new(name, script, opts); + let out = deno + .run::<_, (), Value>(Input { new_state }) + .await + .map_err(Error::Reconcile)?; + + Ok(out.return_value) + } + SyntheticType::Alias(alias) => match new_state.reported_state.get(alias) { + Some(value) => Ok(value.value.clone()), + None => Ok(Value::Null), + }, + } + } + + fn find_next_run_from( + last_started: DateTime, + period: std::time::Duration, + now: DateTime, + ) -> DateTime { + let period_ms = period.as_millis().clamp(0, u32::MAX as u128) as u32; + let diff = (now - last_started).num_milliseconds(); + + if diff < 0 { + return Utc::now(); + } + + let diff = diff.clamp(0, u32::MAX as i64) as u32; + let periods = (diff / period_ms) + 1; + + last_started + Duration::milliseconds((periods * period_ms) as i64) + } + + fn find_next_run(last_started: DateTime, period: std::time::Duration) -> DateTime { + Self::find_next_run_from(last_started, period, Utc::now()) + } +} + +#[cfg(test)] +mod test { + + use super::*; + use chrono::{TimeZone, Utc}; + use std::time::Duration; + + #[test] + fn test_next() { + assert_next((0, 0, 0), (0, 1, 0), 1, (0, 1, 1)); + assert_next((0, 0, 0), (0, 1, 2), 10, (0, 1, 10)); + assert_next((0, 0, 0), (0, 0, 1), 1, (0, 0, 2)); + } + + fn assert_next( + started: (u32, u32, u32), + now: (u32, u32, u32), + period: u64, + expected: (u32, u32, u32), + ) { + let day = Utc.ymd(2022, 1, 1); + + assert_eq!( + Reconciler::find_next_run_from( + day.and_hms(started.0, started.1, started.2), + Duration::from_secs(period), + day.and_hms(now.0, now.1, now.2) + ), + day.and_hms(expected.0, expected.1, expected.2) + ); + } +} diff --git a/core/src/model/mod.rs b/core/src/model/mod.rs index 2fdfac5..a19e254 100644 --- a/core/src/model/mod.rs +++ b/core/src/model/mod.rs @@ -1,13 +1,16 @@ mod desired; +mod recon; +mod waker; pub use desired::*; +pub use recon::*; +pub use waker::*; use crate::processor::Event; use crate::service::Id; use base64::STANDARD; use base64_serde::base64_serde_type; use chrono::{DateTime, Duration, Utc}; -use indexmap::IndexMap; use serde_json::Value; use std::collections::{BTreeMap, BTreeSet}; @@ -42,6 +45,7 @@ impl Thing { application: application.into(), uid: None, creation_timestamp: None, + deletion_timestamp: None, generation: None, resource_version: None, annotations: Default::default(), @@ -87,6 +91,14 @@ impl Thing { } } } + + pub fn outbox(&self) -> &[Event] { + const EMPTY: [Event; 0] = []; + self.internal + .as_ref() + .map(|internal| internal.outbox.as_slice()) + .unwrap_or(&EMPTY) + } } /// The state view on thing model. @@ -161,6 +173,8 @@ pub struct Metadata { pub uid: Option, #[serde(default, skip_serializing_if = "Option::is_none")] pub creation_timestamp: Option>, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub deletion_timestamp: Option>, #[serde(default, skip_serializing_if = "Option::is_none")] pub generation: Option, @@ -190,94 +204,6 @@ pub enum JsonSchema { Draft7(Value), } -#[derive( - Clone, Debug, Default, PartialEq, Eq, serde::Deserialize, serde::Serialize, schemars::JsonSchema, -)] -#[serde(rename_all = "camelCase")] -pub struct Reconciliation { - #[serde(default, skip_serializing_if = "IndexMap::is_empty")] - pub changed: IndexMap, - #[serde(default, skip_serializing_if = "IndexMap::is_empty")] - pub timers: IndexMap, -} - -impl Reconciliation { - pub fn is_empty(&self) -> bool { - self.changed.is_empty() && self.timers.is_empty() - } -} - -#[derive( - Clone, Debug, PartialEq, Eq, serde::Deserialize, serde::Serialize, schemars::JsonSchema, -)] -#[serde(rename_all = "camelCase")] -pub struct Changed { - #[serde(flatten)] - pub code: Code, - #[serde(default, skip_serializing_if = "Vec::is_empty")] - pub last_log: Vec, -} - -impl From for Changed { - fn from(code: Code) -> Self { - Self { - code, - last_log: Default::default(), - } - } -} - -#[derive( - Clone, Debug, PartialEq, Eq, serde::Deserialize, serde::Serialize, schemars::JsonSchema, -)] -#[serde(rename_all = "camelCase")] -pub struct Timer { - /// the code to run - #[serde(flatten)] - pub code: Code, - /// the period the timer is scheduled - #[serde(with = "humantime_serde")] - #[schemars(schema_with = "crate::schemars::humantime")] - pub period: std::time::Duration, - /// A flag to stop the timer - #[serde(default)] - pub stopped: bool, - /// the latest timestamp the timer was started - #[serde(default, skip_serializing_if = "Option::is_none")] - pub last_started: Option>, - /// the timestamp the timer last ran - #[serde(default, skip_serializing_if = "Option::is_none")] - pub last_run: Option>, - /// the logs of the last run - #[serde(default, skip_serializing_if = "Vec::is_empty")] - pub last_log: Vec, - - /// an optional, initial delay. if there is none, the time will be run the first time it is - /// configured - #[serde(with = "humantime_serde")] - #[serde(default, skip_serializing_if = "Option::is_none")] - #[schemars(schema_with = "crate::schemars::humantime")] - pub initial_delay: Option, -} - -impl Timer { - pub fn new( - period: std::time::Duration, - initial_delay: Option, - code: Code, - ) -> Self { - Self { - code, - last_log: Default::default(), - period, - last_started: None, - last_run: None, - stopped: false, - initial_delay, - } - } -} - #[derive( Clone, Debug, PartialEq, Eq, serde::Deserialize, serde::Serialize, schemars::JsonSchema, )] @@ -354,90 +280,6 @@ impl Internal { } } -pub trait WakerExt { - fn wakeup_at(&mut self, when: DateTime, reason: WakerReason); - - fn wakeup(&mut self, delay: Duration, reason: WakerReason) { - self.wakeup_at(Utc::now() + delay, reason); - } - - fn clear_wakeup(&mut self, reason: WakerReason); -} - -impl WakerExt for Thing { - fn wakeup_at(&mut self, when: DateTime, reason: WakerReason) { - match &mut self.internal { - Some(internal) => { - internal.wakeup_at(when, reason); - } - None => { - let mut internal = Internal::default(); - internal.wakeup_at(when, reason); - self.internal = Some(internal); - } - } - } - - fn clear_wakeup(&mut self, reason: WakerReason) { - if let Some(internal) = &mut self.internal { - internal.clear_wakeup(reason); - } - } -} - -impl WakerExt for Internal { - fn wakeup_at(&mut self, when: DateTime, reason: WakerReason) { - self.waker.wakeup_at(when, reason); - } - - fn clear_wakeup(&mut self, reason: WakerReason) { - self.waker.clear_wakeup(reason); - } -} - -#[derive(Clone, Debug, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct Waker { - pub when: Option>, - pub why: BTreeSet, -} - -impl Waker { - pub fn is_empty(&self) -> bool { - self.when.is_none() - } -} - -impl WakerExt for Waker { - fn wakeup_at(&mut self, when: DateTime, reason: WakerReason) { - self.why.insert(reason); - match self.when { - None => self.when = Some(when), - Some(w) => { - if w > when { - self.when = Some(when); - } - } - } - } - - fn clear_wakeup(&mut self, reason: WakerReason) { - self.why.remove(&reason); - if self.why.is_empty() { - self.when = None; - } - } -} - -#[derive( - Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize, -)] -#[serde(rename_all = "camelCase")] -pub enum WakerReason { - Reconcile, - Outbox, -} - #[cfg(test)] mod test { use super::*; diff --git a/core/src/model/recon.rs b/core/src/model/recon.rs new file mode 100644 index 0000000..9f6910f --- /dev/null +++ b/core/src/model/recon.rs @@ -0,0 +1,134 @@ +use super::*; +use chrono::{DateTime, Utc}; +use indexmap::IndexMap; +use std::time::Duration; + +#[derive( + Clone, Debug, Default, PartialEq, Eq, serde::Deserialize, serde::Serialize, schemars::JsonSchema, +)] +#[serde(rename_all = "camelCase")] +pub struct Reconciliation { + #[serde(default, skip_serializing_if = "IndexMap::is_empty")] + pub changed: IndexMap, + #[serde(default, skip_serializing_if = "IndexMap::is_empty")] + pub timers: IndexMap, + #[serde(default, skip_serializing_if = "IndexMap::is_empty")] + pub deleting: IndexMap, +} + +impl Reconciliation { + pub fn is_empty(&self) -> bool { + self.changed.is_empty() && self.timers.is_empty() && self.deleting.is_empty() + } +} + +#[derive( + Clone, Debug, PartialEq, Eq, serde::Deserialize, serde::Serialize, schemars::JsonSchema, +)] +#[serde(rename_all = "camelCase")] +pub struct Changed { + #[serde(flatten)] + pub code: Code, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub last_log: Vec, +} + +impl From for Changed { + fn from(code: Code) -> Self { + Self { + code, + last_log: Default::default(), + } + } +} + +#[derive( + Clone, Debug, PartialEq, Eq, serde::Deserialize, serde::Serialize, schemars::JsonSchema, +)] +#[serde(rename_all = "camelCase")] +pub struct Timer { + /// the code to run + #[serde(flatten)] + pub code: Code, + /// the period the timer is scheduled + #[serde(with = "humantime_serde")] + #[schemars(schema_with = "crate::schemars::humantime")] + pub period: Duration, + /// A flag to stop the timer + #[serde(default)] + pub stopped: bool, + /// the latest timestamp the timer was started + #[serde(default, skip_serializing_if = "Option::is_none")] + pub last_started: Option>, + /// the timestamp the timer last ran + #[serde(default, skip_serializing_if = "Option::is_none")] + pub last_run: Option>, + /// the logs of the last run + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub last_log: Vec, + + /// an optional, initial delay. if there is none, the time will be run the first time it is + /// configured + #[serde(with = "humantime_serde")] + #[serde(default, skip_serializing_if = "Option::is_none")] + #[schemars(schema_with = "crate::schemars::humantime")] + pub initial_delay: Option, +} + +impl Timer { + pub fn new(period: Duration, initial_delay: Option, code: Code) -> Self { + Self { + code, + last_log: Default::default(), + period, + last_started: None, + last_run: None, + stopped: false, + initial_delay, + } + } +} + +#[derive( + Clone, Debug, PartialEq, Eq, serde::Deserialize, serde::Serialize, schemars::JsonSchema, +)] +#[serde(rename_all = "camelCase")] +pub struct Deleting { + #[serde(flatten)] + /// Code that will be executed before the thing will be deleted. + pub code: Code, +} + +#[cfg(test)] +mod test { + use super::*; + use serde_json::json; + + #[test] + pub fn test_ser() { + let mut thing = Thing::new("default", "thing1"); + thing.reconciliation.deleting.insert( + "test".to_string(), + Deleting { + code: Code::JavaScript("other code".to_string()), + }, + ); + + assert_eq!( + json!({ + "metadata": { + "application": "default", + "name": "thing1" + }, + "reconciliation": { + "deleting": { + "test": { + "javaScript": "other code", + }, + } + } + }), + serde_json::to_value(thing).unwrap() + ); + } +} diff --git a/core/src/model/waker.rs b/core/src/model/waker.rs new file mode 100644 index 0000000..f08281e --- /dev/null +++ b/core/src/model/waker.rs @@ -0,0 +1,87 @@ +use super::*; +use chrono::{DateTime, Utc}; + +pub trait WakerExt { + fn wakeup_at(&mut self, when: DateTime, reason: WakerReason); + + fn wakeup(&mut self, delay: Duration, reason: WakerReason) { + self.wakeup_at(Utc::now() + delay, reason); + } + + fn clear_wakeup(&mut self, reason: WakerReason); +} + +impl WakerExt for Thing { + fn wakeup_at(&mut self, when: DateTime, reason: WakerReason) { + match &mut self.internal { + Some(internal) => { + internal.wakeup_at(when, reason); + } + None => { + let mut internal = Internal::default(); + internal.wakeup_at(when, reason); + self.internal = Some(internal); + } + } + } + + fn clear_wakeup(&mut self, reason: WakerReason) { + if let Some(internal) = &mut self.internal { + internal.clear_wakeup(reason); + } + } +} + +impl WakerExt for Internal { + fn wakeup_at(&mut self, when: DateTime, reason: WakerReason) { + self.waker.wakeup_at(when, reason); + } + + fn clear_wakeup(&mut self, reason: WakerReason) { + self.waker.clear_wakeup(reason); + } +} + +#[derive(Clone, Debug, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Waker { + pub when: Option>, + pub why: BTreeSet, +} + +impl Waker { + pub fn is_empty(&self) -> bool { + self.when.is_none() + } +} + +impl WakerExt for Waker { + fn wakeup_at(&mut self, when: DateTime, reason: WakerReason) { + self.why.insert(reason); + match self.when { + None => self.when = Some(when), + Some(w) => { + if w > when { + self.when = Some(when); + } + } + } + } + + fn clear_wakeup(&mut self, reason: WakerReason) { + self.why.remove(&reason); + if self.why.is_empty() { + self.when = None; + } + } +} + +#[derive( + Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize, +)] +#[serde(rename_all = "camelCase")] +pub enum WakerReason { + Reconcile, + Outbox, + Deletion, +} diff --git a/core/src/notifier/mod.rs b/core/src/notifier/mod.rs index 35fb3b9..4983e53 100644 --- a/core/src/notifier/mod.rs +++ b/core/src/notifier/mod.rs @@ -14,7 +14,7 @@ pub enum Error { #[async_trait] pub trait Notifier: Sized + Send + Sync + 'static { type Config: Clone + Debug + Send + Sync + serde::de::DeserializeOwned + 'static; - type Error: std::error::Error + Debug; + type Error: std::error::Error + Debug + Send + Sync; fn from_config(config: &Self::Config) -> anyhow::Result; diff --git a/core/src/processor/mod.rs b/core/src/processor/mod.rs index ae59ff4..3bc3b93 100644 --- a/core/src/processor/mod.rs +++ b/core/src/processor/mod.rs @@ -4,13 +4,13 @@ pub mod source; use crate::{ app::Spawner, command::CommandSink, - model::{Thing, WakerReason}, + model::{Reconciliation, Thing, WakerReason}, notifier::Notifier, processor::{sink::Sink, source::Source}, service::{ - self, DesiredStateValueUpdater, DesiredStateValueUpdaterError, Id, JsonMergeUpdater, - JsonPatchUpdater, MergeError, PatchError, ReportedStateUpdater, Service, UpdateMode, - UpdateOptions, Updater, + self, Cleanup, DesiredStateValueUpdater, Id, InfallibleUpdater, JsonMergeUpdater, + JsonPatchUpdater, MapValueInserter, MapValueRemover, ReportedStateUpdater, Service, + UpdateMode, UpdateOptions, Updater, UpdaterExt, }, storage::{self, Storage}, }; @@ -23,7 +23,7 @@ use prometheus::{ IntCounterVec, }; use serde_json::Value; -use std::{collections::BTreeMap, convert::Infallible}; +use std::collections::BTreeMap; use uuid::Uuid; lazy_static! { @@ -40,6 +40,7 @@ pub struct Event { pub id: String, pub timestamp: DateTime, pub application: String, + // FIXME: rename to thing pub device: String, pub message: Message, } @@ -89,6 +90,44 @@ pub enum Message { Wakeup { reasons: Vec, }, + /// Create a thing if it doesn't yet exists, and register a child. + RegisterChild { + #[serde(rename = "$ref")] + r#ref: String, + #[serde(default)] + template: ThingTemplate, + }, + /// Unregister a child, and delete the thing if it was the last child. + UnregisterChild { + #[serde(rename = "$ref")] + r#ref: String, + }, +} + +#[derive(Clone, Debug, Default, PartialEq, serde::Deserialize, serde::Serialize)] +#[serde(rename_all = "camelCase")] +pub struct ThingTemplate { + #[serde(default, skip_serializing_if = "Reconciliation::is_empty")] + pub reconciliation: Reconciliation, +} + +impl InfallibleUpdater for ThingTemplate { + fn update(&self, mut thing: Thing) -> Thing { + thing + .reconciliation + .changed + .extend(self.reconciliation.changed.clone()); + thing + .reconciliation + .timers + .extend(self.reconciliation.timers.clone()); + thing + .reconciliation + .deleting + .extend(self.reconciliation.deleting.clone()); + + thing + } } impl Message { @@ -146,11 +185,9 @@ impl From for ReportedStateUpdater { } } -impl Updater for ReportStateBuilder { - type Error = ::Error; - - fn update(self, thing: Thing) -> Result { - ReportedStateUpdater::from(self).update(thing) +impl InfallibleUpdater for ReportStateBuilder { + fn update(&self, thing: Thing) -> Thing { + InfallibleUpdater::update(&ReportedStateUpdater::from(self.clone()), thing) } } @@ -195,16 +232,190 @@ where Self { service, source } } + /// Cleanup a thing, ignore if missing. + /// + /// NOTE: This function respects a change in the `deletion_timestamp` and will trigger a + /// deletion if the updater sets it. + async fn run_cleanup( + service: &Service, + id: &Id, + updater: U, + ) -> Result<(), anyhow::Error> + where + U: Updater + Send + Sync, + { + let opts = UpdateOptions { + ignore_unclean_inbox: false, + }; + + loop { + let thing = service.get(&id).await?; + match thing { + Some(thing) => { + if thing.metadata.deletion_timestamp.is_some() { + log::debug!("Thing is already being deleted"); + // cleaned up + break; + } + let thing = updater.update(thing)?; + + let result = if thing.metadata.deletion_timestamp.is_some() { + // perform delete + service + .delete(&id, Some(&(&thing).into())) + .await + .map(|_| ()) + } else { + // perform update + service.update(&id, &thing, &opts).await.map(|_| ()) + }; + match result { + Ok(_) => { + break; + } + Err(service::Error::Storage(storage::Error::PreconditionFailed)) => { + // retry + continue; + } + Err(err) => { + return Err(anyhow!(err)); + } + } + } + None => { + // cleaned up + break; + } + } + } + + Ok(()) + } + + /// Either update or insert a new thing + async fn run_upsert( + service: &Service, + id: &Id, + updater: U, + ) -> Result<(), anyhow::Error> + where + U: Updater + Send + Sync, + { + let opts = UpdateOptions { + ignore_unclean_inbox: false, + }; + + // FIXME: consider taking this into the service + + loop { + let thing = service.get(&id).await?; + match thing { + Some(thing) => { + let thing = updater.update(thing)?; + match service.update(&id, &thing, &opts).await { + Ok(_) => { + break; + } + Err(service::Error::Storage( + storage::Error::NotFound | storage::Error::PreconditionFailed, + )) => { + // retry + continue; + } + Err(err) => { + return Err(anyhow!(err)); + } + } + } + None => { + let thing = Thing::new(&id.application, &id.thing); + let thing = updater.update(thing)?; + + match service.create(thing).await { + Ok(_) => { + break; + } + Err(service::Error::Storage(storage::Error::AlreadyExists)) => { + // retry + continue; + } + Err(err) => { + return Err(anyhow!(err)); + } + } + } + } + } + + Ok(()) + } + + async fn run_update( + service: &Service, + id: &Id, + updater: U, + ) -> Result<(), anyhow::Error> + where + U: Updater, + { + let opts = UpdateOptions { + ignore_unclean_inbox: false, + }; + + loop { + match service.update(id, &updater, &opts).await { + Ok(_) => { + log::debug!("Processing complete ... ok!"); + UPDATES.with_label_values(&["ok"]).inc(); + break; + } + Err(service::Error::Storage(storage::Error::PreconditionFailed)) => { + UPDATES.with_label_values(&["oplock"]).inc(); + // op-lock failure, retry + continue; + } + Err(service::Error::Storage(storage::Error::NotFound)) => { + UPDATES.with_label_values(&["not-found"]).inc(); + // the thing does not exists, skip + break; + } + Err(service::Error::Storage(storage::Error::NotAllowed)) => { + UPDATES.with_label_values(&["not-allowed"]).inc(); + // not allowed to modify thing, skip + break; + } + Err(service::Error::Notifier(err)) => { + UPDATES.with_label_values(&["notifier"]).inc(); + log::warn!("Failed to notify: {err}"); + // not much we can do + // FIXME: consider using a circuit breaker + break; + } + Err(service::Error::Machine(err)) => { + UPDATES.with_label_values(&["machine"]).inc(); + log::info!("Failed to process state machine: {err}"); + // the state machine turned the state into some error (e.g. validation) + // ignore and continue + // FIXME: consider adding a "status" field with the error + break; + } + Err(err) => { + UPDATES.with_label_values(&["other"]).inc(); + log::warn!("Failed to process: {err}"); + return Err(anyhow!("Failed to process: {err}")); + } + } + } + + Ok(()) + } + pub async fn run(self) -> anyhow::Result<()> { self.source .run(|event| async { log::debug!("Event: {event:?}"); EVENTS.inc(); - let opts = UpdateOptions { - ignore_unclean_inbox: false, - }; - let _timer = PROCESSING_TIME.start_timer(); let Event { @@ -219,48 +430,51 @@ where thing: device, }; - loop { - match self.service.update(&id, message.clone(), &opts).await { - Ok(_) => { - log::debug!("Processing complete ... ok!"); - UPDATES.with_label_values(&["ok"]).inc(); - break; - } - Err(service::Error::Storage(storage::Error::PreconditionFailed)) => { - UPDATES.with_label_values(&["oplock"]).inc(); - // op-lock failure, retry - continue; - } - Err(service::Error::Storage(storage::Error::NotFound)) => { - UPDATES.with_label_values(&["not-found"]).inc(); - // the thing does not exists, skip - break; - } - Err(service::Error::Storage(storage::Error::NotAllowed)) => { - UPDATES.with_label_values(&["not-allowed"]).inc(); - // not allowed to modify thing, skip - break; - } - Err(service::Error::Notifier(err)) => { - UPDATES.with_label_values(&["notifier"]).inc(); - log::warn!("Failed to notify: {err}"); - // not much we can do - // FIXME: consider using a circuit breaker - break; - } - Err(service::Error::Machine(err)) => { - UPDATES.with_label_values(&["machine"]).inc(); - log::info!("Failed to process state machine: {err}"); - // the state machine turned the state into some error (e.g. validation) - // ignore and continue - // FIXME: consider adding a "status" field with the error - break; - } - Err(err) => { - UPDATES.with_label_values(&["other"]).inc(); - log::warn!("Failed to process: {err}"); - return Err(anyhow!("Failed to process: {err}")); - } + match message { + Message::RegisterChild { r#ref, template } => { + Self::run_upsert( + &self.service, + &id, + MapValueInserter("$children".to_string(), r#ref).and_then(template), + ) + .await?; + } + Message::UnregisterChild { r#ref } => { + Self::run_cleanup( + &self.service, + &id, + MapValueRemover("$children".to_string(), r#ref) + .and_then(Cleanup("$children".to_string())), + ) + .await?; + } + Message::ReportState { state, partial } => { + Self::run_update( + &self.service, + &id, + ReportedStateUpdater( + state, + match partial { + true => UpdateMode::Merge, + false => UpdateMode::Replace, + }, + ), + ) + .await? + } + Message::Merge(merge) => { + Self::run_update(&self.service, &id, JsonMergeUpdater(merge)).await? + } + Message::Patch(patch) => { + Self::run_update(&self.service, &id, JsonPatchUpdater(patch)).await? + } + Message::Wakeup { reasons: _ } => { + // don't do any real change, this will just reconcile and process what is necessary + Self::run_update(&self.service, &id, ()).await? + } + Message::SetDesiredValue { values } => { + Self::run_update(&self.service, &id, DesiredStateValueUpdater(values)) + .await? } } @@ -273,41 +487,3 @@ where Ok(()) } } - -#[derive(Debug, thiserror::Error)] -pub enum MessageError { - #[error("That should be impossible")] - Infallible(#[from] Infallible), - #[error("Failed to apply JSON patch: {0}")] - Patch(#[from] PatchError), - #[error("Failed to apply JSON merge: {0}")] - Merge(#[from] MergeError), - #[error("Failed to apply desired value: {0}")] - SetDesiredValues(#[from] DesiredStateValueUpdaterError), -} - -impl Updater for Message { - type Error = MessageError; - - fn update(self, thing: Thing) -> Result { - match self { - Message::ReportState { state, partial } => Ok(ReportedStateUpdater( - state, - match partial { - true => UpdateMode::Merge, - false => UpdateMode::Replace, - }, - ) - .update(thing)?), - Message::Patch(patch) => Ok(JsonPatchUpdater(patch).update(thing)?), - Message::Merge(merge) => Ok(JsonMergeUpdater(merge).update(thing)?), - Message::Wakeup { reasons: _ } => { - // don't do any real change, this will just reconcile and process what is necessary - Ok(thing) - } - Message::SetDesiredValue { values } => { - Ok(DesiredStateValueUpdater(values).update(thing)?) - } - } - } -} diff --git a/core/src/service/mod.rs b/core/src/service/mod.rs index d931051..7c0348b 100644 --- a/core/src/service/mod.rs +++ b/core/src/service/mod.rs @@ -8,11 +8,12 @@ pub use updater::*; use crate::app::Spawner; use crate::command::CommandSink; -use crate::machine::{Machine, OutboxMessage, Outcome}; +use crate::machine::{DeletionOutcome, Machine, OutboxMessage, Outcome}; use crate::model::{Thing, WakerExt, WakerReason}; use crate::notifier::Notifier; use crate::processor::{sink::Sink, Event}; use crate::storage::{self, Storage}; +use crate::Preconditions; use chrono::{Duration, Utc}; use lazy_static::lazy_static; use prometheus::{register_int_counter, IntCounter}; @@ -154,26 +155,98 @@ impl Service Result> { - self.storage - .delete(&id.application, &id.thing) + pub async fn delete( + &self, + id: &Id, + opts: Option<&Preconditions<'_>>, + ) -> Result> { + // get the current thing + + log::debug!("Deleting thing: {id}"); + + let mut thing = match self.storage.get(&id.application, &id.thing).await { + Ok(Some(thing)) => thing, + // not found, we are done here + Err(storage::Error::NotFound) | Ok(None) => return Ok(false), + Err(err) => return Err(Error::Storage(err)), + }; + + if let Some(opts) = opts { + if !opts.matches(&thing) { + return Err(Error::Storage(storage::Error::PreconditionFailed)); + } + } + + if thing.metadata.deletion_timestamp.is_some() { + // already marked as deleted + return Ok(false); + } + + let original_empty = thing + .internal + .as_ref() + .map(|internal| internal.outbox.is_empty()) + .unwrap_or(true); + + // mark deleted + thing.metadata.deletion_timestamp = Some(Utc::now()); + + // run machine for deletion + let DeletionOutcome { mut thing, outbox } = Machine::delete(thing).await?; + // add outbox + Self::add_outbox(&mut thing, outbox); + // check if the thing's outbox contains events + if !thing.outbox().is_empty() { + // if so, store, which also stores the deletion marker + thing = self.storage.update(thing).await.map_err(Error::Storage)?; + // from here on, we are marked deleted and need to re-process if we fail with the next step + if original_empty { + // it was originally empty, so we can send and ack, otherwise the waker will + // trigger (and delete) us later + thing = self.send_and_ack(thing).await?; + } + } + + if thing.outbox().is_empty() { + // if the outbox is empty, delete + self.storage + .delete_with( + &id.application, + &id.thing, + Preconditions { + resource_version: thing.metadata.resource_version.as_deref(), + uid: thing.metadata.uid.as_deref(), + }, + ) + .await + .or_else(|err| match err { + // if we didn't find what we want to delete, this is just fine + storage::Error::NotFound => Ok(false), + err => Err(Error::Storage(err)), + })?; + } + + // notify + self.notifier + .notify(&thing) .await - .or_else(|err| match err { - // if we didn't find what we want to delete, this is just fine - storage::Error::NotFound => Ok(false), - err => Err(Error::Storage(err)), - }) + .map_err(Error::Notifier)?; + + // done + Ok(true) } pub async fn update( &self, id: &Id, - updater: U, + updater: &U, opts: &UpdateOptions, ) -> Result> where U: Updater, { + log::debug!("Updating thing: {id}"); + let current_thing = self .storage .get(&id.application, &id.thing) @@ -181,6 +254,19 @@ impl Service Service Service>(); + let outbox = outbox.to_vec(); match self.sink.publish_iter(outbox).await { Ok(()) => { diff --git a/core/src/service/updater.rs b/core/src/service/updater.rs index 3c04a14..ec57413 100644 --- a/core/src/service/updater.rs +++ b/core/src/service/updater.rs @@ -1,23 +1,73 @@ -use crate::model::{ - DesiredFeature, DesiredFeatureMethod, DesiredFeatureReconciliation, DesiredMode, - Reconciliation, ReportedFeature, SyntheticFeature, SyntheticType, Thing, +use crate::{ + model::{ + Deleting, DesiredFeature, DesiredFeatureMethod, DesiredFeatureReconciliation, DesiredMode, + Reconciliation, ReportedFeature, SyntheticFeature, SyntheticType, Thing, + }, + processor::SetDesiredValue, }; -use crate::processor::SetDesiredValue; use chrono::{DateTime, Utc}; +use indexmap::IndexMap; +use serde_json::{json, Value}; +use std::{ + collections::{btree_map::Entry, BTreeMap}, + convert::Infallible, + fmt::Debug, + time::Duration, +}; + pub use json_patch::Patch; -use serde_json::Value; -use std::collections::{btree_map::Entry, BTreeMap}; -use std::convert::Infallible; -use std::time::Duration; pub trait Updater { - type Error: std::error::Error + 'static; + type Error: std::error::Error + Send + Sync + 'static; + + fn update(&self, thing: Thing) -> Result; +} + +pub trait UpdaterExt: Updater + Sized { + fn and_then(self, updater: U) -> AndThenUpdater + where + U: Updater, + { + AndThenUpdater(self, updater) + } +} + +impl UpdaterExt for T where T: Updater {} + +#[derive(Debug, thiserror::Error)] +pub enum AndThenError +where + E1: std::error::Error, + E2: std::error::Error, +{ + #[error("First: {0}")] + First(#[source] E1), + #[error("Second: {0}")] + Second(#[source] E2), +} + +pub struct AndThenUpdater(U1, U2) +where + U1: Updater, + U2: Updater; - fn update(self, thing: Thing) -> Result; +impl Updater for AndThenUpdater +where + U1: Updater, + U2: Updater, +{ + type Error = AndThenError; + + fn update(&self, thing: Thing) -> Result { + Ok(self + .1 + .update(self.0.update(thing).map_err(AndThenError::First)?) + .map_err(AndThenError::Second)?) + } } pub trait InfallibleUpdater { - fn update(self, thing: Thing) -> Thing; + fn update(&self, thing: Thing) -> Thing; } impl Updater for I @@ -26,11 +76,17 @@ where { type Error = Infallible; - fn update(self, thing: Thing) -> Result { + fn update(&self, thing: Thing) -> Result { Ok(InfallibleUpdater::update(self, thing)) } } +impl InfallibleUpdater for () { + fn update(&self, thing: Thing) -> Thing { + thing + } +} + pub enum UpdateMode { Merge, Replace, @@ -45,14 +101,93 @@ impl UpdateMode { } } +pub struct MapValueInserter(pub String, pub String); + +impl InfallibleUpdater for MapValueInserter { + fn update(&self, mut thing: Thing) -> Thing { + match thing.reported_state.entry(self.0.clone()) { + Entry::Occupied(mut entry) => { + let e = entry.get_mut(); + match &mut e.value { + Value::Object(fields) => { + fields.insert(self.1.clone(), Value::Null); + } + _ => { + *e = ReportedFeature::now(json!({ self.1.clone(): null })); + } + } + } + Entry::Vacant(entry) => { + entry.insert(ReportedFeature::now(json!({ self.1.clone(): null }))); + } + } + + thing + } +} + +/// Cleanup, in case a reported value is empty +/// +/// NOTE: This only works for calls which respect a change on the `deletion_timestamp` field, which +/// currently only the processor does. +pub struct Cleanup(pub String); + +impl InfallibleUpdater for Cleanup { + fn update(&self, mut thing: Thing) -> Thing { + if thing + .reported_state + .get(&self.0) + .map(|f| match &f.value { + Value::Object(map) => map.is_empty(), + Value::Array(array) => array.is_empty(), + // all other types are considered "empty" + _ => true, + }) + .unwrap_or(true) + { + log::debug!( + "Reference is empty, scheduling deletion of {}/{}", + thing.metadata.application, + thing.metadata.name + ); + // mark deleted + thing.metadata.deletion_timestamp = Some(Utc::now()); + } + + thing + } +} + +pub struct MapValueRemover(pub String, pub String); + +impl InfallibleUpdater for MapValueRemover { + fn update(&self, mut thing: Thing) -> Thing { + match thing.reported_state.entry(self.0.clone()) { + Entry::Occupied(mut entry) => match &mut entry.get_mut().value { + Value::Object(fields) => { + fields.remove(&self.1); + } + _ => { + // nothing to do + } + }, + Entry::Vacant(_) => { + // nothing to do + } + } + + thing + } +} + pub struct ReportedStateUpdater(pub BTreeMap, pub UpdateMode); impl InfallibleUpdater for ReportedStateUpdater { - fn update(self, mut thing: Thing) -> Thing { + fn update(&self, mut thing: Thing) -> Thing { match self.1 { // merge new data into current, update timestamps when the value has indeed changed UpdateMode::Merge => { - for (key, value) in self.0 { + for (key, value) in self.0.clone() { match thing.reported_state.entry(key) { Entry::Occupied(mut e) => { let e = e.get_mut(); @@ -72,7 +207,7 @@ impl InfallibleUpdater for ReportedStateUpdater { // data differed from the newly provided. UpdateMode::Replace => { let mut new_state = BTreeMap::new(); - for (key, value) in self.0 { + for (key, value) in self.0.clone() { match thing.reported_state.remove_entry(&key) { Some((key, feature)) => { if feature.value == value { @@ -95,15 +230,15 @@ impl InfallibleUpdater for ReportedStateUpdater { } impl InfallibleUpdater for Reconciliation { - fn update(self, mut thing: Thing) -> Thing { - thing.reconciliation = self; + fn update(&self, mut thing: Thing) -> Thing { + thing.reconciliation = self.clone(); thing } } impl InfallibleUpdater for Thing { - fn update(self, _: Thing) -> Thing { - self + fn update(&self, _: Thing) -> Thing { + self.clone() } } @@ -121,7 +256,7 @@ pub enum PatchError { impl Updater for JsonPatchUpdater { type Error = PatchError; - fn update(self, thing: Thing) -> Result { + fn update(&self, thing: Thing) -> Result { let mut json = serde_json::to_value(thing)?; json_patch::patch(&mut json, &self.0)?; Ok(serde_json::from_value(json)?) @@ -138,7 +273,7 @@ pub struct MergeError(#[from] serde_json::Error); impl Updater for JsonMergeUpdater { type Error = MergeError; - fn update(self, thing: Thing) -> Result { + fn update(&self, thing: Thing) -> Result { let mut json = serde_json::to_value(thing)?; json_patch::merge(&mut json, &self.0); Ok(serde_json::from_value(json)?) @@ -148,14 +283,14 @@ impl Updater for JsonMergeUpdater { pub struct SyntheticStateUpdater(pub String, pub SyntheticType); impl InfallibleUpdater for SyntheticStateUpdater { - fn update(self, mut thing: Thing) -> Thing { - match thing.synthetic_state.entry(self.0) { + fn update(&self, mut thing: Thing) -> Thing { + match thing.synthetic_state.entry(self.0.clone()) { Entry::Occupied(mut entry) => { - entry.get_mut().r#type = self.1; + entry.get_mut().r#type = self.1.clone(); } Entry::Vacant(entry) => { entry.insert(SyntheticFeature { - r#type: self.1, + r#type: self.1.clone(), last_update: Utc::now(), value: Default::default(), }); @@ -198,7 +333,7 @@ pub struct DesiredStateUpdater(pub String, pub DesiredStateUpdate); impl Updater for DesiredStateUpdater { type Error = DesiredStateUpdaterError; - fn update(self, mut thing: Thing) -> Result { + fn update(&self, mut thing: Thing) -> Result { let DesiredStateUpdate { value, valid_until, @@ -206,14 +341,14 @@ impl Updater for DesiredStateUpdater { reconciliation, method, mode, - } = self.1; + } = self.1.clone(); let valid_until = valid_until.or(valid_for .map(chrono::Duration::from_std) .transpose()? .map(|d| Utc::now() + d)); - match thing.desired_state.entry(self.0) { + match thing.desired_state.entry(self.0.clone()) { Entry::Occupied(mut entry) => { // we update what we got let entry = entry.get_mut(); @@ -259,10 +394,10 @@ pub struct DesiredStateValueUpdater(pub BTreeMap); impl Updater for DesiredStateValueUpdater { type Error = DesiredStateValueUpdaterError; - fn update(self, mut thing: Thing) -> Result { + fn update(&self, mut thing: Thing) -> Result { let mut missing = vec![]; - for (name, set) in self.0 { + for (name, set) in self.0.clone() { if let Some(state) = thing.desired_state.get_mut(&name) { match set { SetDesiredValue::Value(value) => { @@ -287,6 +422,48 @@ impl Updater for DesiredStateValueUpdater { } } +pub struct AnnotationsUpdater(pub BTreeMap>); + +impl AnnotationsUpdater { + pub fn new, V: Into>(annotation: A, value: V) -> Self { + let mut map = BTreeMap::new(); + map.insert(annotation.into(), Some(value.into())); + Self(map) + } +} + +impl InfallibleUpdater for AnnotationsUpdater { + fn update(&self, mut thing: Thing) -> Thing { + for (k, v) in &self.0 { + match v { + Some(v) => { + thing + .metadata + .annotations + .insert(k.to_string(), v.to_string()); + } + None => { + thing.metadata.annotations.remove(k); + } + } + } + thing + } +} + +impl InfallibleUpdater for IndexMap { + fn update(&self, mut thing: Thing) -> Thing { + for (k, v) in self { + thing + .reconciliation + .deleting + .insert(k.to_string(), v.clone()); + } + + thing + } +} + #[cfg(test)] mod test { @@ -305,7 +482,7 @@ mod test { let mut data = BTreeMap::::new(); data.insert("foo".into(), "bar".into()); let mut thing = - InfallibleUpdater::update(ReportedStateUpdater(data, UpdateMode::Merge), thing); + InfallibleUpdater::update(&ReportedStateUpdater(data, UpdateMode::Merge), thing); assert_eq!( thing.reported_state.remove("foo").map(|s| s.value), @@ -320,11 +497,53 @@ mod test { let mut data = BTreeMap::::new(); data.insert("foo".into(), "bar".into()); let mut thing = - InfallibleUpdater::update(ReportedStateUpdater(data, UpdateMode::Replace), thing); + InfallibleUpdater::update(&ReportedStateUpdater(data, UpdateMode::Replace), thing); assert_eq!( thing.reported_state.remove("foo").map(|s| s.value), Some(Value::String("bar".to_string())) ); } + + #[test] + fn test_map_value() { + let thing = new_thing(); + + let thing = InfallibleUpdater::update( + &MapValueInserter("$children".to_string(), "id1".to_string()), + thing, + ); + assert_eq!( + thing.reported_state["$children"].value, + json!({ + "id1": null, + }) + ); + let thing = InfallibleUpdater::update( + &MapValueInserter("$children".to_string(), "id2".to_string()), + thing, + ); + assert_eq!( + thing.reported_state["$children"].value, + json!({ + "id1": null, + "id2": null, + }) + ); + let thing = InfallibleUpdater::update( + &MapValueRemover("$children".to_string(), "id1".to_string()), + thing, + ); + assert_eq!( + thing.reported_state["$children"].value, + json!({ + "id2": null, + }) + ); + let thing = InfallibleUpdater::update( + &MapValueRemover("$children".to_string(), "id2".to_string()), + thing, + ); + assert_eq!(thing.reported_state["$children"].value, json!({})); + } } diff --git a/core/src/storage/mod.rs b/core/src/storage/mod.rs index f9a5902..ec182ca 100644 --- a/core/src/storage/mod.rs +++ b/core/src/storage/mod.rs @@ -1,12 +1,17 @@ pub mod postgres; -use crate::model::{Metadata, Thing}; +use crate::{ + model::{Metadata, Thing}, + Preconditions, +}; use async_trait::async_trait; -use std::fmt::Debug; -use std::future::Future; +use std::{fmt::Debug, future::Future}; #[derive(Debug, thiserror::Error)] -pub enum Error { +pub enum Error +where + E: Send + Sync, +{ /// Returned when an option should modify a thing, but it could not be found. /// /// Not used, when not finding the things isn't a problem. @@ -27,7 +32,11 @@ pub enum Error { } #[derive(thiserror::Error)] -pub enum UpdateError { +pub enum UpdateError +where + SE: Send + Sync, + UE: Send + Sync, +{ #[error("Service error: {0}")] Service(#[from] Error), #[error("Mutator error: {0}")] @@ -37,7 +46,7 @@ pub enum UpdateError { #[async_trait] pub trait Storage: Sized + Send + Sync + 'static { type Config: Clone + Debug + Send + Sync + serde::de::DeserializeOwned + 'static; - type Error: std::error::Error + Debug; + type Error: std::error::Error + Debug + Send + Sync; fn from_config(config: &Self::Config) -> anyhow::Result; @@ -66,6 +75,7 @@ pub trait Storage: Sized + Send + Sync + 'static { application, uid, creation_timestamp, + deletion_timestamp, resource_version, generation, annotations: _, @@ -81,6 +91,7 @@ pub trait Storage: Sized + Send + Sync + 'static { application, uid, creation_timestamp, + deletion_timestamp, resource_version, generation, ..new_thing.metadata @@ -97,5 +108,16 @@ pub trait Storage: Sized + Send + Sync + 'static { } /// Delete a thing. Return `true` if the thing was deleted, `false` if it didn't exist. - async fn delete(&self, application: &str, name: &str) -> Result>; + async fn delete(&self, application: &str, name: &str) -> Result> { + self.delete_with(application, name, Default::default()) + .await + } + + /// Delete a thing. Return `true` if the thing was deleted, `false` if it didn't exist. + async fn delete_with( + &self, + application: &str, + name: &str, + opts: Preconditions<'_>, + ) -> Result>; } diff --git a/core/src/storage/postgres/mod.rs b/core/src/storage/postgres/mod.rs index 95d28e8..034967c 100644 --- a/core/src/storage/postgres/mod.rs +++ b/core/src/storage/postgres/mod.rs @@ -6,6 +6,7 @@ use crate::{ SyntheticFeature, Thing, }, storage::{self}, + Preconditions, }; use async_trait::async_trait; use chrono::{DateTime, Utc}; @@ -28,6 +29,7 @@ pub struct Config { pub struct ThingEntity { pub uid: Uuid, pub creation_timestamp: DateTime, + pub deletion_timestamp: Option>, pub generation: u32, pub resource_version: Uuid, @@ -79,6 +81,7 @@ impl TryFrom for ThingEntity { Ok(Self { uid: row.try_get("UID")?, creation_timestamp: row.try_get("CREATION_TIMESTAMP")?, + deletion_timestamp: row.try_get("DELETION_TIMESTAMP")?, generation: row.try_get::<_, i64>("GENERATION")? as u32, resource_version: row.try_get("RESOURCE_VERSION")?, @@ -145,6 +148,7 @@ impl super::Storage for Storage { SELECT UID, CREATION_TIMESTAMP, + DELETION_TIMESTAMP, GENERATION, RESOURCE_VERSION, ANNOTATIONS, @@ -179,6 +183,7 @@ WHERE application: application.to_string(), uid: Some(entity.uid.to_string()), creation_timestamp: Some(entity.creation_timestamp), + deletion_timestamp: entity.deletion_timestamp, resource_version: Some(entity.resource_version.to_string()), generation: Some(entity.generation), @@ -322,7 +327,7 @@ WHERE let annotations = Json(&thing.metadata.annotations); let labels = Json(&thing.metadata.labels); - let mut types: Vec = Vec::new(); + let mut types = Vec::new(); let mut params: Vec<&(dyn ToSql + Sync)> = Vec::new(); types.push(Type::VARCHAR); params.push(name); @@ -385,7 +390,12 @@ RETURNING } } - async fn delete(&self, application: &str, name: &str) -> Result { + async fn delete_with( + &self, + application: &str, + name: &str, + opts: Preconditions<'_>, + ) -> Result { if let Err(storage::Error::NotFound) = self.ensure_app(application, || storage::Error::NotFound) { @@ -396,30 +406,43 @@ RETURNING log::debug!("Deleting thing: {application} / {name}"); - let stmt = con - .prepare_typed_cached( - r#" + let mut types = vec![ + Type::VARCHAR, // name + Type::VARCHAR, // application + ]; + + let mut params: Vec<&(dyn ToSql + Sync)> = Vec::new(); + params.push(&name); + params.push(&application); + + let mut stmt = r#" DELETE FROM things WHERE NAME = $1 AND APPLICATION = $2 -"#, - &[ - Type::VARCHAR, // name - Type::VARCHAR, // application - ], - ) - .await - .map_err(Error::Postgres)?; +"# + .to_string(); + + if let Some(resource_version) = opts.resource_version.as_ref() { + types.push(Type::VARCHAR); + params.push(resource_version); + stmt.push_str(&format!("AND RESOURCE_VERSION::text=${}", types.len())); + } - // FIXME: add uid and resource version + if let Some(uid) = opts.uid.as_ref() { + types.push(Type::VARCHAR); + params.push(uid); + stmt.push_str(&format!("AND UID::text=${}", types.len())); + } - let rows = con - .execute(&stmt, &[&name, &application]) + let stmt = con + .prepare_typed_cached(&stmt, &types) .await .map_err(Error::Postgres)?; + let rows = con.execute(&stmt, ¶ms).await.map_err(Error::Postgres)?; + Ok(rows > 0) } } diff --git a/core/tests/base/hierarchy.rs b/core/tests/base/hierarchy.rs new file mode 100644 index 0000000..ca0d349 --- /dev/null +++ b/core/tests/base/hierarchy.rs @@ -0,0 +1,297 @@ +use crate::common::mock::{setup, RunningContext}; +use drogue_doppelgaenger_core::{ + model::{Code, Deleting, Thing}, + processor::{Event, Message}, + service::{AnnotationsUpdater, Id, UpdateOptions}, +}; +use std::time::Duration; + +#[tokio::test] +async fn test_hierarchy() -> anyhow::Result<()> { + let RunningContext { + service, + mut notifier, + mut sink, + runner, + .. + } = setup().run(true); + + let common = r#" +function log(text) { + context.logs.push(text) +} + +if (context.newState.metadata.annotations === undefined) { + context.newState.metadata.annotations = {}; +} + +if (context.newState.reportedState === undefined) { + context.newState.reportedState = {}; +} + +function sendMessage(thing, message) { + context.outbox.push({thing, message}); +} + +function $ref() { + return context.newState.metadata.name; +} + +function normalize(group) { + if (group === undefined) { + return group; + } + return group.split('/').filter(t => t !== "") +} + +function parentGroup(group) { + if (group === undefined) { + return group; + } + if (group.length > 0) { + return group.slice(0,-1); + } else { + return undefined; + } +} + +function registerChild(reg, thing, $ref) { + if (reg) { + const deleting = context.newState.reconciliation.deleting["hierarchy"]; + const changed = context.newState.reconciliation.changed["hierarchy"]; + sendMessage(thing, {registerChild: {$ref, template: { + reconciliation: { + changed: { + hierarchy: changed, + }, + deleting: { + hierarchy: deleting, + } + } + }}}); + } else { + sendMessage(thing, {unregisterChild: {$ref}}); + } +} + +function registerChannel(reg, device, channel) { + log(`Register channel: ${device} / ${channel} (${reg})`); + + if (reg) { + if (context.newState.metadata.annotations["io.drogue/device"] !== device + || context.newState.metadata.annotations["io.drogue/channel"] !== channel + ) { + context.newState.metadata.annotations["io.drogue/device"] = device; + context.newState.metadata.annotations["io.drogue/channel"] = channel; + registerChild(true, device, $ref()) + } + + context.newState.reportedState["parent"] = {lastUpdate: new Date().toISOString(), value: {$ref: device}}; + } else { + registerChild(false, device, $ref()) + } +} + +function registerDevice(reg, device) { + log(`Register device: ${device} (${reg})`); + + let group = normalize(context.newState.metadata.annotations["io.drogue/group"]); + + if (group !== undefined) { + group = group.join('/'); + const parent = "/" + group; + if (reg) { + context.newState.metadata.annotations["io.drogue/device"] = device; + if (context.currentState.metadata.annotations?.["io.drogue/group"] !== group) { + registerChild(true, parent, $ref()); + } + context.newState.reportedState["parent"] = {lastUpdate: new Date().toISOString(), value: {$ref: parent}}; + } else { + registerChild(false, parent, $ref()); + } + } +} + +function registerGroup(reg, group) { + log(`Register group: ${group} (${reg})`); + + group = normalize(group); + groupValue = group.join('/'); + const parent = parentGroup(group); + + if (parent !== undefined) { + if (reg) { + if (context.newState.metadata.annotations["io.drogue/group"] !== groupValue) { + context.newState.metadata.annotations["io.drogue/group"] = groupValue; + + registerChild(true, "/" + parent.join('/'), $ref()) + } + context.newState.reportedState["parent"] = {lastUpdate: new Date().toISOString(), value: {$ref: parent}}; + } else { + registerChild(false, "/" + parent.join('/'), $ref()) + } + } +} + +function register(reg) { + const ref = $ref(); + if (ref.startsWith('/')) { + // group + registerGroup(reg, ref); + } else { + const s = ref.split('/', 2); + if (s.length >= 2) { + // channel + registerChannel(reg, s[0], s[1]); + } else { + // device + registerDevice(reg, s[0]); + } + } +} +"#; + + // expect a group of [foo, bar, baz], a thing named "device" and a thing named "device/channel" + let id = Id::new("default", "device/channel"); + let mut thing = Thing::with_id(&id); + thing.reconciliation.deleting.insert( + "hierarchy".to_string(), + Deleting { + code: Code::JavaScript(format!( + r#" +{common} + +register(false); +"# + )), + }, + ); + thing.reconciliation.changed.insert( + "hierarchy".to_string(), + Code::JavaScript(format!( + r#" +{common} + +register(true); +"# + )) + .into(), + ); + let thing = service.create(thing).await.unwrap(); + + log::info!("Thing: {thing:#?}"); + + assert_eq!(thing.metadata.annotations["io.drogue/device"], "device"); + assert_eq!(thing.metadata.annotations["io.drogue/channel"], "channel"); + + // FIXME: should listen to events instead + tokio::time::sleep(Duration::from_secs(1)).await; + + let device_id = Id::new("default", "device"); + service.get(&device_id).await?.expect("Device level thing"); + + // now set the group + let device = service + .update( + &device_id, + &AnnotationsUpdater::new("io.drogue/group", "foo/bar/baz"), + &UpdateOptions { + ignore_unclean_inbox: false, + }, + ) + .await?; + + log::info!("Device thing: {device:#?}"); + + assert_eq!( + device.metadata.annotations["io.drogue/group"], + "foo/bar/baz" + ); + + // FIXME: should listen to events instead + tokio::time::sleep(Duration::from_secs(1)).await; + + let group_l_3 = service + .get(&Id::new("default", "/foo/bar/baz")) + .await? + .expect("Group level 3 thing"); + let group_l_2 = service + .get(&Id::new("default", "/foo/bar")) + .await + .unwrap() + .expect("Group level 2 thing"); + let group_l_1 = service + .get(&Id::new("default", "/foo")) + .await + .unwrap() + .expect("Group level 1 thing"); + let root = service + .get(&Id::new("default", "/")) + .await + .unwrap() + .expect("Root level thing"); + + // do some update, so ensure e.g. reported state updates don't cause any further events + runner + .send_wait(Event::new( + &id.application, + &id.thing, + Message::report_state(true).state("foo", "bar"), + )) + .await + .unwrap(); + + // not start the destruction + + service + .delete(&id, None) + .await + .expect("Deletion successful"); + + // FIXME: should listen to events instead + tokio::time::sleep(Duration::from_secs(2)).await; + + // everything must be gone + for i in [ + "/", + "/foo", + "/foo/bar", + "/foo/bar/baz", + "device", + "device/channel", + ] { + let thing = service.get(&Id::new("default", i)).await?; + assert!(thing.is_none(), "Thing {i} must be gone: {thing:#?}",); + } + + let changes = notifier.drain().await; + for (n, c) in changes.iter().enumerate() { + log::debug!("Change {n:2}: {c:?}") + } + assert_eq!( + changes.len(), + 1 // create channel + + 1 // auto-create device + + 1 // manual update device + + 4 // auto-create groups + + 1 // reported state update + + 1 // delete channel + + 5 // auto-delete device and groups + ); + + let events = sink.drain().await; + for (n, e) in events.iter().enumerate() { + log::debug!("Event {n}: {e:?}") + } + assert_eq!( + events.len(), + 1 // creating the device thing + + 4 // creating the group things + + 4 // deleting the group things + + 1 // deleting the device thing + ); + + // done + runner.shutdown().await.unwrap(); + Ok(()) +} diff --git a/core/tests/base/mod.rs b/core/tests/base/mod.rs index 71fc5e6..c7f6f45 100644 --- a/core/tests/base/mod.rs +++ b/core/tests/base/mod.rs @@ -1,3 +1,4 @@ +mod hierarchy; mod processor; mod service; mod waker; diff --git a/core/tests/base/service.rs b/core/tests/base/service.rs index 283dfdf..672a0d6 100644 --- a/core/tests/base/service.rs +++ b/core/tests/base/service.rs @@ -53,9 +53,9 @@ async fn delete() { assert_eq!(notifier.drain().await, vec![thing]); - let found = service.delete(&id).await.unwrap(); + let found = service.delete(&id, None).await.unwrap(); assert_eq!(found, true); - let found = service.delete(&id).await.unwrap(); + let found = service.delete(&id, None).await.unwrap(); assert_eq!(found, false); } @@ -97,7 +97,7 @@ async fn update() { ..thing.clone() }; - let thing_1 = service.update(&id, thing_1, &OPTS).await.unwrap(); + let thing_1 = service.update(&id, &thing_1, &OPTS).await.unwrap(); assert_eq!(notifier.drain().await, vec![thing_1.clone()]); @@ -167,7 +167,7 @@ async fn update_no_change() { assert_eq!(notifier.drain().await, vec![thing.clone()]); - let thing_1 = service.update(&id, thing.clone(), &OPTS).await.unwrap(); + let thing_1 = service.update(&id, &thing, &OPTS).await.unwrap(); assert_eq!(notifier.drain().await, vec![]); diff --git a/core/tests/common/mock.rs b/core/tests/common/mock.rs index 13606b9..f8ed055 100644 --- a/core/tests/common/mock.rs +++ b/core/tests/common/mock.rs @@ -13,6 +13,7 @@ use drogue_doppelgaenger_core::{ service::{Id, Service}, storage::{Error, Storage}, waker::{self, TargetId, Waker}, + Preconditions, }; use std::collections::{btree_map::Entry, BTreeMap, HashMap}; use std::convert::Infallible; @@ -146,12 +147,30 @@ impl Storage for MockStorage { result } - async fn delete(&self, application: &str, name: &str) -> Result> { + async fn delete_with( + &self, + application: &str, + name: &str, + opts: Preconditions<'_>, + ) -> Result> { if application != self.application { return Ok(false); } - Ok(self.things.write().await.remove(name).is_some()) + Ok(match self.things.write().await.entry(name.to_string()) { + Entry::Occupied(entry) => { + if !opts.matches(entry.get()) { + false + } else { + entry.remove(); + true + } + } + Entry::Vacant(entry) => { + // nothing do to + false + } + }) } } diff --git a/core/tests/failures/event.rs b/core/tests/failures/event.rs index d17b628..aa1d6d0 100644 --- a/core/tests/failures/event.rs +++ b/core/tests/failures/event.rs @@ -25,7 +25,7 @@ impl<'t> TestRunner<'t> { .service .update( self.source, - ReportStateBuilder::partial().state("foo", "bar"), + &ReportStateBuilder::partial().state("foo", "bar"), self.opts, ) .await; diff --git a/database-migration/migrations/00000000000000_init/up.sql b/database-migration/migrations/00000000000000_init/up.sql index f1e19d8..2f9427a 100644 --- a/database-migration/migrations/00000000000000_init/up.sql +++ b/database-migration/migrations/00000000000000_init/up.sql @@ -4,6 +4,7 @@ CREATE TABLE things ( APPLICATION VARCHAR(64) NOT NULL, UID uuid NOT NULL, CREATION_TIMESTAMP TIMESTAMP WITH TIME ZONE NOT NULL, + DELETION_TIMESTAMP TIMESTAMP WITH TIME ZONE NULL, -- resource information RESOURCE_VERSION uuid NOT NULL, diff --git a/examples/20_reconcile/recon1.js b/examples/20_reconcile/recon1.js index edf7fee..aa3f0df 100644 --- a/examples/20_reconcile/recon1.js +++ b/examples/20_reconcile/recon1.js @@ -1,13 +1,13 @@ function updateLabel(key, value) { if (value !== undefined) { - if (newState.metadata.labels === undefined) { - newState.metadata.labels = {}; + if (context.newState.metadata.labels === undefined) { + context.newState.metadata.labels = {}; } - newState.metadata.labels[key] = value; + context.newState.metadata.labels[key] = value; } else { - if (newState.metadata.labels !== undefined) { - delete newState.metadata.labels[key]; + if (context.newState.metadata.labels !== undefined) { + delete context.newState.metadata.labels[key]; } } } @@ -17,5 +17,5 @@ function flagLabel(key, state) { } // check over temp -flagLabel("overTemp", newState?.reportedState?.temperature?.value > 60); -flagLabel("highTemp", newState?.reportedState?.temperature?.value > 50); +flagLabel("overTemp", context.newState?.reportedState?.temperature?.value > 60); +flagLabel("highTemp", context.newState?.reportedState?.temperature?.value > 50); diff --git a/examples/20_reconcile/recon2.js b/examples/20_reconcile/recon2.js index 26e7627..47c588c 100644 --- a/examples/20_reconcile/recon2.js +++ b/examples/20_reconcile/recon2.js @@ -4,13 +4,13 @@ const PROPERTY = "temp"; function updateLabel(key, value) { if (value !== undefined) { - if (newState.metadata.labels === undefined) { - newState.metadata.labels = {}; + if (context.newState.metadata.labels === undefined) { + context.newState.metadata.labels = {}; } - newState.metadata.labels[key] = value; + context.newState.metadata.labels[key] = value; } else { - if (newState.metadata.labels !== undefined) { - delete newState.metadata.labels[key]; + if (context.newState.metadata.labels !== undefined) { + delete context.newState.metadata.labels[key]; } } } @@ -20,32 +20,32 @@ function flagLabel(key, state) { } // check over temp -flagLabel("highTemp", newState?.reportedState?.[PROPERTY]?.value > WARNING_THRESHOLD); -flagLabel("overTemp", newState?.reportedState?.[PROPERTY]?.value > ALARM_THRESHOLD); +flagLabel("highTemp", context.newState?.reportedState?.[PROPERTY]?.value > WARNING_THRESHOLD); +flagLabel("overTemp", context.newState?.reportedState?.[PROPERTY]?.value > ALARM_THRESHOLD); function log(text) { - //logs.push(text) + //context.logs.push(text) } //log(`Before: ${JSON.stringify(newState, null, 2)}`); function changed(property) { - let currentValue = currentState?.reportedState?.[property]?.value; - let newValue = newState?.reportedState?.[property]?.value; + let currentValue = context.currentState?.reportedState?.[property]?.value; + let newValue = context.newState?.reportedState?.[property]?.value; return currentValue !== newValue; } function changedAnd(property, predicate) { - let currentValue = currentState?.reportedState?.[property]?.value; - let newValue = newState?.reportedState?.[property]?.value; + let currentValue = context.currentState?.reportedState?.[property]?.value; + let newValue = context.newState?.reportedState?.[property]?.value; return (currentValue !== newValue) && predicate(newValue); } function whenChanged(property, callback, or) { - let currentValue = currentState?.reportedState?.[property]?.value; - let newValue = newState?.reportedState?.[property]?.value; + let currentValue = context.currentState?.reportedState?.[property]?.value; + let newValue = context.newState?.reportedState?.[property]?.value; let orResult = false; if (or !== undefined) { @@ -59,7 +59,7 @@ function whenChanged(property, callback, or) { function whenConditionChanged(condition, property, mapper, callback) { const conditionAnnotation = "condition/" + condition; - const hasAnnotation = newState.metadata.annotations?.[conditionAnnotation] !== undefined; + const hasAnnotation = context.newState.metadata.annotations?.[conditionAnnotation] !== undefined; log(`Has annotation: ${hasAnnotation}`); @@ -86,7 +86,7 @@ function whenConditionChanged(condition, property, mapper, callback) { // Should be a system function function sendMessage(thing, message) { log(`Schedule message - Thing: ${thing}, Message: ${JSON.stringify(message, null, 2)}`); - outbox.push({thing, message}); + context.outbox.push({thing, message}); } function sendMerge(thing, merge) { @@ -98,7 +98,7 @@ function sendPatch(thing, patch) { } function addReference(thing) { - const me = newState.metadata.name; + const me = context.newState.metadata.name; const lastUpdate = new Date().toISOString(); sendMerge(thing, { reportedState: { @@ -113,7 +113,7 @@ function addReference(thing) { } function removeReference(thing) { - const me = newState.metadata.name; + const me = context.newState.metadata.name; const lastUpdate = new Date().toISOString(); sendMerge(thing, { reportedState: {