Skip to content

Commit

Permalink
feat: add utilities for streaming paths (#387)
Browse files Browse the repository at this point in the history
  • Loading branch information
morgante committed Jun 28, 2024
1 parent 196ab66 commit f9fa65f
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 5 deletions.
1 change: 1 addition & 0 deletions crates/cli/src/commands/apply_migration.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::flags::GlobalFormatFlags;
use crate::{flags::OutputFormat, messenger_variant::create_emitter};
use marzano_messenger::emit::FlushableMessenger;

#[cfg(not(feature = "workflows_v2"))]
use anyhow::bail;
Expand Down
1 change: 1 addition & 0 deletions crates/cli/src/commands/apply_pattern.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use marzano_gritmodule::markdown::get_body_from_md_content;
use marzano_gritmodule::searcher::{find_global_grit_dir, find_grit_modules_dir};
use marzano_gritmodule::utils::{infer_pattern, is_pattern_name, parse_remote_name};
use marzano_language::target_language::PatternLanguage;
use marzano_messenger::emit::FlushableMessenger as _;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::env;
Expand Down
1 change: 1 addition & 0 deletions crates/cli/src/commands/check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use marzano_core::{
};
use marzano_gritmodule::{config::ResolvedGritDefinition, utils::extract_path};
use marzano_language::target_language::{expand_paths, PatternLanguage};
use marzano_messenger::emit::FlushableMessenger as _;
use marzano_util::cache::GritCache;
use marzano_util::rich_path::RichPath;
use marzano_util::{finder::get_input_files, rich_path::RichFile};
Expand Down
6 changes: 4 additions & 2 deletions crates/cli/src/messenger_variant.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::{bail, Result};
use marzano_core::api::AnalysisLog;
use marzano_messenger::{
emit::Messager,
emit::{FlushableMessenger, Messager},
output_mode::OutputMode,
workflows::{PackagedWorkflowOutcome, WorkflowMessenger},
};
Expand Down Expand Up @@ -186,8 +186,10 @@ impl<'a> MessengerVariant<'a> {
_ => None,
}
}
}

