Skip to content

Commit

Permalink
Merge pull request #45 from PoorRican/generic_inputs
Browse files Browse the repository at this point in the history
Generic I/O interfaces. Elementary control system.
  • Loading branch information
PoorRican committed Feb 16, 2023
2 parents 8e1ce44 + 5255c1e commit f43d42d
Show file tree
Hide file tree
Showing 30 changed files with 996 additions and 316 deletions.
13 changes: 13 additions & 0 deletions src/action.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
mod command;
mod subscriber;
mod publisher;
mod types;
mod subscribers;
mod commands;

pub use command::*;
pub use commands::*;
pub use subscriber::*;
pub use subscribers::*;
pub use publisher::*;
pub use types::*;
9 changes: 9 additions & 0 deletions src/action/command.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use crate::io::IOEvent;


pub type CommandType = Box<dyn Command>;

/// Abstraction for single atomic output operation
pub trait Command {
fn execute(&self) -> Option<IOEvent>;
}
3 changes: 3 additions & 0 deletions src/action/commands.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
mod notifier;

pub use notifier::*;
32 changes: 32 additions & 0 deletions src/action/commands/notifier.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use std::sync::{Arc, Mutex};
use crate::action::{Command, CommandType};
use crate::helpers::{Deferrable, Deferred};
use crate::io::IOEvent;

/// Simple command for printing a message to stdout
pub struct SimpleNotifier {
msg: String
}

impl SimpleNotifier {
pub fn new(msg: String) -> Self {
Self { msg }
}
pub fn command(msg: String) -> CommandType {
Box::new(Self::new(msg))
}
}

impl Command for SimpleNotifier {
fn execute(&self) -> Option<IOEvent> {
println!("{}", self.msg);
None
}
}

impl Deferrable for SimpleNotifier {
type Inner = CommandType;
fn deferred(self) -> Deferred<Self::Inner> {
Arc::new(Mutex::new(Box::new(self)))
}
}
59 changes: 59 additions & 0 deletions src/action/publisher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
//! Implement observer design pattern to implement control system based off of polling of `Input` objects
//!
//! # Description
//! The goal of a dedicated `Publisher` implementation being stored as a field is to add a layer of indirection
//! between `Input` and `Output` to serve as a bridge. Both input and output should be unaware of the other.
//! However, events generated by Input::read() are routed to Publisher::notify() which is propagated to
//! Subscriber implementations and therefore events are passed to outputs.
//!
//! `Publisher` objects should be stored a struct which implements `Input`. When `Input::read()` is called,
//! `Input::publisher().notify()` should also be called as well. `notify()` should thereby call
//! `Subscriber::evaluate()` on any listeners.

use std::sync::{Arc, Mutex};
use crate::action::SubscriberType;
use crate::helpers::{Deferrable, Deferred};
use crate::io::IOEvent;

pub trait NamedRoutine {
fn name(&self) -> String;
}

/// Trait to implement on Input objects
pub trait Publisher: Deferrable {
fn subscribers(&self) -> &[Deferred<SubscriberType>];
fn subscribe(&mut self, subscriber: Deferred<SubscriberType>);

fn notify(&mut self, data: &IOEvent);
}

/// Concrete instance of publisher object
#[derive(Default, Clone)]
pub struct PublisherInstance {
subscribers: Vec<Deferred<SubscriberType>>
}

impl Publisher for PublisherInstance {
fn subscribers(&self) -> &[Deferred<SubscriberType>] { &self.subscribers }

fn subscribe(&mut self, subscriber: Deferred<SubscriberType>) {
self.subscribers.push(subscriber)
}

/// Call `Subscriber::evaluate()` on all associated `Subscriber` implementations.
fn notify(&mut self, data: &IOEvent) {
for subscriber in self.subscribers.iter_mut() {
// TODO: `IOEvent` shall be sent to `OutputDevice` and shall be logged
// TODO: results should be aggregated
subscriber.lock().unwrap().evaluate(data);
}
}
}

