diff --git a/config-format.md b/config-format.md index 4dc43e9d..d79cb99d 100644 --- a/config-format.md +++ b/config-format.md @@ -10,6 +10,7 @@ read_filter_type: newer_than_read # XO. either: # * keep only the last read entry and filter out all "older" than it # * notify when the entry is updated read_filter_type: not_present_in_read_list # XO. keep a list of all items read and filter out all that are present in it +template: # copy-paste the contents of $XDG_CONFIG_PATH/fetcher/templates/.yml. Field re-definition overrides the old value. tasks: foo: tag: # mark the message with a tag. That is usually a hashtag on top of the message or some kind of subscript in it. If a job has multiple tasks, it is automatically set to the task's name @@ -55,6 +56,17 @@ tasks: # * mark_as_read: mark read emails as read # * delete: move the emails to the trash bin. Exact behavior depends on the email provider in question. Gmail archives the emails by default instead process: # all actions are optional, so don't need to be marked with O + - import: # import a list of actions from $XDG_CONFIG_PATH/fetcher/actions/.yml + - sink: + discord: # X. Send as a discord message + user: # X. The user to DM to. This is not a handle (i.e. not User#1234) but rather the ID (see below). + channel: # X. The channel to send messages to + # The ID of a user or a channel can be gotten after enabling developer settings in Discord (under Settings -> Advanced) and right-clicking on a user/channel and selecting "Copy ID" + telegram: # X + chat_id: # Either the private chat (group/channel) ID that can be gotten using bots or the public handle of a chat. DMs aren't supported yet. + link_location: # O. Where to put the link. Either try to put it into the title if it's present, or as a separate "Link" button under the message + exec: # X. Start a process and write the body of the message to its stdin + stdout # X. Just print to stdout. Isn't really useful but it is the default when run with --dry-run - read_filter # filter out already read entries using `read_filter_type` stradegy - take: # take `num` entries from either the newest or the oldest and ignore the rest : @@ -162,15 +174,8 @@ tasks: # debug related actions: - caps # make the message title uppercase - debug_print # debug print the entire contents of the entry + sink: - discord: # X. Send as a discord message - user: # X. The user to DM to. This is not a handle (i.e. not User#1234) but rather the ID (see below). - channel: # X. The channel to send messages to - # The ID of a user or a channel can be gotten after enabling developer settings in Discord (under Settings -> Advanced) and rightclicking on a user/channel and selecting "Copy ID" - telegram: # X - chat_id: # Either the private chat (group/channel) ID that can be gotten using bots or the public handle of a chat. DM aren't supported yet. - link_location: # O. Where to put the link. Either as try to put it in the title if it's present, or a separate "Link" button under the message - exec: # X. Start a process and write the body of the message to its stdin - stdout # X. Just print to stdout. Isn't really useful but it is the default when run with --dry-run + ... # same as process: sink. Just appends itself to the process list. 
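To make the new `import` action and the trailing `sink:` key easier to picture, here is a minimal sketch of a task that pulls in a shared action list and then sends to Telegram. It only uses fields documented above; the action-file name `common-cleanup`, the task name `foo`, and the chat handle are made-up placeholders.

```yaml
# Hypothetical task config, not taken from the repository
read_filter_type: not_present_in_read_list
tasks:
  foo:
    process:
      - import: common-cleanup   # loads $XDG_CONFIG_PATH/fetcher/actions/common-cleanup.yml
      - read_filter              # drop entries that are already in the read list
      - sink:
          telegram:
            chat_id: "@my_example_channel"   # public handle of the chat (placeholder)
```

The imported file would itself just be a YAML list of actions, for example:

```yaml
# $XDG_CONFIG_PATH/fetcher/actions/common-cleanup.yml (hypothetical)
- caps          # make the message title uppercase
- debug_print   # debug print the entire contents of the entry
```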
This is useful when the process list is set in a template and thus can't be overriden ``` diff --git a/fetcher-config/src/error.rs b/fetcher-config/src/error.rs index f24bd6f8..3b891bdf 100644 --- a/fetcher-config/src/error.rs +++ b/fetcher-config/src/error.rs @@ -4,6 +4,8 @@ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ +pub type Result = std::result::Result; + #[derive(thiserror::Error, Debug)] pub enum Error { #[error(transparent)] @@ -27,6 +29,9 @@ pub enum Error { #[error("Discord bot token isn't set up")] DiscordBotTokenMissing, + #[error("Importing is unavailable")] + ImportingUnavailable, + #[error("Wrong Google OAuth2 token")] GoogleOAuth2WrongToken(#[from] fetcher_core::auth::google::GoogleOAuth2Error), diff --git a/fetcher-config/src/jobs/action.rs b/fetcher-config/src/jobs/action.rs index 59d2ae8b..3da5ba44 100644 --- a/fetcher-config/src/jobs/action.rs +++ b/fetcher-config/src/jobs/action.rs @@ -7,6 +7,7 @@ pub mod contains; pub mod extract; pub mod html; +pub mod import; pub mod json; pub mod remove_html; pub mod replace; @@ -17,9 +18,11 @@ pub mod trim; pub mod use_as; use self::{ - contains::Contains, extract::Extract, html::Html, json::Json, remove_html::RemoveHtml, - replace::Replace, set::Set, shorten::Shorten, take::Take, trim::Trim, use_as::Use, + contains::Contains, extract::Extract, html::Html, import::Import, json::Json, + remove_html::RemoveHtml, replace::Replace, set::Set, shorten::Shorten, take::Take, trim::Trim, + use_as::Use, }; +use super::{external_data::ProvideExternalData, sink::Sink}; use crate::Error; use fetcher_core::{ action::{ @@ -33,6 +36,8 @@ use fetcher_core::{ }; use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use tokio::sync::RwLock; #[derive(Deserialize, Serialize, Clone, Debug)] #[serde(rename_all = "snake_case", deny_unknown_fields)] @@ -58,6 +63,10 @@ pub enum Action { Replace(Replace), Extract(Extract), RemoveHtml(RemoveHtml), + + // other + Sink(Sink), + Import(Import), } // TODO: add media @@ -73,9 +82,14 @@ pub enum Field { } impl Action { - pub fn parse(self, rf: Option) -> Result>, Error> + pub fn parse( + self, + rf: Option>>, + external: &D, + ) -> Result>, Error> where RF: CReadFilter + 'static, + D: ProvideExternalData + ?Sized, { macro_rules! transform { ($tr:expr) => { @@ -121,6 +135,14 @@ impl Action { Action::Replace(x) => transform!(x.parse()?), Action::Extract(x) => transform!(x.parse()?), Action::RemoveHtml(x) => x.parse()?, + + // other + Action::Sink(x) => vec![CAction::Sink(x.parse(external)?)], + Action::Import(x) => match x.parse(rf, external) { + Ok(Some(v)) => v, + // FIXME + other => return other, + }, }; Ok(Some(act)) diff --git a/fetcher-config/src/jobs/action/html/query.rs b/fetcher-config/src/jobs/action/html/query.rs index 0c10deea..819b57e3 100644 --- a/fetcher-config/src/jobs/action/html/query.rs +++ b/fetcher-config/src/jobs/action/html/query.rs @@ -4,7 +4,7 @@ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ -use crate::Error; +use crate::error::Error; use fetcher_core::{ action::{transform::entry::html::query as c_query, transform::field::Replace as CReplace}, utils::OptionExt, diff --git a/fetcher-config/src/jobs/action/import.rs b/fetcher-config/src/jobs/action/import.rs new file mode 100644 index 00000000..861f7386 --- /dev/null +++ b/fetcher-config/src/jobs/action/import.rs @@ -0,0 +1,49 @@ +/* + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. 
If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +use crate::{ + error::{Error, Result}, + jobs::external_data::{ExternalDataResult, ProvideExternalData}, +}; +use fetcher_core::{action::Action as CAction, read_filter::ReadFilter as CReadFilter}; + +use itertools::process_results; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use tokio::sync::RwLock; + +#[derive(Deserialize, Serialize, Clone, Debug)] +pub struct Import(pub String); + +impl Import { + #[allow(clippy::needless_pass_by_value)] + pub fn parse( + self, + rf: Option>>, + external: &D, + ) -> Result>> + where + RF: CReadFilter + 'static, + D: ProvideExternalData + ?Sized, + { + match external.import(&self.0) { + ExternalDataResult::Ok(x) => { + let v = + process_results(x.into_iter().map(|x| x.parse(rf.clone(), external)), |i| { + i.flatten(/* option */).flatten(/* inner vec */).collect::>() + })?; + + if v.is_empty() { + Ok(None) + } else { + Ok(Some(v)) + } + } + ExternalDataResult::Unavailable => Err(Error::ImportingUnavailable), + ExternalDataResult::Err(e) => Err(e.into()), + } + } +} diff --git a/fetcher-config/src/jobs/external_data.rs b/fetcher-config/src/jobs/external_data.rs index d855a601..b947bcfa 100644 --- a/fetcher-config/src/jobs/external_data.rs +++ b/fetcher-config/src/jobs/external_data.rs @@ -5,15 +5,17 @@ */ use super::{ + action::Action, named::{JobName, TaskName}, read_filter::Kind as ReadFilterKind, }; use fetcher_core::{ - self as fcore, read_filter::ReadFilter as CReadFilter, task::entry_to_msg_map::EntryToMsgMap, + auth as c_auth, read_filter::ReadFilter as CReadFilter, task::entry_to_msg_map::EntryToMsgMap, utils::DisplayDebug, }; use std::{ + error::Error as StdError, fmt::{Debug, Display}, io, path::Path, @@ -32,7 +34,7 @@ pub trait ProvideExternalData { ExternalDataResult::Unavailable } - fn google_oauth2(&self) -> ExternalDataResult { + fn google_oauth2(&self) -> ExternalDataResult { ExternalDataResult::Unavailable } fn email_password(&self) -> ExternalDataResult { @@ -61,6 +63,11 @@ pub trait ProvideExternalData { ) -> ExternalDataResult { ExternalDataResult::Unavailable } + + /// import action `name` + fn import(&self, _name: &str) -> ExternalDataResult> { + ExternalDataResult::Unavailable + } } #[derive(thiserror::Error, Debug)] @@ -70,12 +77,22 @@ pub enum ExternalDataError { source: io::Error, payload: Option>, }, + #[error("Incompatible read filter types: in config: \"{expected}\" and found: \"{found}\"{}{}", .payload.is_some().then_some(": ").unwrap_or_default(), if let Some(p) = payload.as_ref() { p as &dyn Display } else { &"" })] ReadFilterIncompatibleTypes { expected: ReadFilterKind, found: ReadFilterKind, payload: Option>, }, + + #[error("Action \"{}\" not found", .0)] + ActionNotFound(String), + + #[error("Can't parse action \"{name}\": {err}")] + ActionParsingError { + name: String, + err: Box, + }, } impl From> for ExternalDataResult { diff --git a/fetcher-config/src/jobs/job.rs b/fetcher-config/src/jobs/job.rs index 7f7d9a83..3fcd82c4 100644 --- a/fetcher-config/src/jobs/job.rs +++ b/fetcher-config/src/jobs/job.rs @@ -35,8 +35,8 @@ pub struct Job { pub source: Option, #[serde(rename = "process")] pub actions: Option>, - pub sink: Option, pub entry_to_msg_map_enabled: Option, + pub sink: Option, pub tasks: Option>, pub refresh: Option, @@ -65,8 +65,8 @@ impl Job { tag: self.tag, source: self.source, actions: self.actions, - sink: self.sink, entry_to_msg_map_enabled: self.entry_to_msg_map_enabled, + 
sink: self.sink, }; let job = CJob { @@ -113,13 +113,13 @@ impl Job { task.actions = self.actions.clone(); } - if task.sink.is_none() { - task.sink = self.sink.clone(); - } - if task.entry_to_msg_map_enabled.is_none() { task.entry_to_msg_map_enabled = self.entry_to_msg_map_enabled; } + + if task.sink.is_none() { + task.sink = self.sink.clone(); + } } // FIXME: broken. Filtering can remove tasks from the tasks map. Then, when checking if we should pass the task name as a tag, we ignore the fact that we could've had more tasks in the job and skip the tag which we shouldn't do diff --git a/fetcher-config/src/jobs/sink.rs b/fetcher-config/src/jobs/sink.rs index 2a874f8c..6f40a26f 100644 --- a/fetcher-config/src/jobs/sink.rs +++ b/fetcher-config/src/jobs/sink.rs @@ -35,11 +35,4 @@ impl Sink { Self::Stdout => Box::new(CStdout {}), }) } - - pub fn has_message_id_support(&self) -> bool { - match self { - Self::Telegram(_) => true, - Self::Discord(_) | Self::Exec(_) | Self::Stdout => false, // TODO: implement message id support for Discord - } - } } diff --git a/fetcher-config/src/jobs/source.rs b/fetcher-config/src/jobs/source.rs index 93baa2d4..f845f50d 100644 --- a/fetcher-config/src/jobs/source.rs +++ b/fetcher-config/src/jobs/source.rs @@ -74,4 +74,13 @@ impl Source { Self::AlwaysErrors => Box::new(CAlwaysErrors), }) } + + pub fn supports_replies(&self) -> bool { + // Source::Email will support replies in the future + #[allow(clippy::match_like_matches_macro)] + match self { + Self::Twitter(_) => true, + _ => false, + } + } } diff --git a/fetcher-config/src/jobs/source/http.rs b/fetcher-config/src/jobs/source/http.rs index b19e96ab..f8094345 100644 --- a/fetcher-config/src/jobs/source/http.rs +++ b/fetcher-config/src/jobs/source/http.rs @@ -10,7 +10,6 @@ use serde::{Deserialize, Serialize}; use serde_with::{serde_as, OneOrMany}; use url::Url; -// TODO: use a map #[serde_as] #[derive(Deserialize, Serialize, Clone, Debug)] #[serde(transparent)] diff --git a/fetcher-config/src/jobs/task.rs b/fetcher-config/src/jobs/task.rs index 8e380e26..37e56b79 100644 --- a/fetcher-config/src/jobs/task.rs +++ b/fetcher-config/src/jobs/task.rs @@ -20,7 +20,7 @@ use super::{ source::Source, }; use crate::Error; -use fetcher_core::{task::Task as CTask, utils::OptionExt}; +use fetcher_core::{action::Action as CAction, task::Task as CTask, utils::OptionExt}; #[derive(Deserialize, Serialize, Debug)] #[serde(deny_unknown_fields)] @@ -31,9 +31,8 @@ pub struct Task { pub source: Option, #[serde(rename = "process")] pub actions: Option>, - // TODO: completely integrate into actions - pub sink: Option, pub entry_to_msg_map_enabled: Option, + pub sink: Option, } impl Task { @@ -64,33 +63,32 @@ impl Task { }; let actions = self.actions.try_map(|acts| { - itertools::process_results( + let mut acts = itertools::process_results( acts.into_iter() - .filter_map(|act| act.parse(rf.clone()).transpose()), - |i| i.flatten().collect(), - ) + .filter_map(|act| act.parse(rf.clone(), external).transpose()), + |i| i.flatten().collect::>(), + )?; + + if let Some(sink) = self.sink { + acts.push(CAction::Sink(sink.parse(external)?)); + } + + Ok::<_, Error>(acts) })?; - // TODO: replace with match like tag below - let entry_to_msg_map = if self + let entry_to_msg_map_enabled = self .entry_to_msg_map_enabled .tap_some(|b| { - if let Some(sink) = &self.sink { - // TODO: include task name - tracing::info!( - "Overriding entry_to_msg_map_enabled for {} from the default {} to {}", - job, - sink.has_message_id_support(), - b - ); - } + // TODO: 
include task name + tracing::info!( + "Overriding entry_to_msg_map_enabled for {} from the default to {}", + job, + b + ); }) - .unwrap_or_else(|| { - // replace with "source.supports_replies()". There's a point to keeping the map even if the sink doesn't support it, e.g. if it's changed from stdout to discord later on - self.sink - .as_ref() - .map_or(false, Sink::has_message_id_support) - }) { + .unwrap_or_else(|| self.source.as_ref().map_or(false, Source::supports_replies)); + + let entry_to_msg_map = if entry_to_msg_map_enabled { match external.entry_to_msg_map(job, task_name) { ExternalDataResult::Ok(v) => Some(v), ExternalDataResult::Unavailable => { @@ -125,7 +123,6 @@ impl Task { tag, source: self.source.map(|x| x.parse(rf, external)).transpose()?, actions, - sink: self.sink.try_map(|x| x.parse(external))?, entry_to_msg_map, }) } diff --git a/fetcher-core/src/action.rs b/fetcher-core/src/action.rs index 8f7edf94..84bce310 100644 --- a/fetcher-core/src/action.rs +++ b/fetcher-core/src/action.rs @@ -9,43 +9,21 @@ pub mod filter; pub mod transform; -use self::{ - filter::Filter, - transform::{error::TransformError, Transform}, -}; -use crate::entry::Entry; +use crate::sink::Sink; + +use self::{filter::Filter, transform::Transform}; /// An action that modifies a list of entries in some way #[derive(Debug)] pub enum Action { /// Filter out entries Filter(Box), + /// Transform some entries into one or more new entries Transform(Box), -} -impl Action { - /// Processes `entries` using the [`Action`] - /// - /// # Errors - /// if there was error transforming `entries`. Filtering out never fails - pub async fn process(&self, mut entries: Vec) -> Result, TransformError> { - match self { - Action::Filter(f) => { - f.filter(&mut entries).await; - Ok(entries) - } - Action::Transform(tr) => { - let mut fully_transformed = Vec::new(); - - for entry in entries { - fully_transformed.extend(tr.transform(entry).await?); - } - - Ok(fully_transformed) - } - } - } + /// Send entries to the Sink + Sink(Box), } impl From> for Action { @@ -59,3 +37,9 @@ impl From> for Action { Action::Transform(transform) } } + +impl From> for Action { + fn from(sink: Box) -> Self { + Action::Sink(sink) + } +} diff --git a/fetcher-core/src/action/transform/entry/print.rs b/fetcher-core/src/action/transform/entry/print.rs index 27b54c26..5d6b1c0f 100644 --- a/fetcher-core/src/action/transform/entry/print.rs +++ b/fetcher-core/src/action/transform/entry/print.rs @@ -39,7 +39,7 @@ impl TransformEntry for DebugPrint { }; Stdout - .send(msg, None, Some("debug print")) + .send(&msg, None, Some("debug print")) .await .expect("stdout is unavailable"); diff --git a/fetcher-core/src/exec.rs b/fetcher-core/src/exec.rs index 2d278b42..9d26ef63 100644 --- a/fetcher-core/src/exec.rs +++ b/fetcher-core/src/exec.rs @@ -84,11 +84,11 @@ impl Sink for Exec { /// * if the data couldn't be passed to the stdin pipe of the process async fn send( &self, - message: Message, + message: &Message, _reply_to: Option<&MessageId>, _tag: Option<&str>, ) -> Result, SinkError> { - let Some(body) = message.body else { + let Some(body) = &message.body else { return Ok(None); }; diff --git a/fetcher-core/src/sink.rs b/fetcher-core/src/sink.rs index 3fb6c698..703bd8b5 100644 --- a/fetcher-core/src/sink.rs +++ b/fetcher-core/src/sink.rs @@ -32,7 +32,7 @@ pub trait Sink: Debug + Send + Sync { /// Send the message with an optional tag (usually represented as a hashtag) async fn send( &self, - message: Message, + message: &Message, reply_to: Option<&MessageId>, 
tag: Option<&str>, ) -> Result, SinkError>; diff --git a/fetcher-core/src/sink/discord.rs b/fetcher-core/src/sink/discord.rs index 9e516ad9..2580c4e5 100644 --- a/fetcher-core/src/sink/discord.rs +++ b/fetcher-core/src/sink/discord.rs @@ -70,7 +70,7 @@ impl Discord { impl Sink for Discord { async fn send( &self, - msg: Message, + msg: &Message, reply_to: Option<&MessageId>, tag: Option<&str>, ) -> Result, SinkError> { @@ -173,7 +173,7 @@ impl Sink for Discord { .await .map_err(|e| SinkError::Discord { source: e, - msg: Box::new(msg), + msg: Box::new(msg.clone()), })?; last_message = Some(msg.id); diff --git a/fetcher-core/src/sink/stdout.rs b/fetcher-core/src/sink/stdout.rs index c443f061..9378b2db 100644 --- a/fetcher-core/src/sink/stdout.rs +++ b/fetcher-core/src/sink/stdout.rs @@ -25,7 +25,7 @@ impl Sink for Stdout { /// if there was an error writing to stdout async fn send( &self, - msg: Message, + msg: &Message, _reply_to: Option<&MessageId>, tag: Option<&str>, ) -> Result, SinkError> { @@ -33,7 +33,7 @@ impl Sink for Stdout { "------------------------------\nMessage:\nTitle: {title}\n\nBody:\n{body}\n\nLink: {link}\n\nMedia: {media:?}\n\nTag: {tag:?}\n------------------------------\n", title = msg.title.as_deref().unwrap_or("None"), body = msg.body.as_deref().unwrap_or("None"), - link = msg.link.map(|url| url.as_str().to_owned()).as_deref().unwrap_or("None"), + link = msg.link.as_ref().map(|url| url.as_str().to_owned()).as_deref().unwrap_or("None"), media = msg.media, tag = tag.unwrap_or("None") ).as_bytes()).await.map_err(SinkError::Stdout)?; diff --git a/fetcher-core/src/sink/telegram.rs b/fetcher-core/src/sink/telegram.rs index d9470d7b..d173377e 100644 --- a/fetcher-core/src/sink/telegram.rs +++ b/fetcher-core/src/sink/telegram.rs @@ -72,7 +72,7 @@ impl Sink for Telegram { #[tracing::instrument(level = "debug", skip(message))] async fn send( &self, - message: Message, + message: &Message, reply_to: Option<&MessageId>, tag: Option<&str>, ) -> Result, SinkError> { @@ -99,7 +99,7 @@ impl Telegram { async fn send_processed( &self, mut msg: MessageLengthLimiter<'_>, - media: Option>, + media: Option<&[Media]>, reply_to: Option, ) -> Result, SinkError> { let mut last_message = reply_to; @@ -118,7 +118,7 @@ impl Telegram { .expect("should always return a valid split at least once since msg char len is > max_char_limit"); let sent_msg = self - .send_media(&media, Some(&media_caption), last_message) + .send_media(media, Some(&media_caption), last_message) .await?; last_message = sent_msg.and_then(|v| v.first().map(|m| m.id)); } @@ -337,17 +337,19 @@ impl Telegram { } } -// format and sanitize all message fields. Returns (head, body, tail, media) -fn process_msg( - msg: Message, - tag: Option<&str>, - link_location: LinkLocation, -) -> ( +type HeadBodyTailMedia<'a> = ( Option, Option, Option, - Option>, -) { + Option<&'a [Media]>, +); + +// format and sanitize all message fields. 
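As an aside on the `Message` changes above: `Sink::send` and `process_msg` now borrow the message, and the media is handed out as `Option<&[Media]>` via `as_deref()` instead of being moved out. A standalone sketch of that borrowing pattern, with toy stand-in types rather than fetcher's own:

```rust
// Toy types; only the Option<Vec<T>> -> Option<&[T]> borrowing pattern is the point here.
#[derive(Debug)]
struct Media(String);

struct Message {
    title: Option<String>,
    media: Option<Vec<Media>>,
}

// Borrow the media out of the message instead of taking the whole message by value.
fn media_of(msg: &Message) -> Option<&[Media]> {
    msg.media.as_deref()
}

fn main() {
    let msg = Message {
        title: Some("hi".to_owned()),
        media: Some(vec![Media("photo.png".to_owned())]),
    };

    // The same message stays usable afterwards, e.g. for a retry or a second sink.
    println!("{:?}", media_of(&msg));
    println!("title: {:?}", msg.title);
}
```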
Returns (head, body, tail, media) +fn process_msg<'a>( + msg: &'a Message, + tag: Option<&str>, + link_location: LinkLocation, +) -> HeadBodyTailMedia<'a> { let Message { title, body, @@ -356,8 +358,8 @@ fn process_msg( } = msg; // escape title and body - let title = title.map(|s| teloxide::utils::html::escape(&s)); - let body = body.map(|s| teloxide::utils::html::escape(&s)); + let title = title.as_deref().map(teloxide::utils::html::escape); + let body = body.as_deref().map(teloxide::utils::html::escape); // put the link into the message let (mut head, tail) = match (title, link) { @@ -400,7 +402,7 @@ fn process_msg( }); } - (head, body, tail, media) + (head, body, tail, media.as_deref()) } impl Debug for Telegram { diff --git a/fetcher-core/src/task.rs b/fetcher-core/src/task.rs index 10c5c7c8..ff9a120b 100644 --- a/fetcher-core/src/task.rs +++ b/fetcher-core/src/task.rs @@ -8,17 +8,20 @@ pub mod entry_to_msg_map; -use std::collections::HashSet; - use self::entry_to_msg_map::EntryToMsgMap; use crate::{ - action::{transform::error::TransformError, Action}, - entry::Entry, + action::Action, + entry::{Entry, EntryId}, error::Error, - sink::Sink, + sink::{ + message::{Message, MessageId}, + Sink, + }, source::Source, }; +use std::{borrow::Cow, collections::HashSet}; + /// A core primitive of [`fetcher`](`crate`). /// Contains everything from a [`Source`] that allows to fetch some data, to a [`Sink`] that takes that data and sends it somewhere. /// It also contains any transformators @@ -33,9 +36,6 @@ pub struct Task { /// A list of optional transformators which to run the data received from the source through pub actions: Option>, - /// The sink where to send the data to - pub sink: Option>, - /// Map of an entry to a message. Used when an entry is a reply to an older entry to be able to show that as a message, too pub entry_to_msg_map: Option, } @@ -49,85 +49,73 @@ impl Task { pub async fn run(&mut self) -> Result<(), Error> { tracing::trace!("Running task"); - let entries = { - let raw = match &mut self.source { - Some(source) => source.fetch().await?, - None => vec![Entry::default()], // return just an empty entry if there is no source - }; - - tracing::debug!("Got {} raw entries from the sources", raw.len()); - tracing::trace!("Raw entries: {raw:#?}"); - - let processed = match &self.actions { - Some(actions) => process_entries(raw, actions).await?, - None => raw, - }; - - let processed_len = processed.len(); - tracing::debug!("{processed_len} entries remained after processing"); - tracing::trace!("Entries after processing: {processed:#?}"); - - let deduped = remove_duplicates(processed); - - if processed_len - deduped.len() > 0 { - tracing::info!( - "Removed {} duplicate entries", - processed_len - deduped.len() - ); - } - - deduped + let raw = match &mut self.source { + Some(source) => source.fetch().await?, + None => vec![Entry::default()], // return just an empty entry if there is no source }; - // entries should be sorted newest to oldest but we should send oldest first - for entry in entries.into_iter().rev() { - self.send_entry(entry).await?; - } + tracing::debug!("Got {} raw entries from the sources", raw.len()); + tracing::trace!("Raw entries: {raw:#?}"); + + self.process_entries(raw).await?; Ok(()) } - #[tracing::instrument(level = "trace", skip_all, fields(entry_id = ?entry.id))] - async fn send_entry(&mut self, entry: Entry) -> Result<(), Error> { - tracing::trace!("Sending and marking as read entry"); - - let msgid = match self.sink.as_ref() { - // send message if it isn't 
empty or raw_contents of they aren't - Some(sink) if !entry.msg.is_empty() || entry.raw_contents.is_some() => { - // use raw_contents as msg.body if the message is empty - let mut msg = entry.msg; - - if msg.is_empty() { - msg.body = Some( - entry - .raw_contents - .expect("raw_contents should be some because of the match guard"), - ); + // TODO: figure out a way to split into several functions to avoid 15 level nesting? + // It's a bit difficult because this function can't be a method because we are borrowing self.actions + // throughout the entire process + async fn process_entries(&mut self, mut entries: Vec) -> Result<(), Error> { + for act in self.actions.iter().flatten() { + match act { + Action::Filter(f) => { + f.filter(&mut entries).await; } + Action::Transform(tr) => { + let mut fully_transformed = Vec::new(); - let tag = self.tag.as_deref(); - let reply_to = self - .entry_to_msg_map - .as_mut() - .and_then(|map| map.get_if_exists(entry.reply_to.as_ref())); + for entry in entries { + fully_transformed.extend(tr.transform(entry).await?); + } - tracing::debug!( - "Sending {msg:?} to a sink with tag {tag:?}, replying to {reply_to:?}" - ); - sink.send(msg, reply_to, tag).await? - } - _ => None, - }; - - if let Some(entry_id) = entry.id { - if let Some(source) = &mut self.source { - tracing::debug!("Marking {entry_id:?} as read"); - source.mark_as_read(&entry_id).await?; - } - - if let Some((msgid, map)) = msgid.zip(self.entry_to_msg_map.as_mut()) { - tracing::debug!("Associating entry {entry_id:?} with message {msgid:?}"); - map.insert(entry_id, msgid).await?; + entries = fully_transformed; + } + Action::Sink(s) => { + let undeduped_len = entries.len(); + tracing::trace!("Entries to send before dedup: {undeduped_len}"); + + entries = remove_duplicates(entries); + + if undeduped_len - entries.len() > 0 { + tracing::info!( + "Removed {} duplicate entries before sending", + undeduped_len - entries.len() + ); + } + + tracing::trace!("Sending entries: {entries:#?}"); + + // entries should be sorted newest to oldest but we should send oldest first + for entry in entries.iter().rev() { + let msg_id = send_entry( + &**s, + self.entry_to_msg_map.as_mut(), + self.tag.as_deref(), + entry, + ) + .await?; + + if let Some(entry_id) = entry.id.as_ref() { + mark_entry_as_read( + entry_id, + msg_id, + self.source.as_mut(), + self.entry_to_msg_map.as_mut(), + ) + .await?; + } + } + } } } @@ -135,15 +123,60 @@ impl Task { } } -async fn process_entries( - mut entries: Vec, - actions: &[Action], -) -> Result, TransformError> { - for a in actions { - entries = a.process(entries).await?; +#[tracing::instrument(level = "trace", skip_all, fields(entry_id = ?entry.id))] +async fn send_entry( + sink: &dyn Sink, + mut entry_to_msg_map: Option<&mut EntryToMsgMap>, + tag: Option<&str>, + entry: &Entry, +) -> Result, Error> { + tracing::trace!("Sending entry"); + + // send message if it isn't empty or raw_contents of they aren't + if entry.msg.is_empty() && entry.raw_contents.is_none() { + return Ok(None); + } + + let msg = if entry.msg.is_empty() { + Cow::Owned(Message { + body: Some( + entry + .raw_contents + .clone() + .expect("raw_contents should be some because of the early return check"), + ), + ..entry.msg.clone() + }) + } else { + Cow::Borrowed(&entry.msg) + }; + + let reply_to = entry_to_msg_map + .as_mut() + .and_then(|map| map.get_if_exists(entry.reply_to.as_ref())); + + tracing::debug!("Sending {msg:?} to a sink with tag {tag:?}, replying to {reply_to:?}"); + Ok(sink.send(&msg, reply_to, tag).await?) 
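To summarise the restructuring of `Task::run` above in one place: filtering, transforming and sending are now all steps of the same action list. A stripped-down, synchronous sketch of that control flow with toy types (the real code is async, uses boxed trait objects, and also de-duplicates entries right before a sink runs):

```rust
struct Entry(String);

// Mirrors the three Action variants: a sink is just another pipeline step.
enum Action {
    Filter(fn(&Entry) -> bool),         // keep only entries matching the predicate
    Transform(fn(Entry) -> Vec<Entry>), // one entry in, any number of entries out
    Sink(fn(&Entry)),                   // side effect: send the entry somewhere
}

fn process(mut entries: Vec<Entry>, actions: &[Action]) {
    for act in actions {
        match act {
            Action::Filter(keep) => entries.retain(|e| keep(e)),
            Action::Transform(tr) => {
                entries = entries.into_iter().flat_map(|e| tr(e)).collect();
            }
            Action::Sink(send) => {
                // entries are ordered newest to oldest, so send the oldest first
                for entry in entries.iter().rev() {
                    send(entry);
                }
            }
        }
    }
}

fn non_empty(e: &Entry) -> bool { !e.0.is_empty() }
fn uppercase(e: Entry) -> Vec<Entry> { vec![Entry(e.0.to_uppercase())] }
fn print_entry(e: &Entry) { println!("sent: {}", e.0); }

fn main() {
    let actions = [
        Action::Filter(non_empty),
        Action::Transform(uppercase),
        Action::Sink(print_entry),
    ];

    process(
        vec![Entry("newest".into()), Entry(String::new()), Entry("oldest".into())],
        &actions,
    );
}
```

Because a sink is an ordinary action, several sinks can appear in one list, which is what the new `process: - sink:` config form relies on.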
+} + +async fn mark_entry_as_read( + entry_id: &EntryId, + msg_id: Option, + // source: Option<&mut dyn Source>, // TODO: this doesn't work. Why? + source: Option<&mut Box>, + entry_to_msg_map: Option<&mut EntryToMsgMap>, +) -> Result<(), Error> { + if let Some(mar) = source { + tracing::debug!("Marking {entry_id:?} as read"); + mar.mark_as_read(entry_id).await?; + } + + if let Some((msgid, map)) = msg_id.zip(entry_to_msg_map) { + tracing::debug!("Associating entry {entry_id:?} with message {msgid:?}"); + map.insert(entry_id.clone(), msgid).await?; } - Ok(entries) + Ok(()) } fn remove_duplicates(entries: Vec) -> Vec { diff --git a/fetcher-core/tests/reply_to.rs b/fetcher-core/tests/reply_to.rs index 72e27a85..6111f2d1 100644 --- a/fetcher-core/tests/reply_to.rs +++ b/fetcher-core/tests/reply_to.rs @@ -3,6 +3,7 @@ use async_trait::async_trait; use fetcher_core::{ + action::Action, entry::{Entry, EntryId}, error::Error, read_filter::MarkAsRead, @@ -49,7 +50,7 @@ impl Source for DummySource {} impl Sink for DummySink { async fn send( &self, - _message: Message, + _message: &Message, reply_to: Option<&MessageId>, _tag: Option<&str>, ) -> Result, SinkError> { @@ -70,8 +71,7 @@ async fn reply_to() { let mut task = Task { tag: None, source: Some(Box::new(DummySource)), - actions: None, - sink: Some(Box::new(DummySink)), + actions: Some(vec![Action::Sink(Box::new(DummySink))]), entry_to_msg_map: Some(entry_to_msg_map), }; diff --git a/fetcher/src/args.rs b/fetcher/src/args.rs index d21bc328..06feda49 100644 --- a/fetcher/src/args.rs +++ b/fetcher/src/args.rs @@ -11,7 +11,7 @@ use fetcher_config::jobs::{ }; use argh::FromArgs; -use color_eyre::{eyre::eyre, Report, Result}; +use color_eyre::{Report, Result}; use std::{path::PathBuf, str::FromStr}; /// Automatic news fetching and parsing @@ -129,7 +129,7 @@ impl FromStr for Setting { } } -/// Wrapper around Job foreign struct to implement `FromStr` from valid job JSON +/// Wrapper around Job foreign struct to implement `FromStr` from valid job in JSON format #[derive(Debug)] pub struct JsonJobs(Vec<(JobName, ConfigJob)>); diff --git a/fetcher/src/extentions.rs b/fetcher/src/extentions.rs index 50f03cf8..3e5cc550 100644 --- a/fetcher/src/extentions.rs +++ b/fetcher/src/extentions.rs @@ -5,7 +5,10 @@ */ pub mod error_chain; +pub mod report_std_error_wrapper; pub mod slice_display; -pub use self::error_chain::ErrorChainExt; -pub use self::slice_display::SliceDisplayExt; +pub use self::{ + error_chain::ErrorChainExt, report_std_error_wrapper::IntoStdErrorExt, + slice_display::SliceDisplayExt, +}; diff --git a/fetcher/src/extentions/report_std_error_wrapper.rs b/fetcher/src/extentions/report_std_error_wrapper.rs new file mode 100644 index 00000000..3f954bc2 --- /dev/null +++ b/fetcher/src/extentions/report_std_error_wrapper.rs @@ -0,0 +1,29 @@ +/* + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. 
+ */ + +use std::{error::Error, fmt::Display}; + +use color_eyre::Report; + +pub trait IntoStdErrorExt { + fn into_std_error(self) -> Box; +} + +impl IntoStdErrorExt for Report { + fn into_std_error(self) -> Box { + Box::new(ReportStdErrorWrapper(self)) + } +} + +#[derive(Debug)] +pub struct ReportStdErrorWrapper(pub Report); + +impl Error for ReportStdErrorWrapper {} +impl Display for ReportStdErrorWrapper { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:#}", self.0) + } +} diff --git a/fetcher/src/main.rs b/fetcher/src/main.rs index 7fd4b557..a5e31dd2 100644 --- a/fetcher/src/main.rs +++ b/fetcher/src/main.rs @@ -138,7 +138,16 @@ async fn async_main() -> Result<()> { job.inner.refresh_time = None; for task in &mut job.inner.tasks { - task.sink = None; + if let Some(actions) = task.actions.take() { + let no_sink_acts = actions + .into_iter() + .filter(|a| !matches!(a, Action::Sink(_))) + .collect::>(); + + if !no_sink_acts.is_empty() { + task.actions = Some(no_sink_acts); + } + } } } @@ -305,8 +314,10 @@ async fn run_command(run_args: args::Run, cx: Context) -> Result<()> { } // don't send anything anywhere, just print - if let Some(sink) = &mut task.sink { - *sink = Box::new(Stdout); + for act in task.actions.iter_mut().flatten() { + if let Action::Sink(sink) = act { + *sink = Box::new(Stdout); + } } // don't save entry to msg map to the fs @@ -690,7 +701,7 @@ async fn report_error(job_name: &str, err: &str, context: Context) -> Result<()> ..Default::default() }; Telegram::new(bot, admin_chat_id, LinkLocation::default()) - .send(msg, None, Some(job_name)) + .send(&msg, None, Some(job_name)) .await .map_err(fetcher_core::error::Error::Sink)?; diff --git a/fetcher/src/settings/config.rs b/fetcher/src/settings/config.rs index 67b68e31..d78992f2 100644 --- a/fetcher/src/settings/config.rs +++ b/fetcher/src/settings/config.rs @@ -4,6 +4,7 @@ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ +pub mod actions; pub mod jobs; pub mod templates; diff --git a/fetcher/src/settings/config/actions.rs b/fetcher/src/settings/config/actions.rs new file mode 100644 index 00000000..b48e1357 --- /dev/null +++ b/fetcher/src/settings/config/actions.rs @@ -0,0 +1,60 @@ +/* + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + */ + +use super::CONFIG_FILE_EXT; +use crate::settings::context::StaticContext as Context; +use fetcher_config::jobs::action::Action as ActionConfig; + +use color_eyre::{eyre::eyre, Result}; +use figment::{ + providers::{Format, Yaml}, + Figment, +}; +use std::path::Path; + +const ACTIONS_DIR: &str = "actions"; + +/// Find all actions with `name` in the default actions paths +/// +/// # Errors +/// if the found actions path couldn't be read +#[tracing::instrument(level = "debug", name = "action")] +pub fn find(name: &str, context: Context) -> Result>> { + for actions_dir_path in context.conf_paths.iter().map(|p| p.join(ACTIONS_DIR)) { + if let Some(actions) = find_in(&actions_dir_path, name)? { + return Ok(Some(actions)); + } + } + + Ok(None) +} + +/// Find all actions with `name` in `actions_path`. 
+/// Returns Some(ActionConfig) if the action was found in the directory, None otherwise +/// +/// # Errors +/// * if the path couldn't be read +/// * if the config exists at `action_path` but is invalid +pub fn find_in(action_path: &Path, name: &str) -> Result>> { + tracing::trace!("Searching for an action {name:?} in {action_path:?}"); + let path = action_path.join(name).with_extension(CONFIG_FILE_EXT); + + if !path.exists() { + tracing::trace!("{path:?} doesn't exist"); + return Ok(None); + } + + if !path.is_file() { + return Err(eyre!( + "Action \"{name}\" exists at {} but is not a file", + path.display() + )); + } + + let action_config: Vec = Figment::new().merge(Yaml::file(path)).extract()?; + + Ok(Some(action_config)) +} diff --git a/fetcher/src/settings/config/jobs.rs b/fetcher/src/settings/config/jobs.rs index 713cc73f..cfc5fa77 100644 --- a/fetcher/src/settings/config/jobs.rs +++ b/fetcher/src/settings/config/jobs.rs @@ -152,6 +152,7 @@ pub fn get_all_from<'a>( pub fn get(path: &Path, name: JobName, cx: Context) -> Result> { tracing::trace!("Parsing a job from file"); + // TODO: use extract_inner() let TemplatesField { templates } = Figment::new().merge(Yaml::file(path)).extract()?; let mut full_conf = Figment::new(); diff --git a/fetcher/src/settings/external_data_provider.rs b/fetcher/src/settings/external_data_provider.rs index 9c11956f..7a38bb41 100644 --- a/fetcher/src/settings/external_data_provider.rs +++ b/fetcher/src/settings/external_data_provider.rs @@ -4,9 +4,12 @@ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ -use super::{context::StaticContext, data}; +use crate::extentions::IntoStdErrorExt; + +use super::{config, context::StaticContext, data}; use fetcher_config::jobs::{ - external_data::{ExternalDataResult, ProvideExternalData}, + action::Action as ActionConfig, + external_data::{ExternalDataError, ExternalDataResult, ProvideExternalData}, named::{JobName, TaskName}, read_filter::Kind as ReadFilterKind, }; @@ -55,4 +58,15 @@ impl ProvideExternalData for ExternalDataFromDataDir { ) -> ExternalDataResult { data::runtime_external_save::entry_to_msg_map::get(job, task, self.cx).into() } + + fn import(&self, name: &str) -> ExternalDataResult> { + match config::actions::find(name, self.cx) { + Ok(Some(x)) => ExternalDataResult::Ok(x), + Ok(None) => ExternalDataResult::Err(ExternalDataError::ActionNotFound(name.to_owned())), + Err(e) => ExternalDataResult::Err(ExternalDataError::ActionParsingError { + name: name.to_owned(), + err: e.into_std_error(), + }), + } + } }
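Finally, to show how the `import` hook is meant to be supplied: config parsing stays pure and anything that touches the filesystem goes through `ProvideExternalData`, with the binary's `ExternalDataFromDataDir` resolving names against the actions directory as above. Here is a standalone sketch of that provider pattern with toy types (not the real fetcher-config API), e.g. for a test double that serves action lists from memory:

```rust
use std::collections::HashMap;

#[derive(Debug, Clone)]
enum ActionConfig {
    Caps,
    DebugPrint,
}

enum ExternalDataResult<T> {
    Ok(T),
    Unavailable,
    Err(String),
}

trait ProvideExternalData {
    // Importing is unavailable by default, mirroring the config crate's default hook.
    fn import(&self, _name: &str) -> ExternalDataResult<Vec<ActionConfig>> {
        ExternalDataResult::Unavailable
    }
}

// In-memory provider: action lists come from a map instead of
// $XDG_CONFIG_PATH/fetcher/actions/<name>.yml.
struct InMemory(HashMap<String, Vec<ActionConfig>>);

impl ProvideExternalData for InMemory {
    fn import(&self, name: &str) -> ExternalDataResult<Vec<ActionConfig>> {
        match self.0.get(name) {
            Some(actions) => ExternalDataResult::Ok(actions.clone()),
            None => ExternalDataResult::Err(format!("action {name:?} not found")),
        }
    }
}

fn main() {
    let provider = InMemory(HashMap::from([(
        "common-cleanup".to_owned(),
        vec![ActionConfig::Caps, ActionConfig::DebugPrint],
    )]));

    match provider.import("common-cleanup") {
        ExternalDataResult::Ok(actions) => println!("imported: {actions:?}"),
        ExternalDataResult::Unavailable => println!("importing unavailable"),
        ExternalDataResult::Err(e) => println!("error: {e}"),
    }
}
```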