pub async fn flush(&mut self) -> anyhow::Result<()> {
impl FlushableMessenger for MessengerVariant<'_> {
async fn flush(&mut self) -> anyhow::Result<()> {
match self {
#[cfg(feature = "remote_redis")]
MessengerVariant::Redis(ref mut redis) => redis.flush().await,
Expand Down
14 changes: 14 additions & 0 deletions crates/core/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,20 @@ impl MatchResult {
pub fn is_error(&self) -> bool {
matches!(self, MatchResult::AnalysisLog(log) if log.level < 400)
}

pub fn kind(&self) -> &'static str {
match self {
MatchResult::PatternInfo(_) => "PatternInfo",
MatchResult::AllDone(_) => "AllDone",
MatchResult::Match(_) => "Match",
MatchResult::InputFile(_) => "InputFile",
MatchResult::Rewrite(_) => "Rewrite",
MatchResult::CreateFile(_) => "CreateFile",
MatchResult::RemoveFile(_) => "RemoveFile",
MatchResult::DoneFile(_) => "DoneFile",
MatchResult::AnalysisLog(_) => "AnalysisLog",
}
}
}

/// Make a path look the way provolone expects it to
Expand Down
77 changes: 75 additions & 2 deletions crates/core/src/problem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
marzano_resolved_pattern::{MarzanoFile, MarzanoResolvedPattern},
pattern_compiler::compiler::VariableLocations,
};
use anyhow::Result;
use anyhow::{bail, Result};
use grit_pattern_matcher::{
constants::{GLOBAL_VARS_SCOPE_INDEX, NEW_FILES_INDEX},
context::QueryContext,
Expand All @@ -34,10 +34,11 @@ use rayon::iter::IntoParallelIterator;
use rayon::iter::ParallelIterator;
use sha2::{Digest, Sha256};

use crate::api::FileMatchResult;
use std::{
collections::HashMap,
path::PathBuf,
sync::mpsc::{self, Sender},
sync::mpsc::{self, Receiver, Sender},
};
use std::{fmt::Debug, str::FromStr};
use tracing::{event, Level};
Expand Down Expand Up @@ -338,6 +339,7 @@ impl Problem {
results
}

/// Given a vec of paths, execute the problem on each path and stream the results
pub fn execute_paths_streaming(
&self,
files: Vec<PathBuf>,
Expand All @@ -348,6 +350,77 @@ impl Problem {
self.execute_shared(files, context, tx, cache)
}

/// Given an input channel and an output channel, chain the input channel to the output channel
///
/// Files that match from the input channel are executed by this pattern
/// All other message types are simply forwarded to the output channel
///
pub fn execute_streaming_relay(
&self,
incoming_rx: Receiver<Vec<MatchResult>>,
context: &ExecutionContext,
outgoing_tx: Sender<Vec<MatchResult>>,
_cache: &impl GritCache,
) -> Result<()> {
if self.is_multifile {
bail!("Streaming is not supported for multifile patterns");
}

#[cfg(feature = "grit_tracing")]
let parent_span = tracing::span!(Level::INFO, "execute_shared_body",).entered();
#[cfg(feature = "grit_tracing")]
let parent_cx = parent_span.context();

rayon::scope(|s| {
#[cfg(feature = "grit_tracing")]
let grouped_ctx = parent_cx;

s.spawn(move |_| {
#[cfg(feature = "grit_tracing")]
let task_span = tracing::info_span!("apply_file_inner").entered();
#[cfg(feature = "grit_tracing")]
task_span.set_parent(grouped_ctx);

event!(Level::INFO, "spawn execute_shared_body");

incoming_rx.iter().for_each(|res| {
let mut paths = Vec::new();

for m in res.into_iter() {
match m {
MatchResult::Match(m) => {
paths.push(PathBuf::from(m.file_name()));
}
MatchResult::PatternInfo(_)
| MatchResult::AllDone(_)
| MatchResult::InputFile(_)
| MatchResult::AnalysisLog(_)
| MatchResult::DoneFile(_) => {
outgoing_tx.send(vec![m]).unwrap();
}
MatchResult::Rewrite(_)
| MatchResult::CreateFile(_)
| MatchResult::RemoveFile(_) => {
outgoing_tx
.send(vec![
m,
MatchResult::AnalysisLog(AnalysisLog::floating_error(
"Streaming does not support rewrites, creates, or removes"
.to_string(),
)),
])
.unwrap();
}
}
}
self.execute_shared(paths, context, outgoing_tx.clone(), &NullCache::new());
});
})
});

Ok(())
}

#[cfg_attr(feature = "grit_tracing", tracing::instrument(skip_all))]
pub(crate) fn execute_shared(
&self,
Expand Down
4 changes: 4 additions & 0 deletions crates/marzano_messenger/src/emit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,10 @@ pub trait Messager: Send + Sync {
}
}

pub trait FlushableMessenger {
fn flush(&mut self) -> impl std::future::Future<Output = Result<()>> + Send;
}

/// Visibility levels dictate *which* objects we show (ex. just rewrites, or also every file analyzed)
#[derive(Debug, PartialEq, PartialOrd, Clone, Copy, ValueEnum, Serialize, Default)]
pub enum VisibilityLevels {
Expand Down
8 changes: 7 additions & 1 deletion crates/marzano_messenger/src/testing.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::Result;
use marzano_core::api::MatchResult;

use crate::emit::Messager;
use crate::emit::{FlushableMessenger, Messager};

/// A testing messenger that doesn't actually send messages anywhere.
///
Expand Down Expand Up @@ -41,3 +41,9 @@ impl Messager for TestingMessenger {
Ok(())
}
}

impl FlushableMessenger for TestingMessenger {
async fn flush(&mut self) -> Result<()> {
Ok(())
}
}

0 comments on commit f9fa65f

Please sign in to comment.