impl Deferrable for PublisherInstance {
type Inner = PublisherInstance;

fn deferred(self) -> Deferred<Self::Inner> {
Arc::new(Mutex::new(self))
}
}
31 changes: 31 additions & 0 deletions src/action/subscriber.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use crate::helpers::Deferred;
use crate::action::PublisherInstance;
use crate::io::IOEvent;

pub type SubscriberType = Box<dyn SubscriberStrategy>;

/// Subscriber to Publisher which enacts a dynamic strategy
///
/// The relationship between publisher and subscriber is dually-linked as
/// publisher has a reference to subscriber via `subscribers` and subscriber
/// has a reference via `publisher()`.
///
/// During the build process (handled by `ActionBuilder`), a publisher is not
/// associated with the initialized subscriber. In this state, it is considered an
/// "orphan" and can be checked via `orphan()`. During the build state, `add_publisher()`
/// creates the reverse association.
pub trait SubscriberStrategy {
fn name(&self) -> String;
/// Primary method to evaluate incoming data
/// Returned IOEvent should be logged
fn evaluate(&mut self, data: &IOEvent) -> Option<IOEvent>;

fn publisher(&self) -> &Option<Deferred<PublisherInstance>>;
fn add_publisher(&mut self, publisher: Deferred<PublisherInstance>);
fn orphan(&self) -> bool {
match self.publisher() {
Some(_) => true,
None => false
}
}
}
5 changes: 5 additions & 0 deletions src/action/subscribers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
mod pid;
mod threshold;

pub use pid::*;
pub use threshold::*;
39 changes: 39 additions & 0 deletions src/action/subscribers/pid.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use crate::action::{ThresholdMonitor, PublisherInstance, SubscriberStrategy};
use crate::helpers::Deferred;
use crate::io::{IOEvent, IOType, OutputType, };

/// Subscriber routine to actively maintain an arbitrary threshold using PID
pub struct PIDMonitor {
name: String,
threshold: IOType,
publisher: Option<Deferred<PublisherInstance>>,

output: Deferred<OutputType>,
}

impl ThresholdMonitor for PIDMonitor {
fn threshold(&self) -> IOType {
self.threshold
}
}

impl SubscriberStrategy for PIDMonitor {
fn name(&self) -> String {
self.name.clone()
}
fn evaluate(&mut self, data: &IOEvent) -> Option<IOEvent> {
todo!()
// maintain PID
}

fn publisher(&self) -> &Option<Deferred<PublisherInstance>> {
&self.publisher
}

fn add_publisher(&mut self, publisher: Deferred<PublisherInstance>) {
match self.publisher {
None => self.publisher = Some(publisher),
Some(_) => ()
}
}
}
93 changes: 93 additions & 0 deletions src/action/subscribers/threshold.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
use std::sync::{Arc, Mutex};
use crate::action::{BaseCommandFactory, Command, SimpleNotifier, PublisherInstance, SubscriberStrategy, SubscriberType};
use crate::helpers::{Deferrable, Deferred};
use crate::io::{IOEvent, IOType};

/// Generic command that monitors a threshold
pub trait ThresholdMonitor: SubscriberStrategy {
fn threshold(&self) -> IOType;
}

#[derive(Debug, Clone)]
/// Enum used by `ThresholdMonitor` logic
/// Controls when comparison of external value and threshold returns `true`.
pub enum Comparison {
GT,
LT,
}

/// Notify if threshold is exceeded
#[derive(Clone)]
pub struct ThresholdNotifier {
name: String,
threshold: IOType,
publisher: Option<Deferred<PublisherInstance>>,

trigger: Comparison,
factory: BaseCommandFactory,
}

impl ThresholdNotifier {
pub fn new(
name: String,
threshold: IOType,
trigger: Comparison,
factory: BaseCommandFactory
) -> Self {
Self {
name,
threshold,
publisher: None,
trigger,
factory,
}
}
}

