From e0484856354c805540a3ca7b45c6ecbe7c4fb777 Mon Sep 17 00:00:00 2001 From: Sergey Kasmy Date: Fri, 7 Apr 2023 17:47:43 +0200 Subject: [PATCH 1/8] Move action processing to task impl --- fetcher-config/src/jobs/source/http.rs | 1 - fetcher-config/src/jobs/task.rs | 2 +- fetcher-core/src/action.rs | 30 +------------------ fetcher-core/src/task.rs | 40 ++++++++++++++++---------- fetcher/src/args.rs | 4 +-- 5 files changed, 29 insertions(+), 48 deletions(-) diff --git a/fetcher-config/src/jobs/source/http.rs b/fetcher-config/src/jobs/source/http.rs index b19e96ab..f8094345 100644 --- a/fetcher-config/src/jobs/source/http.rs +++ b/fetcher-config/src/jobs/source/http.rs @@ -10,7 +10,6 @@ use serde::{Deserialize, Serialize}; use serde_with::{serde_as, OneOrMany}; use url::Url; -// TODO: use a map #[serde_as] #[derive(Deserialize, Serialize, Clone, Debug)] #[serde(transparent)] diff --git a/fetcher-config/src/jobs/task.rs b/fetcher-config/src/jobs/task.rs index 8e380e26..fcf571bb 100644 --- a/fetcher-config/src/jobs/task.rs +++ b/fetcher-config/src/jobs/task.rs @@ -86,7 +86,7 @@ impl Task { } }) .unwrap_or_else(|| { - // replace with "source.supports_replies()". There's a point to keeping the map even if the sink doesn't support it, e.g. if it's changed from stdout to discord later on + // TODO: replace with "source.supports_replies()". There's a point to keeping the map even if the sink doesn't support it, e.g. if it's changed from stdout to discord later on self.sink .as_ref() .map_or(false, Sink::has_message_id_support) diff --git a/fetcher-core/src/action.rs b/fetcher-core/src/action.rs index 8f7edf94..01644f6d 100644 --- a/fetcher-core/src/action.rs +++ b/fetcher-core/src/action.rs @@ -9,11 +9,7 @@ pub mod filter; pub mod transform; -use self::{ - filter::Filter, - transform::{error::TransformError, Transform}, -}; -use crate::entry::Entry; +use self::{filter::Filter, transform::Transform}; /// An action that modifies a list of entries in some way #[derive(Debug)] @@ -24,30 +20,6 @@ pub enum Action { Transform(Box), } -impl Action { - /// Processes `entries` using the [`Action`] - /// - /// # Errors - /// if there was error transforming `entries`. Filtering out never fails - pub async fn process(&self, mut entries: Vec) -> Result, TransformError> { - match self { - Action::Filter(f) => { - f.filter(&mut entries).await; - Ok(entries) - } - Action::Transform(tr) => { - let mut fully_transformed = Vec::new(); - - for entry in entries { - fully_transformed.extend(tr.transform(entry).await?); - } - - Ok(fully_transformed) - } - } - } -} - impl From> for Action { fn from(filter: Box) -> Self { Action::Filter(filter) diff --git a/fetcher-core/src/task.rs b/fetcher-core/src/task.rs index 10c5c7c8..cadc3362 100644 --- a/fetcher-core/src/task.rs +++ b/fetcher-core/src/task.rs @@ -58,10 +58,7 @@ impl Task { tracing::debug!("Got {} raw entries from the sources", raw.len()); tracing::trace!("Raw entries: {raw:#?}"); - let processed = match &self.actions { - Some(actions) => process_entries(raw, actions).await?, - None => raw, - }; + let processed = self.process_entries(raw).await?; let processed_len = processed.len(); tracing::debug!("{processed_len} entries remained after processing"); @@ -87,6 +84,30 @@ impl Task { Ok(()) } + async fn process_entries( + &mut self, + mut entries: Vec, + ) -> Result, TransformError> { + for act in self.actions.iter().flatten() { + match act { + Action::Filter(f) => { + f.filter(&mut entries).await; + } + Action::Transform(tr) => { + let mut fully_transformed = Vec::new(); + + for entry in entries { + fully_transformed.extend(tr.transform(entry).await?); + } + + entries = fully_transformed; + } + } + } + + Ok(entries) + } + #[tracing::instrument(level = "trace", skip_all, fields(entry_id = ?entry.id))] async fn send_entry(&mut self, entry: Entry) -> Result<(), Error> { tracing::trace!("Sending and marking as read entry"); @@ -135,17 +156,6 @@ impl Task { } } -async fn process_entries( - mut entries: Vec, - actions: &[Action], -) -> Result, TransformError> { - for a in actions { - entries = a.process(entries).await?; - } - - Ok(entries) -} - fn remove_duplicates(entries: Vec) -> Vec { let num_og_entries = entries.len(); diff --git a/fetcher/src/args.rs b/fetcher/src/args.rs index d21bc328..06feda49 100644 --- a/fetcher/src/args.rs +++ b/fetcher/src/args.rs @@ -11,7 +11,7 @@ use fetcher_config::jobs::{ }; use argh::FromArgs; -use color_eyre::{eyre::eyre, Report, Result}; +use color_eyre::{Report, Result}; use std::{path::PathBuf, str::FromStr}; /// Automatic news fetching and parsing @@ -129,7 +129,7 @@ impl FromStr for Setting { } } -/// Wrapper around Job foreign struct to implement `FromStr` from valid job JSON +/// Wrapper around Job foreign struct to implement `FromStr` from valid job in JSON format #[derive(Debug)] pub struct JsonJobs(Vec<(JobName, ConfigJob)>); From 707445860c32ccc93b0e9ed39c3442bc5d69f621 Mon Sep 17 00:00:00 2001 From: Sergey Kasmy Date: Fri, 7 Apr 2023 18:23:47 +0200 Subject: [PATCH 2/8] Make Sink an Action --- fetcher-config/src/jobs/task.rs | 3 +- fetcher-core/src/action.rs | 12 +++ fetcher-core/src/task.rs | 166 ++++++++++++++++---------------- fetcher/src/main.rs | 17 +++- 4 files changed, 112 insertions(+), 86 deletions(-) diff --git a/fetcher-config/src/jobs/task.rs b/fetcher-config/src/jobs/task.rs index fcf571bb..0cd740a1 100644 --- a/fetcher-config/src/jobs/task.rs +++ b/fetcher-config/src/jobs/task.rs @@ -121,11 +121,12 @@ impl Task { (None, None) => None, }; + // FIXME: parse sink as action Ok(CTask { tag, source: self.source.map(|x| x.parse(rf, external)).transpose()?, actions, - sink: self.sink.try_map(|x| x.parse(external))?, + // sink: self.sink.try_map(|x| x.parse(external))?, entry_to_msg_map, }) } diff --git a/fetcher-core/src/action.rs b/fetcher-core/src/action.rs index 01644f6d..84bce310 100644 --- a/fetcher-core/src/action.rs +++ b/fetcher-core/src/action.rs @@ -9,6 +9,8 @@ pub mod filter; pub mod transform; +use crate::sink::Sink; + use self::{filter::Filter, transform::Transform}; /// An action that modifies a list of entries in some way @@ -16,8 +18,12 @@ use self::{filter::Filter, transform::Transform}; pub enum Action { /// Filter out entries Filter(Box), + /// Transform some entries into one or more new entries Transform(Box), + + /// Send entries to the Sink + Sink(Box), } impl From> for Action { @@ -31,3 +37,9 @@ impl From> for Action { Action::Transform(transform) } } + +impl From> for Action { + fn from(sink: Box) -> Self { + Action::Sink(sink) + } +} diff --git a/fetcher-core/src/task.rs b/fetcher-core/src/task.rs index cadc3362..7d3c3432 100644 --- a/fetcher-core/src/task.rs +++ b/fetcher-core/src/task.rs @@ -12,10 +12,10 @@ use std::collections::HashSet; use self::entry_to_msg_map::EntryToMsgMap; use crate::{ - action::{transform::error::TransformError, Action}, + action::Action, entry::Entry, error::Error, - sink::Sink, + sink::{message::MessageId, Sink}, source::Source, }; @@ -33,9 +33,6 @@ pub struct Task { /// A list of optional transformators which to run the data received from the source through pub actions: Option>, - /// The sink where to send the data to - pub sink: Option>, - /// Map of an entry to a message. Used when an entry is a reply to an older entry to be able to show that as a message, too pub entry_to_msg_map: Option, } @@ -49,45 +46,23 @@ impl Task { pub async fn run(&mut self) -> Result<(), Error> { tracing::trace!("Running task"); - let entries = { - let raw = match &mut self.source { - Some(source) => source.fetch().await?, - None => vec![Entry::default()], // return just an empty entry if there is no source - }; - - tracing::debug!("Got {} raw entries from the sources", raw.len()); - tracing::trace!("Raw entries: {raw:#?}"); - - let processed = self.process_entries(raw).await?; - - let processed_len = processed.len(); - tracing::debug!("{processed_len} entries remained after processing"); - tracing::trace!("Entries after processing: {processed:#?}"); - - let deduped = remove_duplicates(processed); - - if processed_len - deduped.len() > 0 { - tracing::info!( - "Removed {} duplicate entries", - processed_len - deduped.len() - ); - } - - deduped + let raw = match &mut self.source { + Some(source) => source.fetch().await?, + None => vec![Entry::default()], // return just an empty entry if there is no source }; - // entries should be sorted newest to oldest but we should send oldest first - for entry in entries.into_iter().rev() { - self.send_entry(entry).await?; - } + tracing::debug!("Got {} raw entries from the sources", raw.len()); + tracing::trace!("Raw entries: {raw:#?}"); + + self.process_entries(raw).await?; Ok(()) } - async fn process_entries( - &mut self, - mut entries: Vec, - ) -> Result, TransformError> { + // TODO: figure out a way to split into several functions to avoid 15 level nesting? + // It's a bit difficult because this function can't be a method because we are borrowing self.actions + // throughout the entire process + async fn process_entries(&mut self, mut entries: Vec) -> Result<(), Error> { for act in self.actions.iter().flatten() { match act { Action::Filter(f) => { @@ -102,53 +77,48 @@ impl Task { entries = fully_transformed; } - } - } + Action::Sink(s) => { + let undeduped_len = entries.len(); + tracing::trace!("Entries to send before dedup: {undeduped_len}"); - Ok(entries) - } - - #[tracing::instrument(level = "trace", skip_all, fields(entry_id = ?entry.id))] - async fn send_entry(&mut self, entry: Entry) -> Result<(), Error> { - tracing::trace!("Sending and marking as read entry"); - - let msgid = match self.sink.as_ref() { - // send message if it isn't empty or raw_contents of they aren't - Some(sink) if !entry.msg.is_empty() || entry.raw_contents.is_some() => { - // use raw_contents as msg.body if the message is empty - let mut msg = entry.msg; - - if msg.is_empty() { - msg.body = Some( - entry - .raw_contents - .expect("raw_contents should be some because of the match guard"), - ); - } + entries = remove_duplicates(entries); - let tag = self.tag.as_deref(); - let reply_to = self - .entry_to_msg_map - .as_mut() - .and_then(|map| map.get_if_exists(entry.reply_to.as_ref())); - - tracing::debug!( - "Sending {msg:?} to a sink with tag {tag:?}, replying to {reply_to:?}" - ); - sink.send(msg, reply_to, tag).await? - } - _ => None, - }; - - if let Some(entry_id) = entry.id { - if let Some(source) = &mut self.source { - tracing::debug!("Marking {entry_id:?} as read"); - source.mark_as_read(&entry_id).await?; - } + if undeduped_len - entries.len() > 0 { + tracing::info!( + "Removed {} duplicate entries before sending", + undeduped_len - entries.len() + ); + } - if let Some((msgid, map)) = msgid.zip(self.entry_to_msg_map.as_mut()) { - tracing::debug!("Associating entry {entry_id:?} with message {msgid:?}"); - map.insert(entry_id, msgid).await?; + tracing::trace!("Sending entries: {entries:#?}"); + + // entries should be sorted newest to oldest but we should send oldest first + for entry in entries.iter().rev() { + // FIXME: remove clone + let msgid = send_entry( + &**s, + self.entry_to_msg_map.as_mut(), + self.tag.as_deref(), + entry.clone(), + ) + .await?; + + // FIXME: remove clone + if let Some(entry_id) = entry.id.clone() { + if let Some(mar) = &mut self.source { + tracing::debug!("Marking {entry_id:?} as read"); + mar.mark_as_read(&entry_id).await?; + } + + if let Some((msgid, map)) = msgid.zip(self.entry_to_msg_map.as_mut()) { + tracing::debug!( + "Associating entry {entry_id:?} with message {msgid:?}" + ); + map.insert(entry_id, msgid).await?; + } + } + } + } } } @@ -156,6 +126,38 @@ impl Task { } } +#[tracing::instrument(level = "trace", skip_all, fields(entry_id = ?entry.id))] +async fn send_entry( + sink: &dyn Sink, + mut entry_to_msg_map: Option<&mut EntryToMsgMap>, + tag: Option<&str>, + entry: Entry, +) -> Result, Error> { + tracing::trace!("Sending entry"); + + // send message if it isn't empty or raw_contents of they aren't + if !entry.msg.is_empty() || entry.raw_contents.is_some() { + // use raw_contents as msg.body if the message is empty + let mut msg = entry.msg; + + if msg.is_empty() { + msg.body = Some( + entry + .raw_contents + .expect("raw_contents should be some because of the match guard"), + ); + } + + let reply_to = entry_to_msg_map + .as_mut() + .and_then(|map| map.get_if_exists(entry.reply_to.as_ref())); + + tracing::debug!("Sending {msg:?} to a sink with tag {tag:?}, replying to {reply_to:?}"); + return Ok(sink.send(msg, reply_to, tag).await?); + } + + Ok(None) +} fn remove_duplicates(entries: Vec) -> Vec { let num_og_entries = entries.len(); diff --git a/fetcher/src/main.rs b/fetcher/src/main.rs index 7fd4b557..01e80b6a 100644 --- a/fetcher/src/main.rs +++ b/fetcher/src/main.rs @@ -138,7 +138,16 @@ async fn async_main() -> Result<()> { job.inner.refresh_time = None; for task in &mut job.inner.tasks { - task.sink = None; + if let Some(actions) = task.actions.take() { + let no_sink_acts = actions + .into_iter() + .filter(|a| !matches!(a, Action::Sink(_))) + .collect::>(); + + if !no_sink_acts.is_empty() { + task.actions = Some(no_sink_acts); + } + } } } @@ -305,8 +314,10 @@ async fn run_command(run_args: args::Run, cx: Context) -> Result<()> { } // don't send anything anywhere, just print - if let Some(sink) = &mut task.sink { - *sink = Box::new(Stdout); + for act in task.actions.iter_mut().flatten() { + if let Action::Sink(sink) = act { + *sink = Box::new(Stdout); + } } // don't save entry to msg map to the fs From fd6a2d388b736051d3e5b868057b45e661e1f6d7 Mon Sep 17 00:00:00 2001 From: Sergey Kasmy Date: Fri, 7 Apr 2023 18:45:54 +0200 Subject: [PATCH 3/8] Move out read marking to a separate function --- fetcher-core/src/task.rs | 47 +++++++++++++++++++++++++++------------- 1 file changed, 32 insertions(+), 15 deletions(-) diff --git a/fetcher-core/src/task.rs b/fetcher-core/src/task.rs index 7d3c3432..2fa05398 100644 --- a/fetcher-core/src/task.rs +++ b/fetcher-core/src/task.rs @@ -13,7 +13,7 @@ use std::collections::HashSet; use self::entry_to_msg_map::EntryToMsgMap; use crate::{ action::Action, - entry::Entry, + entry::{Entry, EntryId}, error::Error, sink::{message::MessageId, Sink}, source::Source, @@ -95,7 +95,7 @@ impl Task { // entries should be sorted newest to oldest but we should send oldest first for entry in entries.iter().rev() { // FIXME: remove clone - let msgid = send_entry( + let msg_id = send_entry( &**s, self.entry_to_msg_map.as_mut(), self.tag.as_deref(), @@ -103,19 +103,14 @@ impl Task { ) .await?; - // FIXME: remove clone - if let Some(entry_id) = entry.id.clone() { - if let Some(mar) = &mut self.source { - tracing::debug!("Marking {entry_id:?} as read"); - mar.mark_as_read(&entry_id).await?; - } - - if let Some((msgid, map)) = msgid.zip(self.entry_to_msg_map.as_mut()) { - tracing::debug!( - "Associating entry {entry_id:?} with message {msgid:?}" - ); - map.insert(entry_id, msgid).await?; - } + if let Some(entry_id) = entry.id.as_ref() { + mark_entry_as_read( + entry_id, + msg_id, + self.source.as_mut(), + self.entry_to_msg_map.as_mut(), + ) + .await?; } } } @@ -158,6 +153,28 @@ async fn send_entry( Ok(None) } + +async fn mark_entry_as_read( + entry_id: &EntryId, + msg_id: Option, + // source: Option<&mut dyn Source>, // TODO: this doesn't work. Why? + source: Option<&mut Box>, + entry_to_msg_map: Option<&mut EntryToMsgMap>, +) -> Result<(), Error> { + // FIXME: remove clone + if let Some(mar) = source { + tracing::debug!("Marking {entry_id:?} as read"); + mar.mark_as_read(entry_id).await?; + } + + if let Some((msgid, map)) = msg_id.zip(entry_to_msg_map) { + tracing::debug!("Associating entry {entry_id:?} with message {msgid:?}"); + map.insert(entry_id.clone(), msgid).await?; + } + + Ok(()) +} + fn remove_duplicates(entries: Vec) -> Vec { let num_og_entries = entries.len(); From 9743e7d0e44c8a0b56a32335d458c3ba43bcf9c8 Mon Sep 17 00:00:00 2001 From: Sergey Kasmy Date: Fri, 7 Apr 2023 19:01:02 +0200 Subject: [PATCH 4/8] Avoid unnecessary entry.clone() when sending to a sink --- .../src/action/transform/entry/print.rs | 2 +- fetcher-core/src/exec.rs | 4 +- fetcher-core/src/sink.rs | 2 +- fetcher-core/src/sink/discord.rs | 4 +- fetcher-core/src/sink/stdout.rs | 4 +- fetcher-core/src/sink/telegram.rs | 16 +++--- fetcher-core/src/task.rs | 53 ++++++++++--------- fetcher/src/main.rs | 2 +- 8 files changed, 46 insertions(+), 41 deletions(-) diff --git a/fetcher-core/src/action/transform/entry/print.rs b/fetcher-core/src/action/transform/entry/print.rs index 27b54c26..5d6b1c0f 100644 --- a/fetcher-core/src/action/transform/entry/print.rs +++ b/fetcher-core/src/action/transform/entry/print.rs @@ -39,7 +39,7 @@ impl TransformEntry for DebugPrint { }; Stdout - .send(msg, None, Some("debug print")) + .send(&msg, None, Some("debug print")) .await .expect("stdout is unavailable"); diff --git a/fetcher-core/src/exec.rs b/fetcher-core/src/exec.rs index 2d278b42..9d26ef63 100644 --- a/fetcher-core/src/exec.rs +++ b/fetcher-core/src/exec.rs @@ -84,11 +84,11 @@ impl Sink for Exec { /// * if the data couldn't be passed to the stdin pipe of the process async fn send( &self, - message: Message, + message: &Message, _reply_to: Option<&MessageId>, _tag: Option<&str>, ) -> Result, SinkError> { - let Some(body) = message.body else { + let Some(body) = &message.body else { return Ok(None); }; diff --git a/fetcher-core/src/sink.rs b/fetcher-core/src/sink.rs index 3fb6c698..703bd8b5 100644 --- a/fetcher-core/src/sink.rs +++ b/fetcher-core/src/sink.rs @@ -32,7 +32,7 @@ pub trait Sink: Debug + Send + Sync { /// Send the message with an optional tag (usually represented as a hashtag) async fn send( &self, - message: Message, + message: &Message, reply_to: Option<&MessageId>, tag: Option<&str>, ) -> Result, SinkError>; diff --git a/fetcher-core/src/sink/discord.rs b/fetcher-core/src/sink/discord.rs index 9e516ad9..2580c4e5 100644 --- a/fetcher-core/src/sink/discord.rs +++ b/fetcher-core/src/sink/discord.rs @@ -70,7 +70,7 @@ impl Discord { impl Sink for Discord { async fn send( &self, - msg: Message, + msg: &Message, reply_to: Option<&MessageId>, tag: Option<&str>, ) -> Result, SinkError> { @@ -173,7 +173,7 @@ impl Sink for Discord { .await .map_err(|e| SinkError::Discord { source: e, - msg: Box::new(msg), + msg: Box::new(msg.clone()), })?; last_message = Some(msg.id); diff --git a/fetcher-core/src/sink/stdout.rs b/fetcher-core/src/sink/stdout.rs index c443f061..9378b2db 100644 --- a/fetcher-core/src/sink/stdout.rs +++ b/fetcher-core/src/sink/stdout.rs @@ -25,7 +25,7 @@ impl Sink for Stdout { /// if there was an error writing to stdout async fn send( &self, - msg: Message, + msg: &Message, _reply_to: Option<&MessageId>, tag: Option<&str>, ) -> Result, SinkError> { @@ -33,7 +33,7 @@ impl Sink for Stdout { "------------------------------\nMessage:\nTitle: {title}\n\nBody:\n{body}\n\nLink: {link}\n\nMedia: {media:?}\n\nTag: {tag:?}\n------------------------------\n", title = msg.title.as_deref().unwrap_or("None"), body = msg.body.as_deref().unwrap_or("None"), - link = msg.link.map(|url| url.as_str().to_owned()).as_deref().unwrap_or("None"), + link = msg.link.as_ref().map(|url| url.as_str().to_owned()).as_deref().unwrap_or("None"), media = msg.media, tag = tag.unwrap_or("None") ).as_bytes()).await.map_err(SinkError::Stdout)?; diff --git a/fetcher-core/src/sink/telegram.rs b/fetcher-core/src/sink/telegram.rs index d9470d7b..f4fb4e59 100644 --- a/fetcher-core/src/sink/telegram.rs +++ b/fetcher-core/src/sink/telegram.rs @@ -72,7 +72,7 @@ impl Sink for Telegram { #[tracing::instrument(level = "debug", skip(message))] async fn send( &self, - message: Message, + message: &Message, reply_to: Option<&MessageId>, tag: Option<&str>, ) -> Result, SinkError> { @@ -99,7 +99,7 @@ impl Telegram { async fn send_processed( &self, mut msg: MessageLengthLimiter<'_>, - media: Option>, + media: Option<&[Media]>, reply_to: Option, ) -> Result, SinkError> { let mut last_message = reply_to; @@ -338,15 +338,15 @@ impl Telegram { } // format and sanitize all message fields. Returns (head, body, tail, media) -fn process_msg( - msg: Message, +fn process_msg<'a>( + msg: &'a Message, tag: Option<&str>, link_location: LinkLocation, ) -> ( Option, Option, Option, - Option>, + Option<&'a [Media]>, ) { let Message { title, @@ -356,8 +356,8 @@ fn process_msg( } = msg; // escape title and body - let title = title.map(|s| teloxide::utils::html::escape(&s)); - let body = body.map(|s| teloxide::utils::html::escape(&s)); + let title = title.as_deref().map(teloxide::utils::html::escape); + let body = body.as_deref().map(teloxide::utils::html::escape); // put the link into the message let (mut head, tail) = match (title, link) { @@ -400,7 +400,7 @@ fn process_msg( }); } - (head, body, tail, media) + (head, body, tail, media.as_deref()) } impl Debug for Telegram { diff --git a/fetcher-core/src/task.rs b/fetcher-core/src/task.rs index 2fa05398..b31e1911 100644 --- a/fetcher-core/src/task.rs +++ b/fetcher-core/src/task.rs @@ -8,17 +8,20 @@ pub mod entry_to_msg_map; -use std::collections::HashSet; - use self::entry_to_msg_map::EntryToMsgMap; use crate::{ action::Action, entry::{Entry, EntryId}, error::Error, - sink::{message::MessageId, Sink}, + sink::{ + message::{Message, MessageId}, + Sink, + }, source::Source, }; +use std::{borrow::Cow, collections::HashSet}; + /// A core primitive of [`fetcher`](`crate`). /// Contains everything from a [`Source`] that allows to fetch some data, to a [`Sink`] that takes that data and sends it somewhere. /// It also contains any transformators @@ -94,12 +97,11 @@ impl Task { // entries should be sorted newest to oldest but we should send oldest first for entry in entries.iter().rev() { - // FIXME: remove clone let msg_id = send_entry( &**s, self.entry_to_msg_map.as_mut(), self.tag.as_deref(), - entry.clone(), + entry, ) .await?; @@ -126,32 +128,35 @@ async fn send_entry( sink: &dyn Sink, mut entry_to_msg_map: Option<&mut EntryToMsgMap>, tag: Option<&str>, - entry: Entry, + entry: &Entry, ) -> Result, Error> { tracing::trace!("Sending entry"); // send message if it isn't empty or raw_contents of they aren't - if !entry.msg.is_empty() || entry.raw_contents.is_some() { - // use raw_contents as msg.body if the message is empty - let mut msg = entry.msg; + if entry.msg.is_empty() && entry.raw_contents.is_none() { + return Ok(None); + } - if msg.is_empty() { - msg.body = Some( + let msg = if entry.msg.is_empty() { + Cow::Owned(Message { + body: Some( entry .raw_contents - .expect("raw_contents should be some because of the match guard"), - ); - } - - let reply_to = entry_to_msg_map - .as_mut() - .and_then(|map| map.get_if_exists(entry.reply_to.as_ref())); - - tracing::debug!("Sending {msg:?} to a sink with tag {tag:?}, replying to {reply_to:?}"); - return Ok(sink.send(msg, reply_to, tag).await?); - } - - Ok(None) + .clone() + .expect("raw_contents should be some because of the early return check"), + ), + ..entry.msg.clone() + }) + } else { + Cow::Borrowed(&entry.msg) + }; + + let reply_to = entry_to_msg_map + .as_mut() + .and_then(|map| map.get_if_exists(entry.reply_to.as_ref())); + + tracing::debug!("Sending {msg:?} to a sink with tag {tag:?}, replying to {reply_to:?}"); + Ok(sink.send(&msg, reply_to, tag).await?) } async fn mark_entry_as_read( diff --git a/fetcher/src/main.rs b/fetcher/src/main.rs index 01e80b6a..a5e31dd2 100644 --- a/fetcher/src/main.rs +++ b/fetcher/src/main.rs @@ -701,7 +701,7 @@ async fn report_error(job_name: &str, err: &str, context: Context) -> Result<()> ..Default::default() }; Telegram::new(bot, admin_chat_id, LinkLocation::default()) - .send(msg, None, Some(job_name)) + .send(&msg, None, Some(job_name)) .await .map_err(fetcher_core::error::Error::Sink)?; From 8b29f36a87346315aa882fe74385291829e0c103 Mon Sep 17 00:00:00 2001 From: Sergey Kasmy Date: Fri, 7 Apr 2023 19:10:58 +0200 Subject: [PATCH 5/8] Update config format to support sink as an action --- fetcher-config/src/jobs/action.rs | 8 +++++++- fetcher-config/src/jobs/job.rs | 7 ------- fetcher-config/src/jobs/sink.rs | 7 ------- fetcher-config/src/jobs/source.rs | 9 ++++++++ fetcher-config/src/jobs/task.rs | 34 ++++++++++--------------------- fetcher-core/src/sink/telegram.rs | 16 ++++++++------- fetcher-core/src/task.rs | 1 - fetcher-core/tests/reply_to.rs | 6 +++--- 8 files changed, 39 insertions(+), 49 deletions(-) diff --git a/fetcher-config/src/jobs/action.rs b/fetcher-config/src/jobs/action.rs index 59d2ae8b..0e9dd226 100644 --- a/fetcher-config/src/jobs/action.rs +++ b/fetcher-config/src/jobs/action.rs @@ -20,6 +20,7 @@ use self::{ contains::Contains, extract::Extract, html::Html, json::Json, remove_html::RemoveHtml, replace::Replace, set::Set, shorten::Shorten, take::Take, trim::Trim, use_as::Use, }; +use super::{external_data::ProvideExternalData, sink::Sink}; use crate::Error; use fetcher_core::{ action::{ @@ -58,6 +59,8 @@ pub enum Action { Replace(Replace), Extract(Extract), RemoveHtml(RemoveHtml), + + Sink(Sink), } // TODO: add media @@ -73,9 +76,10 @@ pub enum Field { } impl Action { - pub fn parse(self, rf: Option) -> Result>, Error> + pub fn parse(self, rf: Option, external: &D) -> Result>, Error> where RF: CReadFilter + 'static, + D: ProvideExternalData + ?Sized, { macro_rules! transform { ($tr:expr) => { @@ -121,6 +125,8 @@ impl Action { Action::Replace(x) => transform!(x.parse()?), Action::Extract(x) => transform!(x.parse()?), Action::RemoveHtml(x) => x.parse()?, + + Action::Sink(x) => vec![CAction::Sink(x.parse(external)?)], }; Ok(Some(act)) diff --git a/fetcher-config/src/jobs/job.rs b/fetcher-config/src/jobs/job.rs index 7f7d9a83..af7a46ac 100644 --- a/fetcher-config/src/jobs/job.rs +++ b/fetcher-config/src/jobs/job.rs @@ -14,7 +14,6 @@ use super::{ external_data::ProvideExternalData, named::{JobName, JobWithTaskNames, TaskName}, read_filter, - sink::Sink, source::Source, task::Task, }; @@ -35,7 +34,6 @@ pub struct Job { pub source: Option, #[serde(rename = "process")] pub actions: Option>, - pub sink: Option, pub entry_to_msg_map_enabled: Option, pub tasks: Option>, @@ -65,7 +63,6 @@ impl Job { tag: self.tag, source: self.source, actions: self.actions, - sink: self.sink, entry_to_msg_map_enabled: self.entry_to_msg_map_enabled, }; @@ -113,10 +110,6 @@ impl Job { task.actions = self.actions.clone(); } - if task.sink.is_none() { - task.sink = self.sink.clone(); - } - if task.entry_to_msg_map_enabled.is_none() { task.entry_to_msg_map_enabled = self.entry_to_msg_map_enabled; } diff --git a/fetcher-config/src/jobs/sink.rs b/fetcher-config/src/jobs/sink.rs index 2a874f8c..6f40a26f 100644 --- a/fetcher-config/src/jobs/sink.rs +++ b/fetcher-config/src/jobs/sink.rs @@ -35,11 +35,4 @@ impl Sink { Self::Stdout => Box::new(CStdout {}), }) } - - pub fn has_message_id_support(&self) -> bool { - match self { - Self::Telegram(_) => true, - Self::Discord(_) | Self::Exec(_) | Self::Stdout => false, // TODO: implement message id support for Discord - } - } } diff --git a/fetcher-config/src/jobs/source.rs b/fetcher-config/src/jobs/source.rs index 93baa2d4..f845f50d 100644 --- a/fetcher-config/src/jobs/source.rs +++ b/fetcher-config/src/jobs/source.rs @@ -74,4 +74,13 @@ impl Source { Self::AlwaysErrors => Box::new(CAlwaysErrors), }) } + + pub fn supports_replies(&self) -> bool { + // Source::Email will support replies in the future + #[allow(clippy::match_like_matches_macro)] + match self { + Self::Twitter(_) => true, + _ => false, + } + } } diff --git a/fetcher-config/src/jobs/task.rs b/fetcher-config/src/jobs/task.rs index 0cd740a1..b1fc0390 100644 --- a/fetcher-config/src/jobs/task.rs +++ b/fetcher-config/src/jobs/task.rs @@ -16,7 +16,6 @@ use super::{ external_data::{ExternalDataResult, ProvideExternalData}, named::{JobName, TaskName}, read_filter, - sink::Sink, source::Source, }; use crate::Error; @@ -31,8 +30,6 @@ pub struct Task { pub source: Option, #[serde(rename = "process")] pub actions: Option>, - // TODO: completely integrate into actions - pub sink: Option, pub entry_to_msg_map_enabled: Option, } @@ -66,31 +63,24 @@ impl Task { let actions = self.actions.try_map(|acts| { itertools::process_results( acts.into_iter() - .filter_map(|act| act.parse(rf.clone()).transpose()), + .filter_map(|act| act.parse(rf.clone(), external).transpose()), |i| i.flatten().collect(), ) })?; - // TODO: replace with match like tag below - let entry_to_msg_map = if self + let entry_to_msg_map_enabled = self .entry_to_msg_map_enabled .tap_some(|b| { - if let Some(sink) = &self.sink { - // TODO: include task name - tracing::info!( - "Overriding entry_to_msg_map_enabled for {} from the default {} to {}", - job, - sink.has_message_id_support(), - b - ); - } + // TODO: include task name + tracing::info!( + "Overriding entry_to_msg_map_enabled for {} from the default to {}", + job, + b + ); }) - .unwrap_or_else(|| { - // TODO: replace with "source.supports_replies()". There's a point to keeping the map even if the sink doesn't support it, e.g. if it's changed from stdout to discord later on - self.sink - .as_ref() - .map_or(false, Sink::has_message_id_support) - }) { + .unwrap_or_else(|| self.source.as_ref().map_or(false, Source::supports_replies)); + + let entry_to_msg_map = if entry_to_msg_map_enabled { match external.entry_to_msg_map(job, task_name) { ExternalDataResult::Ok(v) => Some(v), ExternalDataResult::Unavailable => { @@ -121,12 +111,10 @@ impl Task { (None, None) => None, }; - // FIXME: parse sink as action Ok(CTask { tag, source: self.source.map(|x| x.parse(rf, external)).transpose()?, actions, - // sink: self.sink.try_map(|x| x.parse(external))?, entry_to_msg_map, }) } diff --git a/fetcher-core/src/sink/telegram.rs b/fetcher-core/src/sink/telegram.rs index f4fb4e59..d173377e 100644 --- a/fetcher-core/src/sink/telegram.rs +++ b/fetcher-core/src/sink/telegram.rs @@ -118,7 +118,7 @@ impl Telegram { .expect("should always return a valid split at least once since msg char len is > max_char_limit"); let sent_msg = self - .send_media(&media, Some(&media_caption), last_message) + .send_media(media, Some(&media_caption), last_message) .await?; last_message = sent_msg.and_then(|v| v.first().map(|m| m.id)); } @@ -337,17 +337,19 @@ impl Telegram { } } +type HeadBodyTailMedia<'a> = ( + Option, + Option, + Option, + Option<&'a [Media]>, +); + // format and sanitize all message fields. Returns (head, body, tail, media) fn process_msg<'a>( msg: &'a Message, tag: Option<&str>, link_location: LinkLocation, -) -> ( - Option, - Option, - Option, - Option<&'a [Media]>, -) { +) -> HeadBodyTailMedia<'a> { let Message { title, body, diff --git a/fetcher-core/src/task.rs b/fetcher-core/src/task.rs index b31e1911..ff9a120b 100644 --- a/fetcher-core/src/task.rs +++ b/fetcher-core/src/task.rs @@ -166,7 +166,6 @@ async fn mark_entry_as_read( source: Option<&mut Box>, entry_to_msg_map: Option<&mut EntryToMsgMap>, ) -> Result<(), Error> { - // FIXME: remove clone if let Some(mar) = source { tracing::debug!("Marking {entry_id:?} as read"); mar.mark_as_read(entry_id).await?; diff --git a/fetcher-core/tests/reply_to.rs b/fetcher-core/tests/reply_to.rs index 72e27a85..6111f2d1 100644 --- a/fetcher-core/tests/reply_to.rs +++ b/fetcher-core/tests/reply_to.rs @@ -3,6 +3,7 @@ use async_trait::async_trait; use fetcher_core::{ + action::Action, entry::{Entry, EntryId}, error::Error, read_filter::MarkAsRead, @@ -49,7 +50,7 @@ impl Source for DummySource {} impl Sink for DummySink { async fn send( &self, - _message: Message, + _message: &Message, reply_to: Option<&MessageId>, _tag: Option<&str>, ) -> Result, SinkError> { @@ -70,8 +71,7 @@ async fn reply_to() { let mut task = Task { tag: None, source: Some(Box::new(DummySource)), - actions: None, - sink: Some(Box::new(DummySink)), + actions: Some(vec![Action::Sink(Box::new(DummySink))]), entry_to_msg_map: Some(entry_to_msg_map), }; From 68bbad954c5db6566f3fe4996d67996a99fe3dcf Mon Sep 17 00:00:00 2001 From: Sergey Kasmy Date: Sat, 8 Apr 2023 20:03:15 +0200 Subject: [PATCH 6/8] Add "import" action that imports listed actions from CFGDIR/actions --- fetcher-config/src/error.rs | 5 ++ fetcher-config/src/jobs/action.rs | 22 ++++++- fetcher-config/src/jobs/action/html/query.rs | 2 +- fetcher-config/src/jobs/action/import.rs | 49 +++++++++++++++ fetcher-config/src/jobs/external_data.rs | 21 ++++++- fetcher/src/extentions.rs | 7 ++- .../extentions/report_std_error_wrapper.rs | 29 +++++++++ fetcher/src/settings/config.rs | 1 + fetcher/src/settings/config/actions.rs | 60 +++++++++++++++++++ fetcher/src/settings/config/jobs.rs | 1 + .../src/settings/external_data_provider.rs | 18 +++++- 11 files changed, 205 insertions(+), 10 deletions(-) create mode 100644 fetcher-config/src/jobs/action/import.rs create mode 100644 fetcher/src/extentions/report_std_error_wrapper.rs create mode 100644 fetcher/src/settings/config/actions.rs diff --git a/fetcher-config/src/error.rs b/fetcher-config/src/error.rs index f24bd6f8..3b891bdf 100644 --- a/fetcher-config/src/error.rs +++ b/fetcher-config/src/error.rs @@ -4,6 +4,8 @@ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ +pub type Result = std::result::Result; + #[derive(thiserror::Error, Debug)] pub enum Error { #[error(transparent)] @@ -27,6 +29,9 @@ pub enum Error { #[error("Discord bot token isn't set up")] DiscordBotTokenMissing, + #[error("Importing is unavailable")] + ImportingUnavailable, + #[error("Wrong Google OAuth2 token")] GoogleOAuth2WrongToken(#[from] fetcher_core::auth::google::GoogleOAuth2Error), diff --git a/fetcher-config/src/jobs/action.rs b/fetcher-config/src/jobs/action.rs index 0e9dd226..3da5ba44 100644 --- a/fetcher-config/src/jobs/action.rs +++ b/fetcher-config/src/jobs/action.rs @@ -7,6 +7,7 @@ pub mod contains; pub mod extract; pub mod html; +pub mod import; pub mod json; pub mod remove_html; pub mod replace; @@ -17,8 +18,9 @@ pub mod trim; pub mod use_as; use self::{ - contains::Contains, extract::Extract, html::Html, json::Json, remove_html::RemoveHtml, - replace::Replace, set::Set, shorten::Shorten, take::Take, trim::Trim, use_as::Use, + contains::Contains, extract::Extract, html::Html, import::Import, json::Json, + remove_html::RemoveHtml, replace::Replace, set::Set, shorten::Shorten, take::Take, trim::Trim, + use_as::Use, }; use super::{external_data::ProvideExternalData, sink::Sink}; use crate::Error; @@ -34,6 +36,8 @@ use fetcher_core::{ }; use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use tokio::sync::RwLock; #[derive(Deserialize, Serialize, Clone, Debug)] #[serde(rename_all = "snake_case", deny_unknown_fields)] @@ -60,7 +64,9 @@ pub enum Action { Extract(Extract), RemoveHtml(RemoveHtml), + // other Sink(Sink), + Import(Import), } // TODO: add media @@ -76,7 +82,11 @@ pub enum Field { } impl Action { - pub fn parse(self, rf: Option, external: &D) -> Result>, Error> + pub fn parse( + self, + rf: Option>>, + external: &D, + ) -> Result>, Error> where RF: CReadFilter + 'static, D: ProvideExternalData + ?Sized, @@ -126,7 +136,13 @@ impl Action { Action::Extract(x) => transform!(x.parse()?), Action::RemoveHtml(x) => x.parse()?, + // other Action::Sink(x) => vec![CAction::Sink(x.parse(external)?)], + Action::Import(x) => match x.parse(rf, external) { + Ok(Some(v)) => v, + // FIXME + other => return other, + }, }; Ok(Some(act)) diff --git a/fetcher-config/src/jobs/action/html/query.rs b/fetcher-config/src/jobs/action/html/query.rs index 0c10deea..819b57e3 100644 --- a/fetcher-config/src/jobs/action/html/query.rs +++ b/fetcher-config/src/jobs/action/html/query.rs @@ -4,7 +4,7 @@ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ -use crate::Error; +use crate::error::Error; use fetcher_core::{ action::{transform::entry::html::query as c_query, transform::field::Replace as CReplace}, utils::OptionExt, diff --git a/fetcher-config/src/jobs/action/import.rs b/fetcher-config/src/jobs/action/import.rs new file mode 100644 index 00000000..861f7386 --- /dev/null +++ b/fetcher-config/src/jobs/action/import.rs @@ -0,0 +1,49 @@ +/* + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +use crate::{ + error::{Error, Result}, + jobs::external_data::{ExternalDataResult, ProvideExternalData}, +}; +use fetcher_core::{action::Action as CAction, read_filter::ReadFilter as CReadFilter}; + +use itertools::process_results; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use tokio::sync::RwLock; + +#[derive(Deserialize, Serialize, Clone, Debug)] +pub struct Import(pub String); + +impl Import { + #[allow(clippy::needless_pass_by_value)] + pub fn parse( + self, + rf: Option>>, + external: &D, + ) -> Result>> + where + RF: CReadFilter + 'static, + D: ProvideExternalData + ?Sized, + { + match external.import(&self.0) { + ExternalDataResult::Ok(x) => { + let v = + process_results(x.into_iter().map(|x| x.parse(rf.clone(), external)), |i| { + i.flatten(/* option */).flatten(/* inner vec */).collect::>() + })?; + + if v.is_empty() { + Ok(None) + } else { + Ok(Some(v)) + } + } + ExternalDataResult::Unavailable => Err(Error::ImportingUnavailable), + ExternalDataResult::Err(e) => Err(e.into()), + } + } +} diff --git a/fetcher-config/src/jobs/external_data.rs b/fetcher-config/src/jobs/external_data.rs index d855a601..b947bcfa 100644 --- a/fetcher-config/src/jobs/external_data.rs +++ b/fetcher-config/src/jobs/external_data.rs @@ -5,15 +5,17 @@ */ use super::{ + action::Action, named::{JobName, TaskName}, read_filter::Kind as ReadFilterKind, }; use fetcher_core::{ - self as fcore, read_filter::ReadFilter as CReadFilter, task::entry_to_msg_map::EntryToMsgMap, + auth as c_auth, read_filter::ReadFilter as CReadFilter, task::entry_to_msg_map::EntryToMsgMap, utils::DisplayDebug, }; use std::{ + error::Error as StdError, fmt::{Debug, Display}, io, path::Path, @@ -32,7 +34,7 @@ pub trait ProvideExternalData { ExternalDataResult::Unavailable } - fn google_oauth2(&self) -> ExternalDataResult { + fn google_oauth2(&self) -> ExternalDataResult { ExternalDataResult::Unavailable } fn email_password(&self) -> ExternalDataResult { @@ -61,6 +63,11 @@ pub trait ProvideExternalData { ) -> ExternalDataResult { ExternalDataResult::Unavailable } + + /// import action `name` + fn import(&self, _name: &str) -> ExternalDataResult> { + ExternalDataResult::Unavailable + } } #[derive(thiserror::Error, Debug)] @@ -70,12 +77,22 @@ pub enum ExternalDataError { source: io::Error, payload: Option>, }, + #[error("Incompatible read filter types: in config: \"{expected}\" and found: \"{found}\"{}{}", .payload.is_some().then_some(": ").unwrap_or_default(), if let Some(p) = payload.as_ref() { p as &dyn Display } else { &"" })] ReadFilterIncompatibleTypes { expected: ReadFilterKind, found: ReadFilterKind, payload: Option>, }, + + #[error("Action \"{}\" not found", .0)] + ActionNotFound(String), + + #[error("Can't parse action \"{name}\": {err}")] + ActionParsingError { + name: String, + err: Box, + }, } impl From> for ExternalDataResult { diff --git a/fetcher/src/extentions.rs b/fetcher/src/extentions.rs index 50f03cf8..3e5cc550 100644 --- a/fetcher/src/extentions.rs +++ b/fetcher/src/extentions.rs @@ -5,7 +5,10 @@ */ pub mod error_chain; +pub mod report_std_error_wrapper; pub mod slice_display; -pub use self::error_chain::ErrorChainExt; -pub use self::slice_display::SliceDisplayExt; +pub use self::{ + error_chain::ErrorChainExt, report_std_error_wrapper::IntoStdErrorExt, + slice_display::SliceDisplayExt, +}; diff --git a/fetcher/src/extentions/report_std_error_wrapper.rs b/fetcher/src/extentions/report_std_error_wrapper.rs new file mode 100644 index 00000000..3f954bc2 --- /dev/null +++ b/fetcher/src/extentions/report_std_error_wrapper.rs @@ -0,0 +1,29 @@ +/* + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +use std::{error::Error, fmt::Display}; + +use color_eyre::Report; + +pub trait IntoStdErrorExt { + fn into_std_error(self) -> Box; +} + +impl IntoStdErrorExt for Report { + fn into_std_error(self) -> Box { + Box::new(ReportStdErrorWrapper(self)) + } +} + +#[derive(Debug)] +pub struct ReportStdErrorWrapper(pub Report); + +impl Error for ReportStdErrorWrapper {} +impl Display for ReportStdErrorWrapper { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:#}", self.0) + } +} diff --git a/fetcher/src/settings/config.rs b/fetcher/src/settings/config.rs index 67b68e31..d78992f2 100644 --- a/fetcher/src/settings/config.rs +++ b/fetcher/src/settings/config.rs @@ -4,6 +4,7 @@ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ +pub mod actions; pub mod jobs; pub mod templates; diff --git a/fetcher/src/settings/config/actions.rs b/fetcher/src/settings/config/actions.rs new file mode 100644 index 00000000..b48e1357 --- /dev/null +++ b/fetcher/src/settings/config/actions.rs @@ -0,0 +1,60 @@ +/* + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +use super::CONFIG_FILE_EXT; +use crate::settings::context::StaticContext as Context; +use fetcher_config::jobs::action::Action as ActionConfig; + +use color_eyre::{eyre::eyre, Result}; +use figment::{ + providers::{Format, Yaml}, + Figment, +}; +use std::path::Path; + +const ACTIONS_DIR: &str = "actions"; + +/// Find all actions with `name` in the default actions paths +/// +/// # Errors +/// if the found actions path couldn't be read +#[tracing::instrument(level = "debug", name = "action")] +pub fn find(name: &str, context: Context) -> Result>> { + for actions_dir_path in context.conf_paths.iter().map(|p| p.join(ACTIONS_DIR)) { + if let Some(actions) = find_in(&actions_dir_path, name)? { + return Ok(Some(actions)); + } + } + + Ok(None) +} + +/// Find all actions with `name` in `actions_path`. +/// Returns Some(ActionConfig) if the action was found in the directory, None otherwise +/// +/// # Errors +/// * if the path couldn't be read +/// * if the config exists at `action_path` but is invalid +pub fn find_in(action_path: &Path, name: &str) -> Result>> { + tracing::trace!("Searching for an action {name:?} in {action_path:?}"); + let path = action_path.join(name).with_extension(CONFIG_FILE_EXT); + + if !path.exists() { + tracing::trace!("{path:?} doesn't exist"); + return Ok(None); + } + + if !path.is_file() { + return Err(eyre!( + "Action \"{name}\" exists at {} but is not a file", + path.display() + )); + } + + let action_config: Vec = Figment::new().merge(Yaml::file(path)).extract()?; + + Ok(Some(action_config)) +} diff --git a/fetcher/src/settings/config/jobs.rs b/fetcher/src/settings/config/jobs.rs index 713cc73f..cfc5fa77 100644 --- a/fetcher/src/settings/config/jobs.rs +++ b/fetcher/src/settings/config/jobs.rs @@ -152,6 +152,7 @@ pub fn get_all_from<'a>( pub fn get(path: &Path, name: JobName, cx: Context) -> Result> { tracing::trace!("Parsing a job from file"); + // TODO: use extract_inner() let TemplatesField { templates } = Figment::new().merge(Yaml::file(path)).extract()?; let mut full_conf = Figment::new(); diff --git a/fetcher/src/settings/external_data_provider.rs b/fetcher/src/settings/external_data_provider.rs index 9c11956f..7a38bb41 100644 --- a/fetcher/src/settings/external_data_provider.rs +++ b/fetcher/src/settings/external_data_provider.rs @@ -4,9 +4,12 @@ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ -use super::{context::StaticContext, data}; +use crate::extentions::IntoStdErrorExt; + +use super::{config, context::StaticContext, data}; use fetcher_config::jobs::{ - external_data::{ExternalDataResult, ProvideExternalData}, + action::Action as ActionConfig, + external_data::{ExternalDataError, ExternalDataResult, ProvideExternalData}, named::{JobName, TaskName}, read_filter::Kind as ReadFilterKind, }; @@ -55,4 +58,15 @@ impl ProvideExternalData for ExternalDataFromDataDir { ) -> ExternalDataResult { data::runtime_external_save::entry_to_msg_map::get(job, task, self.cx).into() } + + fn import(&self, name: &str) -> ExternalDataResult> { + match config::actions::find(name, self.cx) { + Ok(Some(x)) => ExternalDataResult::Ok(x), + Ok(None) => ExternalDataResult::Err(ExternalDataError::ActionNotFound(name.to_owned())), + Err(e) => ExternalDataResult::Err(ExternalDataError::ActionParsingError { + name: name.to_owned(), + err: e.into_std_error(), + }), + } + } } From 78fb173e76e171334ec92ca887594d7bcccab64a Mon Sep 17 00:00:00 2001 From: Sergey Kasmy Date: Sun, 9 Apr 2023 17:56:53 +0200 Subject: [PATCH 7/8] Re-add sink task option that just appends itself to the list of actions --- fetcher-config/src/jobs/job.rs | 7 +++++++ fetcher-config/src/jobs/task.rs | 16 ++++++++++++---- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/fetcher-config/src/jobs/job.rs b/fetcher-config/src/jobs/job.rs index af7a46ac..3fcd82c4 100644 --- a/fetcher-config/src/jobs/job.rs +++ b/fetcher-config/src/jobs/job.rs @@ -14,6 +14,7 @@ use super::{ external_data::ProvideExternalData, named::{JobName, JobWithTaskNames, TaskName}, read_filter, + sink::Sink, source::Source, task::Task, }; @@ -35,6 +36,7 @@ pub struct Job { #[serde(rename = "process")] pub actions: Option>, pub entry_to_msg_map_enabled: Option, + pub sink: Option, pub tasks: Option>, pub refresh: Option, @@ -64,6 +66,7 @@ impl Job { source: self.source, actions: self.actions, entry_to_msg_map_enabled: self.entry_to_msg_map_enabled, + sink: self.sink, }; let job = CJob { @@ -113,6 +116,10 @@ impl Job { if task.entry_to_msg_map_enabled.is_none() { task.entry_to_msg_map_enabled = self.entry_to_msg_map_enabled; } + + if task.sink.is_none() { + task.sink = self.sink.clone(); + } } // FIXME: broken. Filtering can remove tasks from the tasks map. Then, when checking if we should pass the task name as a tag, we ignore the fact that we could've had more tasks in the job and skip the tag which we shouldn't do diff --git a/fetcher-config/src/jobs/task.rs b/fetcher-config/src/jobs/task.rs index b1fc0390..37e56b79 100644 --- a/fetcher-config/src/jobs/task.rs +++ b/fetcher-config/src/jobs/task.rs @@ -16,10 +16,11 @@ use super::{ external_data::{ExternalDataResult, ProvideExternalData}, named::{JobName, TaskName}, read_filter, + sink::Sink, source::Source, }; use crate::Error; -use fetcher_core::{task::Task as CTask, utils::OptionExt}; +use fetcher_core::{action::Action as CAction, task::Task as CTask, utils::OptionExt}; #[derive(Deserialize, Serialize, Debug)] #[serde(deny_unknown_fields)] @@ -31,6 +32,7 @@ pub struct Task { #[serde(rename = "process")] pub actions: Option>, pub entry_to_msg_map_enabled: Option, + pub sink: Option, } impl Task { @@ -61,11 +63,17 @@ impl Task { }; let actions = self.actions.try_map(|acts| { - itertools::process_results( + let mut acts = itertools::process_results( acts.into_iter() .filter_map(|act| act.parse(rf.clone(), external).transpose()), - |i| i.flatten().collect(), - ) + |i| i.flatten().collect::>(), + )?; + + if let Some(sink) = self.sink { + acts.push(CAction::Sink(sink.parse(external)?)); + } + + Ok::<_, Error>(acts) })?; let entry_to_msg_map_enabled = self From 0058fbcd3bb6b51f0744910c93a41b1de41e5852 Mon Sep 17 00:00:00 2001 From: Sergey Kasmy Date: Sun, 9 Apr 2023 18:05:49 +0200 Subject: [PATCH 8/8] Add sink & import actions to config-format.md --- config-format.md | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/config-format.md b/config-format.md index 4dc43e9d..d79cb99d 100644 --- a/config-format.md +++ b/config-format.md @@ -10,6 +10,7 @@ read_filter_type: newer_than_read # XO. either: # * keep only the last read entry and filter out all "older" than it # * notify when the entry is updated read_filter_type: not_present_in_read_list # XO. keep a list of all items read and filter out all that are present in it +template: # copy-paste the contents of $XDG_CONFIG_PATH/fetcher/templates/.yml. Field re-definition overrides the old value. tasks: foo: tag: # mark the message with a tag. That is usually a hashtag on top of the message or some kind of subscript in it. If a job has multiple tasks, it is automatically set to the task's name @@ -55,6 +56,17 @@ tasks: # * mark_as_read: mark read emails as read # * delete: move the emails to the trash bin. Exact behavior depends on the email provider in question. Gmail archives the emails by default instead process: # all actions are optional, so don't need to be marked with O + - import: # import a list of actions from $XDG_CONFIG_PATH/fetcher/actions/.yml + - sink: + discord: # X. Send as a discord message + user: # X. The user to DM to. This is not a handle (i.e. not User#1234) but rather the ID (see below). + channel: # X. The channel to send messages to + # The ID of a user or a channel can be gotten after enabling developer settings in Discord (under Settings -> Advanced) and rightclicking on a user/channel and selecting "Copy ID" + telegram: # X + chat_id: # Either the private chat (group/channel) ID that can be gotten using bots or the public handle of a chat. DM aren't supported yet. + link_location: # O. Where to put the link. Either as try to put it in the title if it's present, or a separate "Link" button under the message + exec: # X. Start a process and write the body of the message to its stdin + stdout # X. Just print to stdout. Isn't really useful but it is the default when run with --dry-run - read_filter # filter out already read entries using `read_filter_type` stradegy - take: # take `num` entries from either the newest or the oldest and ignore the rest : @@ -162,15 +174,8 @@ tasks: # debug related actions: - caps # make the message title uppercase - debug_print # debug print the entire contents of the entry + sink: - discord: # X. Send as a discord message - user: # X. The user to DM to. This is not a handle (i.e. not User#1234) but rather the ID (see below). - channel: # X. The channel to send messages to - # The ID of a user or a channel can be gotten after enabling developer settings in Discord (under Settings -> Advanced) and rightclicking on a user/channel and selecting "Copy ID" - telegram: # X - chat_id: # Either the private chat (group/channel) ID that can be gotten using bots or the public handle of a chat. DM aren't supported yet. - link_location: # O. Where to put the link. Either as try to put it in the title if it's present, or a separate "Link" button under the message - exec: # X. Start a process and write the body of the message to its stdin - stdout # X. Just print to stdout. Isn't really useful but it is the default when run with --dry-run + ... # same as process: sink. Just appends itself to the process list. This is useful when the process list is set in a template and thus can't be overriden ```