impl ThresholdMonitor for ThresholdNotifier {
fn threshold(&self) -> IOType {
self.threshold
}
}

impl SubscriberStrategy for ThresholdNotifier {
fn name(&self) -> String {
self.name.clone()
}

fn evaluate(&mut self, event: &IOEvent) -> Option<IOEvent> {
let value = event.data.value;
let exceed = match &self.trigger {
&Comparison::GT => value >= self.threshold,
&Comparison::LT => value <= self.threshold,
};
if exceed {
// insert command here
let msg = format!("{} exceeded {}", value, self.threshold);
let command = SimpleNotifier::new(msg);
// Some(event.invert(1.0)) // re-enable this when dynamic IOTypes have been implemented
command.execute()
} else {
None
}
}

fn publisher(&self) -> &Option<Deferred<PublisherInstance>> {
&self.publisher
}

fn add_publisher(&mut self, publisher: Deferred<PublisherInstance>) {
match self.publisher {
None => self.publisher = Some(publisher),
Some(_) => ()
}
}
}

impl Deferrable for ThresholdNotifier {
type Inner = SubscriberType;

fn deferred(self) -> Deferred<Self::Inner> {
return Arc::new(Mutex::new(Box::new(self)));
}
}
15 changes: 15 additions & 0 deletions src/action/types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
//! Type aliases for functions and closures to assist `ActionBuilder`.
//! These aliases allow for strongly structuring the dynamic initialization of subscriber/command instances.
use crate::action::{Command, Comparison, PublisherInstance, SubscriberType, CommandType};
use crate::helpers::Deferred;
use crate::io::IOType;

// Command Factories
pub type BaseCommandFactory = fn(IOType, IOType) -> CommandType;

// **********************
// Subscriber Factories *
// **********************

/// Type alias for a function or closure that returns a `ThresholdNotifier` instance
pub type ThresholdNotifierFactory = fn(String, IOType, Comparison, BaseCommandFactory) -> Deferred<SubscriberType>;
48 changes: 48 additions & 0 deletions src/builders.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
mod action;

pub use action::*;

use std::sync::{Arc, Mutex};
use crate::action::{BaseCommandFactory, Comparison, ThresholdNotifier, SimpleNotifier, Publisher,
PublisherInstance};
use crate::helpers::{Deferrable, Deferred};
use crate::io::{Device, GenericInput, IdType, InputType, IOKind, IOType};
use crate::settings::Settings;
use crate::storage::OwnedLog;

/// Init input and `OwnedLog`, then set owner on log. Return deferred log and deferred input.
pub fn input_log_builder(
name: &str,
id: &IdType,
kind: &Option<IOKind>,
settings: Option<Arc<Settings>>,
) -> (Deferred<OwnedLog>, Deferred<InputType>) {
let log = Arc::new(Mutex::new(OwnedLog::new(*id, settings)));
let input = GenericInput::new(name.to_string(), *id, *kind, log.clone());

let wrapped = input.deferred();
log.lock().unwrap().set_owner(wrapped.clone());

(log, wrapped)
}

pub fn pubsub_builder(input: Deferred<InputType>, name: String, threshold: IOType, trigger: Comparison,
factory: BaseCommandFactory) {
let binding = PublisherInstance::default();
let publisher = binding.deferred();

// attempt to add publisher. Existing publisher is not overwritten.
let _ = input.try_lock().unwrap().add_publisher(publisher.clone());

let notifier = ThresholdNotifier::new(
name.clone(),
threshold,
trigger,
factory,
);
let deferred = notifier.deferred();
let mut binding = publisher.try_lock().unwrap();
binding.subscribe(deferred);

println!("Initialized and setup up subscriber: {}", name);
}
Loading

0 comments on commit f43d42d

Please sign in to comment.