From 3bdc87130ff85977e01814f789bbfc2eff9dd322 Mon Sep 17 00:00:00 2001 From: luca-ctx <216224554+luca-ctx@users.noreply.github.com> Date: Wed, 1 Jul 2026 13:18:30 -0500 Subject: [PATCH 1/2] Add history source plugin imports --- Cargo.lock | 1 + crates/ctx-cli/Cargo.toml | 1 + crates/ctx-cli/src/docs.rs | 9 + crates/ctx-cli/src/history_source_plugins.rs | 506 +++++++++++++++++++ crates/ctx-cli/src/main.rs | 489 +++++++++++++++++- crates/ctx-cli/tests/cli.rs | 503 ++++++++++++++++++ crates/ctx-history-capture/src/lib.rs | 201 +++++++- docs/cli-reference.md | 50 +- docs/custom-history-import-format.md | 36 +- docs/first-10-minutes.md | 5 +- docs/getting-started.md | 5 +- docs/history-source-plugins.md | 225 +++++++++ docs/providers.md | 10 +- docs/search.md | 15 +- docs/storage.md | 11 +- 15 files changed, 2001 insertions(+), 66 deletions(-) create mode 100644 crates/ctx-cli/src/history_source_plugins.rs create mode 100644 docs/history-source-plugins.md diff --git a/Cargo.lock b/Cargo.lock index 97b76076..7f689fa6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -291,6 +291,7 @@ dependencies = [ "predicates", "ring", "rusqlite", + "serde", "serde_json", "sha2", "tempfile", diff --git a/crates/ctx-cli/Cargo.toml b/crates/ctx-cli/Cargo.toml index d91a6463..84141954 100644 --- a/crates/ctx-cli/Cargo.toml +++ b/crates/ctx-cli/Cargo.toml @@ -22,6 +22,7 @@ chrono.workspace = true clap.workspace = true clap_mangen.workspace = true ring.workspace = true +serde.workspace = true serde_json.workspace = true sha2.workspace = true ureq = { version = "2.10", default-features = true } diff --git a/crates/ctx-cli/src/docs.rs b/crates/ctx-cli/src/docs.rs index f964e987..e2f0e6e0 100644 --- a/crates/ctx-cli/src/docs.rs +++ b/crates/ctx-cli/src/docs.rs @@ -214,6 +214,15 @@ const TOPICS: &[DocTopic] = &[ source_path: "docs/custom-history-import-format.md", body: include_str!("../../../docs/custom-history-import-format.md"), }, + DocTopic { + id: "history-source-plugins", + title: "History 
Source Plugins", + audience: "integrator-agent", + summary: "Local plugin manifests, stdout import, cursor handoff, and adapter shapes.", + tags: &["providers", "plugins", "imports", "custom"], + source_path: "docs/history-source-plugins.md", + body: include_str!("../../../docs/history-source-plugins.md"), + }, DocTopic { id: "provider-support", title: "Provider Support", diff --git a/crates/ctx-cli/src/history_source_plugins.rs b/crates/ctx-cli/src/history_source_plugins.rs new file mode 100644 index 00000000..50ed1452 --- /dev/null +++ b/crates/ctx-cli/src/history_source_plugins.rs @@ -0,0 +1,506 @@ +use std::{ + collections::{BTreeMap, BTreeSet}, + env, + fs::{self, OpenOptions}, + io::{Read, Write}, + path::{Path, PathBuf}, + process::{Command, Stdio}, + thread, + time::{Duration, Instant}, +}; + +#[cfg(unix)] +use std::os::unix::fs::OpenOptionsExt; + +use anyhow::{anyhow, Context, Result}; +use serde::Deserialize; +use uuid::Uuid; + +const PLUGIN_MANIFEST_FILE: &str = "ctx-history-plugin.json"; +const LEGACY_PLUGIN_MANIFEST_FILE: &str = "plugin.json"; +const DEFAULT_PLUGIN_TIMEOUT_SECONDS: u64 = 300; +const MAX_PLUGIN_STDERR_SNIPPET_BYTES: usize = 4096; +const MAX_INLINE_CURSOR_ENV_BYTES: usize = 8192; +const SAFE_PLUGIN_ENV: &[&str] = &[ + "PATH", + "HOME", + "USER", + "LOGNAME", + "LANG", + "LC_ALL", + "LC_CTYPE", + "TMPDIR", + "TEMP", + "TMP", + "XDG_CONFIG_HOME", + "XDG_DATA_HOME", + "XDG_CACHE_HOME", + "XDG_STATE_HOME", +]; + +#[derive(Debug, Clone)] +pub struct HistorySourcePluginSource { + pub plugin_name: String, + pub plugin_display_name: Option, + pub plugin_version: Option, + pub manifest_path: PathBuf, + pub manifest_dir: PathBuf, + pub id: String, + pub display_name: Option, + pub provider_key: String, + pub source_id: String, + pub source_format: String, + pub command: Vec, + pub working_dir: Option, + pub env: BTreeMap, + pub enabled: bool, + pub refresh: HistorySourcePluginRefresh, + pub timeout: Duration, +} + +impl HistorySourcePluginSource { 
+ pub fn label(&self) -> String { + format!("{}/{}", self.plugin_name, self.id) + } + + pub fn cursor_stream(&self) -> String { + ctx_history_capture::custom_history_jsonl_v1_cursor_stream( + &self.provider_key, + &self.source_id, + &self.source_format, + ) + } + + pub fn matches_selector(&self, selector: &str) -> bool { + selector == self.plugin_name + || selector == self.id + || selector == self.label() + || selector == self.provider_key + || selector == format!("{}/{}", self.provider_key, self.source_id) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum HistorySourcePluginRefresh { + Manual, + Auto, +} + +impl Default for HistorySourcePluginRefresh { + fn default() -> Self { + Self::Manual + } +} + +#[derive(Debug, Clone)] +pub struct HistorySourcePluginRun { + pub stdout: Vec, + pub stderr: String, +} + +#[derive(Debug, Clone)] +pub struct HistorySourcePluginRunOptions<'a> { + pub data_root: &'a Path, + pub machine_id: &'a str, + pub cursor: Option<&'a str>, + pub cursor_stream: &'a str, + pub full_rescan: bool, +} + +#[derive(Debug, Deserialize)] +struct HistorySourcePluginManifest { + schema_version: u32, + name: String, + #[serde(default)] + display_name: Option, + #[serde(default)] + version: Option, + #[serde(default)] + history_sources: Vec, +} + +#[derive(Debug, Deserialize)] +struct HistorySourcePluginSourceManifest { + id: String, + #[serde(default)] + display_name: Option, + #[serde(default)] + provider_key: Option, + #[serde(default)] + source_id: Option, + source_format: String, + command: Vec, + #[serde(default)] + working_dir: Option, + #[serde(default)] + env: BTreeMap, + #[serde(default)] + enabled: bool, + #[serde(default)] + refresh: HistorySourcePluginRefresh, + #[serde(default)] + timeout_seconds: Option, +} + +pub fn discover_history_source_plugins( + data_root: &Path, + extra_manifests: &[PathBuf], +) -> Result> { + let mut sources = Vec::new(); + for manifest_path in 
plugin_manifest_paths(data_root) { + match read_plugin_manifest(&manifest_path) { + Ok(mut manifest_sources) => sources.append(&mut manifest_sources), + Err(_) => continue, + } + } + for manifest_path in explicit_plugin_manifest_paths(extra_manifests)? { + let mut manifest_sources = read_plugin_manifest(&manifest_path)?; + sources.append(&mut manifest_sources); + } + sources.sort_by(|left, right| left.label().cmp(&right.label())); + Ok(sources) +} + +pub fn run_history_source_plugin( + source: &HistorySourcePluginSource, + options: HistorySourcePluginRunOptions<'_>, +) -> Result { + let (program, args) = source.command.split_first().ok_or_else(|| { + anyhow!( + "history source plugin {} has an empty command", + source.label() + ) + })?; + let mut command = Command::new(program); + command.env_clear(); + inherit_safe_plugin_env(&mut command); + command.args(args); + command.stdin(Stdio::null()); + command.stdout(Stdio::piped()); + command.stderr(Stdio::piped()); + if let Some(working_dir) = &source.working_dir { + command.current_dir(resolve_manifest_path(&source.manifest_dir, working_dir)); + } + for (key, value) in &source.env { + command.env(key, value); + } + command.env("CTX_DATA_ROOT", options.data_root); + command.env("CTX_HISTORY_PLUGIN", "1"); + command.env("CTX_HISTORY_PLUGIN_NAME", &source.plugin_name); + command.env("CTX_HISTORY_PLUGIN_MANIFEST", &source.manifest_path); + command.env("CTX_HISTORY_SOURCE", source.label()); + command.env("CTX_HISTORY_SOURCE_ID", &source.source_id); + command.env("CTX_HISTORY_PROVIDER_KEY", &source.provider_key); + command.env("CTX_HISTORY_SOURCE_FORMAT", &source.source_format); + command.env("CTX_HISTORY_CURSOR_STREAM", options.cursor_stream); + command.env("CTX_HISTORY_MACHINE_ID", options.machine_id); + command.env( + "CTX_HISTORY_FULL_RESCAN", + if options.full_rescan { "1" } else { "0" }, + ); + let cursor_file = if let Some(cursor) = options.cursor { + let path = write_private_temp_file("ctx-history-cursor", 
cursor).with_context(|| { + format!("write history source plugin {} cursor file", source.label()) + })?; + if cursor.len() <= MAX_INLINE_CURSOR_ENV_BYTES { + command.env("CTX_HISTORY_CURSOR", cursor); + command.env("CTX_HISTORY_CURSOR_JSON", cursor); + } else { + command.env_remove("CTX_HISTORY_CURSOR"); + command.env_remove("CTX_HISTORY_CURSOR_JSON"); + } + command.env("CTX_HISTORY_CURSOR_FILE", &path); + Some(path) + } else { + command.env_remove("CTX_HISTORY_CURSOR"); + command.env_remove("CTX_HISTORY_CURSOR_JSON"); + command.env_remove("CTX_HISTORY_CURSOR_FILE"); + None + }; + let mut child = match command.spawn() { + Ok(child) => child, + Err(err) => { + cleanup_cursor_file(cursor_file.as_ref()); + return Err(err).with_context(|| { + format!( + "spawn history source plugin {} command {}", + source.label(), + shell_like_command(&source.command) + ) + }); + } + }; + let mut stdout = child + .stdout + .take() + .context("history source plugin stdout was not piped")?; + let mut stderr = child + .stderr + .take() + .context("history source plugin stderr was not piped")?; + let stdout_handle = thread::spawn(move || { + let mut bytes = Vec::new(); + stdout.read_to_end(&mut bytes).map(|_| bytes) + }); + let stderr_handle = thread::spawn(move || { + let mut bytes = Vec::new(); + stderr.read_to_end(&mut bytes).map(|_| bytes) + }); + + let started = Instant::now(); + let status = loop { + if let Some(status) = child.try_wait()? 
{ + break status; + } + if started.elapsed() >= source.timeout { + let _ = child.kill(); + let _ = child.wait(); + cleanup_cursor_file(cursor_file.as_ref()); + return Err(anyhow!( + "history source plugin {} timed out after {}s", + source.label(), + source.timeout.as_secs() + )); + } + thread::sleep(Duration::from_millis(25)); + }; + + let stdout = stdout_handle + .join() + .map_err(|_| anyhow!("history source plugin stdout reader panicked"))??; + let stderr = stderr_handle + .join() + .map_err(|_| anyhow!("history source plugin stderr reader panicked"))??; + cleanup_cursor_file(cursor_file.as_ref()); + let stderr = String::from_utf8_lossy(&stderr).trim().to_owned(); + if !status.success() { + let detail = if stderr.is_empty() { + format!("exit status {status}") + } else { + format!("exit status {status}: {}", stderr_snippet(&stderr)) + }; + return Err(anyhow!( + "history source plugin {} failed: {detail}", + source.label() + )); + } + Ok(HistorySourcePluginRun { stdout, stderr }) +} + +fn inherit_safe_plugin_env(command: &mut Command) { + for key in SAFE_PLUGIN_ENV { + if let Some(value) = env::var_os(key) { + command.env(key, value); + } + } +} + +fn write_private_temp_file(prefix: &str, contents: &str) -> Result { + for _ in 0..16 { + let path = env::temp_dir().join(format!("{prefix}-{}.json", Uuid::new_v4())); + let mut options = OpenOptions::new(); + options.write(true).create_new(true); + #[cfg(unix)] + options.mode(0o600); + match options.open(&path) { + Ok(mut file) => { + file.write_all(contents.as_bytes())?; + return Ok(path); + } + Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => continue, + Err(err) => { + return Err(err) + .with_context(|| format!("create private temp file {}", path.display())); + } + } + } + Err(anyhow!("failed to allocate unique private temp file")) +} + +fn cleanup_cursor_file(path: Option<&PathBuf>) { + if let Some(path) = path { + let _ = fs::remove_file(path); + } +} + +fn read_plugin_manifest(path: &Path) -> 
Result> { + let raw = fs::read_to_string(path) + .with_context(|| format!("read history source plugin manifest {}", path.display()))?; + let manifest: HistorySourcePluginManifest = serde_json::from_str(&raw) + .with_context(|| format!("parse history source plugin manifest {}", path.display()))?; + validate_plugin_id("plugin name", &manifest.name)?; + if manifest.schema_version != 1 { + return Err(anyhow!( + "history source plugin manifest {} has unsupported schema_version {}; expected 1", + path.display(), + manifest.schema_version + )); + } + let manifest_dir = path + .parent() + .unwrap_or_else(|| Path::new(".")) + .to_path_buf(); + let mut sources = Vec::new(); + for source in manifest.history_sources { + validate_plugin_id("history source id", &source.id)?; + let provider_key = source.provider_key.unwrap_or_else(|| manifest.name.clone()); + validate_plugin_id("provider_key", &provider_key)?; + let source_id = source.source_id.unwrap_or_else(|| source.id.clone()); + if source.source_format.trim().is_empty() { + return Err(anyhow!( + "history source plugin manifest {} source {} has empty source_format", + path.display(), + source.id + )); + } + if source.command.is_empty() || source.command.iter().any(|part| part.trim().is_empty()) { + return Err(anyhow!( + "history source plugin manifest {} source {} has empty command", + path.display(), + source.id + )); + } + sources.push(HistorySourcePluginSource { + plugin_name: manifest.name.clone(), + plugin_display_name: manifest.display_name.clone(), + plugin_version: manifest.version.clone(), + manifest_path: path.to_path_buf(), + manifest_dir: manifest_dir.clone(), + id: source.id, + display_name: source.display_name, + provider_key, + source_id, + source_format: source.source_format, + command: source.command, + working_dir: source.working_dir, + env: source.env, + enabled: source.enabled, + refresh: source.refresh, + timeout: Duration::from_secs( + source + .timeout_seconds + 
.unwrap_or(DEFAULT_PLUGIN_TIMEOUT_SECONDS) + .max(1), + ), + }); + } + Ok(sources) +} + +fn plugin_manifest_paths(data_root: &Path) -> Vec { + let mut candidates = BTreeSet::new(); + collect_manifest_path_candidates(&data_root.join("plugins"), &mut candidates); + if let Some(paths) = env::var_os("CTX_HISTORY_PLUGIN_PATH") { + for path in env::split_paths(&paths) { + collect_manifest_path_candidates(&path, &mut candidates); + } + } + if let Some(paths) = env::var_os("CTX_PLUGIN_PATH") { + for path in env::split_paths(&paths) { + collect_manifest_path_candidates(&path, &mut candidates); + } + } + candidates.into_iter().collect() +} + +fn explicit_plugin_manifest_paths(extra_manifests: &[PathBuf]) -> Result> { + let mut candidates = BTreeSet::new(); + for path in extra_manifests { + let before = candidates.len(); + collect_manifest_path_candidates(path, &mut candidates); + if candidates.len() == before { + return Err(anyhow!( + "history source plugin manifest path {} did not contain {}", + path.display(), + PLUGIN_MANIFEST_FILE + )); + } + } + Ok(candidates.into_iter().collect()) +} + +fn collect_manifest_path_candidates(path: &Path, candidates: &mut BTreeSet) { + if path.is_file() { + candidates.insert(path.to_path_buf()); + return; + } + if !path.is_dir() { + return; + } + let direct = path.join(PLUGIN_MANIFEST_FILE); + if direct.is_file() { + candidates.insert(direct); + } + let legacy = path.join(LEGACY_PLUGIN_MANIFEST_FILE); + if legacy.is_file() { + candidates.insert(legacy); + } + let Ok(entries) = fs::read_dir(path) else { + return; + }; + for entry in entries.flatten() { + let child = entry.path(); + if child.is_file() + && child + .file_name() + .and_then(|name| name.to_str()) + .is_some_and(|name| name == PLUGIN_MANIFEST_FILE) + { + candidates.insert(child); + continue; + } + if child.is_dir() { + let manifest = child.join(PLUGIN_MANIFEST_FILE); + if manifest.is_file() { + candidates.insert(manifest); + } + } + } +} + +fn validate_plugin_id(label: &str, 
value: &str) -> Result<()> { + let valid = !value.is_empty() + && value.len() <= 128 + && value.bytes().all(|byte| { + byte.is_ascii_lowercase() || byte.is_ascii_digit() || matches!(byte, b'.' | b'_' | b'-') + }) + && value + .bytes() + .next() + .is_some_and(|byte| byte.is_ascii_lowercase() || byte.is_ascii_digit()); + if valid { + Ok(()) + } else { + Err(anyhow!( + "{label} must be 1 to 128 bytes, start with a lowercase ASCII letter or digit, and use only lowercase ASCII letters, digits, '.', '_', or '-'" + )) + } +} + +fn resolve_manifest_path(manifest_dir: &Path, path: &Path) -> PathBuf { + if path.is_absolute() { + path.to_path_buf() + } else { + manifest_dir.join(path) + } +} + +fn shell_like_command(command: &[String]) -> String { + command.join(" ") +} + +fn stderr_snippet(value: &str) -> String { + let mut snippet = value + .lines() + .map(str::trim) + .filter(|line| !line.is_empty()) + .take(12) + .collect::>() + .join(" | "); + if snippet.len() > MAX_PLUGIN_STDERR_SNIPPET_BYTES { + snippet.truncate(MAX_PLUGIN_STDERR_SNIPPET_BYTES); + snippet.push_str("..."); + } + snippet +} diff --git a/crates/ctx-cli/src/main.rs b/crates/ctx-cli/src/main.rs index 2bf735ef..0c9a5e10 100644 --- a/crates/ctx-cli/src/main.rs +++ b/crates/ctx-cli/src/main.rs @@ -1,6 +1,6 @@ use std::{ env, fs, - io::{IsTerminal, Read, Write}, + io::{Cursor, IsTerminal, Read, Write}, path::{Path, PathBuf}, str::FromStr, sync::{Arc, Mutex}, @@ -18,6 +18,7 @@ use uuid::Uuid; mod analytics; mod config; mod docs; +mod history_source_plugins; mod identity; mod mcp; mod net; @@ -30,9 +31,10 @@ use ctx_history_capture::{ import_antigravity_cli_history, import_claude_projects_jsonl_tree, import_codex_history_jsonl, import_codex_session_jsonl, import_codex_session_jsonl_tail, import_codex_session_paths, import_codex_session_tree, import_copilot_cli_session_events, import_cursor_native_history, - import_custom_history_jsonl_v1, import_factory_ai_droid_sessions, import_gemini_cli_history, - 
import_opencode_sqlite, import_pi_session_jsonl, provider_source_for_path, - provider_source_spec, stable_capture_uuid, validate_custom_history_jsonl_v1, + import_custom_history_jsonl_v1, import_custom_history_jsonl_v1_reader, + import_factory_ai_droid_sessions, import_gemini_cli_history, import_opencode_sqlite, + import_pi_session_jsonl, provider_source_for_path, provider_source_spec, stable_capture_uuid, + validate_custom_history_jsonl_v1, validate_custom_history_jsonl_v1_reader, AntigravityCliImportOptions, CatalogSummary, ClaudeProjectsImportOptions, CodexEventImportMode, CodexHistoryImportOptions, CodexSessionCatalogOptions, CodexSessionImportOptions, CodexSessionImportProgress, CodexSessionImportProgressCallback, CodexToolOutputMode, @@ -42,8 +44,9 @@ use ctx_history_capture::{ ProviderSourceStatus, }; use ctx_history_core::{ - database_path, default_data_root, CaptureProvider, ContextCitation, ContextCitationType, Event, - EventRole, EventType, HistoryRecord, ProviderRawRetention, RedactionState, Session, + database_path, default_data_root, CaptureProvider, ContextCitation, ContextCitationType, + CtxHistoryJsonlRecord, Event, EventRole, EventType, HistoryRecord, ProviderRawRetention, + RedactionState, Session, }; use ctx_history_store::{ CatalogSession, CatalogSourceIndexUpdate, RawSqlOptions, RawSqlResult, RawSqlValue, @@ -51,6 +54,10 @@ use ctx_history_store::{ RAW_SQL_DEFAULT_MAX_ROWS, RAW_SQL_DEFAULT_MAX_SQL_BYTES, RAW_SQL_DEFAULT_MAX_VALUE_BYTES, RAW_SQL_MAX_TIMEOUT, }; +use history_source_plugins::{ + discover_history_source_plugins, run_history_source_plugin, HistorySourcePluginRefresh, + HistorySourcePluginRunOptions, HistorySourcePluginSource, +}; const WAL_TRUNCATE_MIN_BYTES: u64 = 64 * 1024 * 1024; const LARGE_IMPORT_SOURCE_FILES_WARNING: usize = 10_000; @@ -124,14 +131,28 @@ struct ImportArgs { provider: Option, #[arg(long)] path: Option, + #[arg( + long = "history-source", + alias = "plugin", + conflicts_with_all = ["provider", "path", "format", 
"all"] + )] + history_source: Option, + #[arg( + long = "history-source-manifest", + alias = "plugin-manifest", + conflicts_with_all = ["provider", "path", "format"] + )] + history_source_manifest: Vec, + #[arg(long = "reset-cursor")] + reset_cursor: bool, #[arg( long, value_enum, requires = "path", - conflicts_with_all = ["provider", "all"] + conflicts_with_all = ["provider", "all", "history_source"] )] format: Option, - #[arg(long, conflicts_with_all = ["provider", "path", "format"])] + #[arg(long, conflicts_with_all = ["provider", "path", "format", "history_source"])] all: bool, #[arg(long)] resume: bool, @@ -662,6 +683,7 @@ struct ImportRunOptions { json: bool, print_human: bool, allow_empty_sources: bool, + include_history_source_plugins: bool, operation: &'static str, } @@ -1217,7 +1239,9 @@ fn main() -> Result<()> { let result = match cli.command { CommandRoot::Setup(args) => run_setup(args, data_root.clone(), &mut analytics_properties), CommandRoot::Status(args) => run_status(args, data_root.clone(), &mut analytics_properties), - CommandRoot::Sources(args) => run_sources(args, &mut analytics_properties), + CommandRoot::Sources(args) => { + run_sources(args, data_root.clone(), &mut analytics_properties) + } CommandRoot::Import(args) => run_import(args, data_root.clone(), &mut analytics_properties), CommandRoot::Show(args) => run_show(args, data_root.clone(), &mut analytics_properties), CommandRoot::Locate(args) => run_locate(args, data_root.clone(), &mut analytics_properties), @@ -1270,6 +1294,8 @@ fn command_analytics_properties(command: &CommandRoot) -> AnalyticsProperties { "source_mode", if args.format.is_some() { "explicit_format" + } else if args.history_source.is_some() { + "history_source_plugin" } else if args.path.is_some() { "explicit_path" } else if args.all { @@ -1287,6 +1313,7 @@ fn command_analytics_properties(command: &CommandRoot) -> AnalyticsProperties { provider.capture_provider().as_str(), ); } + analytics::insert_bool(&mut properties, 
"reset_cursor", args.reset_cursor); analytics::insert_str( &mut properties, "progress_mode", @@ -1441,6 +1468,9 @@ fn run_setup( let import_args = ImportArgs { provider: None, path: None, + history_source: None, + history_source_manifest: Vec::new(), + reset_cursor: false, format: None, all: true, resume: false, @@ -1456,6 +1486,7 @@ fn run_setup( json: args.json, print_human: !args.json, allow_empty_sources: true, + include_history_source_plugins: false, operation: "setup", }, )?) @@ -1657,8 +1688,13 @@ fn run_status( Ok(()) } -fn run_sources(args: JsonArgs, analytics_properties: &mut AnalyticsProperties) -> Result<()> { +fn run_sources( + args: JsonArgs, + data_root: PathBuf, + analytics_properties: &mut AnalyticsProperties, +) -> Result<()> { let sources = discovered_sources(); + let plugin_sources = discover_history_source_plugins(&data_root, &[])?; let existing = sources.iter().filter(|source| source.exists).count(); let importable = sources .iter() @@ -1671,7 +1707,7 @@ fn run_sources(args: JsonArgs, analytics_properties: &mut AnalyticsProperties) - analytics::insert_count_bucket( analytics_properties, "providers_detected_bucket", - sources.len() as u64, + sources.len().saturating_add(plugin_sources.len()) as u64, ); analytics::insert_count_bucket( analytics_properties, @@ -1684,9 +1720,11 @@ fn run_sources(args: JsonArgs, analytics_properties: &mut AnalyticsProperties) - importable as u64, ); if args.json { + let mut source_values = sources_json(&sources); + source_values.extend(plugin_sources_json(&plugin_sources)); print_json(json!({ "schema_version": 1, - "sources": sources_json(&sources), + "sources": source_values, }))?; } else { for source in sources { @@ -1698,6 +1736,13 @@ fn run_sources(args: JsonArgs, analytics_properties: &mut AnalyticsProperties) - source.source_format ); } + for source in plugin_sources { + println!( + "custom {} available (history-source-plugin:{})", + source.label(), + source.source_format + ); + } } Ok(()) } @@ -1759,6 
+1804,7 @@ fn run_import( json, print_human: !json, allow_empty_sources: false, + include_history_source_plugins: true, operation: "import", }, )?; @@ -1773,7 +1819,7 @@ fn run_import_internal( ) -> Result { fs::create_dir_all(&data_root)?; config::write_default_config(&data_root)?; - let db_path = database_path(data_root); + let db_path = database_path(data_root.clone()); let mut store = Store::open(&db_path)?; let mut totals = ImportTotals::default(); let mut imported_sources = Vec::new(); @@ -1790,12 +1836,17 @@ fn run_import_internal( } let requests = import_requests(args)?; - if requests.is_empty() { + let plugin_requests = history_source_plugin_import_requests( + args, + &data_root, + options.include_history_source_plugins, + )?; + if requests.is_empty() && plugin_requests.is_empty() { if options.allow_empty_sources { return Ok(ImportReport::empty(args.resume)); } return Err(anyhow!( - "no importable provider history sources found; use --path or run `ctx sources`" + "no importable provider history sources found; use --path, --history-source, or run `ctx sources`" )); } @@ -1810,7 +1861,7 @@ fn run_import_internal( analytics::insert_count_bucket( analytics_properties, "sources_seen_bucket", - planned_sources.len() as u64, + planned_sources.len().saturating_add(plugin_requests.len()) as u64, ); analytics::insert_bytes_bucket( analytics_properties, @@ -1829,7 +1880,7 @@ fn run_import_internal( "discovering", format!( "found {} import source(s), {}", - planned_sources.len(), + planned_sources.len().saturating_add(plugin_requests.len()), format_bytes(planned_total_bytes) ), ); @@ -1840,6 +1891,64 @@ fn run_import_internal( progress.warning(warning); } + for plugin_source in plugin_requests { + if options.print_human { + progress.finish_line(); + println!("importing history source plugin {}", plugin_source.label()); + } + progress.message( + "indexing", + format!("running history source plugin {}", plugin_source.label()), + ); + match import_history_source_plugin( 
+ &mut store, + &plugin_source, + &data_root, + args.resume || args.reset_cursor, + ) { + Ok((summary, stats)) => { + totals.add(&summary, &stats); + progress.done( + "indexing", + format!("imported history source plugin {}", plugin_source.label()), + planned_total_bytes, + ); + if options.print_human { + progress.finish_line(); + print_history_source_plugin_imported(&plugin_source, &summary); + } + imported_sources.push(history_source_plugin_import_json( + &plugin_source, + &stats, + &summary, + )); + } + Err(err) => { + let error = error_summary(&err); + if allow_source_failures && !import_error_is_systemic(&error) { + totals.add_source_failure(&SourceStats::default()); + progress.done( + "indexing", + format!( + "skipped history source plugin {}: {}", + plugin_source.label(), + one_line_error(&error) + ), + planned_total_bytes, + ); + if options.print_human { + progress.finish_line(); + print_history_source_plugin_failed(&plugin_source, &error); + } + imported_sources + .push(history_source_plugin_failure_json(&plugin_source, &error)); + } else { + return Err(err); + } + } + } + } + if should_parallelize_import(&planned_sources) { let final_refresh_required = store.event_search_projection_needs_backfill()? 
|| planned_sources @@ -2114,7 +2223,12 @@ fn run_import_internal( ); analytics::insert_count_bucket(analytics_properties, "failed_bucket", totals.failed as u64); if totals.imported_sources == 0 && totals.failed_sources > 0 { - return Err(anyhow!("all import sources failed")); + let detail = imported_sources + .iter() + .find_map(|source| source.get("error").and_then(Value::as_str)) + .map(|error| format!("; first failure: {error}")) + .unwrap_or_default(); + return Err(anyhow!("all import sources failed{detail}")); } Ok(ImportReport { resume: args.resume, @@ -2412,6 +2526,32 @@ fn custom_format_import_json( }) } +fn history_source_plugin_import_json( + source: &HistorySourcePluginSource, + stats: &SourceStats, + summary: &ProviderImportSummary, +) -> Value { + json!({ + "status": "imported", + "provider": CaptureProvider::Custom.as_str(), + "kind": "history_source_plugin", + "plugin": source.plugin_name, + "history_source": source.label(), + "provider_key": source.provider_key, + "source_id": source.source_id, + "source_format": source.source_format, + "manifest_path": source.manifest_path, + "source_files": stats.files, + "source_bytes": stats.bytes, + "imported_sessions": summary.imported_sessions, + "imported_events": summary.imported_events, + "imported_edges": summary.imported_edges, + "skipped": summary.skipped, + "failed": summary.failed, + "failures": provider_failures_json(summary), + }) +} + fn provider_failures_json(summary: &ProviderImportSummary) -> Vec { summary .failures @@ -2438,6 +2578,23 @@ fn source_failure_json(failure: &ImportSourceFailure) -> Value { }) } +fn history_source_plugin_failure_json(source: &HistorySourcePluginSource, error: &str) -> Value { + json!({ + "status": "failed", + "provider": CaptureProvider::Custom.as_str(), + "kind": "history_source_plugin", + "plugin": source.plugin_name, + "history_source": source.label(), + "provider_key": source.provider_key, + "source_id": source.source_id, + "source_format": source.source_format, 
+ "manifest_path": source.manifest_path, + "source_files": 0, + "source_bytes": 0, + "error": one_line_error(error), + }) +} + fn print_source_imported(source: &SourceInfo, summary: &ProviderImportSummary) { println!( "imported {}: sessions={} events={} edges={} skipped={} failed={}", @@ -2450,6 +2607,21 @@ fn print_source_imported(source: &SourceInfo, summary: &ProviderImportSummary) { ); } +fn print_history_source_plugin_imported( + source: &HistorySourcePluginSource, + summary: &ProviderImportSummary, +) { + println!( + "imported history source plugin {}: sessions={} events={} edges={} skipped={} failed={}", + source.label(), + summary.imported_sessions, + summary.imported_events, + summary.imported_edges, + summary.skipped, + summary.failed + ); +} + fn print_source_failed(failure: &ImportSourceFailure) { println!( "skipped {}: {}", @@ -2459,6 +2631,15 @@ fn print_source_failed(failure: &ImportSourceFailure) { println!(" path: {}", failure.source.path.display()); } +fn print_history_source_plugin_failed(source: &HistorySourcePluginSource, error: &str) { + println!( + "skipped history source plugin {}: {}", + source.label(), + one_line_error(error) + ); + println!(" manifest: {}", source.manifest_path.display()); +} + fn source_error_reason(source: &SourceInfo, error: &str) -> String { let error = one_line_error(error); let prefix = format!( @@ -4309,6 +4490,9 @@ fn run_doctor( } fn import_requests(args: &ImportArgs) -> Result> { + if args.history_source.is_some() || !args.history_source_manifest.is_empty() { + return Ok(Vec::new()); + } if let Some(path) = &args.path { let provider = args .provider @@ -4356,6 +4540,209 @@ fn import_requests(args: &ImportArgs) -> Result> { Ok(sources) } +fn history_source_plugin_import_requests( + args: &ImportArgs, + data_root: &Path, + include_plugins: bool, +) -> Result> { + if !include_plugins { + return Ok(Vec::new()); + } + if !args.all && args.history_source.is_none() && args.history_source_manifest.is_empty() { + return 
Ok(Vec::new()); + } + let sources = discover_history_source_plugins(data_root, &args.history_source_manifest)?; + if let Some(selector) = &args.history_source { + let matches = sources + .into_iter() + .filter(|source| source.matches_selector(selector)) + .collect::>(); + if matches.is_empty() { + return Err(anyhow!( + "no history source plugin matched `{selector}`; use `ctx sources` to inspect configured plugins" + )); + } + if matches.len() > 1 { + let labels = matches + .iter() + .map(HistorySourcePluginSource::label) + .collect::>() + .join(", "); + return Err(anyhow!( + "history source plugin selector `{selector}` matched multiple sources ({labels}); use plugin/source or provider_key/source_id" + )); + } + return Ok(matches); + } + if args.all { + return Ok(sources + .into_iter() + .filter(|source| source.enabled) + .collect()); + } + Ok(sources + .into_iter() + .filter(|source| { + args.history_source_manifest + .iter() + .any(|path| manifest_arg_matches_source(path, &source.manifest_path)) + }) + .collect()) +} + +fn manifest_arg_matches_source(arg: &Path, manifest_path: &Path) -> bool { + if arg.is_file() { + return same_pathish(arg, manifest_path); + } + if arg.is_dir() { + return manifest_path.starts_with(arg); + } + same_pathish(arg, manifest_path) +} + +fn same_pathish(left: &Path, right: &Path) -> bool { + if left == right { + return true; + } + let left = fs::canonicalize(left).unwrap_or_else(|_| left.to_path_buf()); + let right = fs::canonicalize(right).unwrap_or_else(|_| right.to_path_buf()); + left == right +} + +fn import_history_source_plugin( + store: &mut Store, + source: &HistorySourcePluginSource, + data_root: &Path, + full_rescan: bool, +) -> Result<(ProviderImportSummary, SourceStats)> { + let record = import_record_for_history_source_plugin(source); + let record_id = record.id; + let options = CustomHistoryJsonlV1ImportOptions::default(); + let machine_id = options.machine_id.clone(); + let cursor_stream = source.cursor_stream(); + let 
previous_cursor = if full_rescan { + None + } else { + store + .get_sync_cursor(None, &machine_id, &cursor_stream)? + .map(|cursor| cursor.cursor) + }; + let run = run_history_source_plugin( + source, + HistorySourcePluginRunOptions { + data_root, + machine_id: &machine_id, + cursor: previous_cursor.as_deref(), + cursor_stream: &cursor_stream, + full_rescan, + }, + )?; + let _plugin_stderr = &run.stderr; + validate_history_source_plugin_output(source, &run.stdout, &machine_id)?; + let validation = validate_custom_history_jsonl_v1_reader(Cursor::new(run.stdout.as_slice())) + .map_err(anyhow::Error::from)?; + if validation.failed > 0 { + return Err(history_source_plugin_import_failure(source, &validation)); + } + let stats = SourceStats { + files: 1, + bytes: run.stdout.len() as u64, + }; + store.upsert_record(&record)?; + let summary = import_custom_history_jsonl_v1_reader( + Cursor::new(run.stdout), + store, + CustomHistoryJsonlV1ImportOptions { + machine_id, + source_path: Some(source.manifest_path.clone()), + history_record_id: Some(record_id), + allow_partial_failures: false, + ..options + }, + ) + .map_err(anyhow::Error::from)?; + if summary.failed > 0 { + return Err(history_source_plugin_import_failure(source, &summary)); + } + Ok((summary, stats)) +} + +fn validate_history_source_plugin_output( + source: &HistorySourcePluginSource, + stdout: &[u8], + machine_id: &str, +) -> Result<()> { + let text = std::str::from_utf8(stdout).with_context(|| { + format!( + "history source plugin {} emitted non-UTF-8 ctx-history-jsonl-v1 output", + source.label() + ) + })?; + let mut saw_source = false; + for (index, line) in text.lines().enumerate() { + let line_number = index + 1; + if line.trim().is_empty() { + continue; + } + let record: CtxHistoryJsonlRecord = serde_json::from_str(line).with_context(|| { + format!( + "history source plugin {} emitted invalid ctx-history-jsonl-v1 at line {line_number}", + source.label() + ) + })?; + let 
CtxHistoryJsonlRecord::Source(source_record) = record else { + continue; + }; + saw_source = true; + if source_record.provider_key != source.provider_key + || source_record.source_id != source.source_id + || source_record.source_format != source.source_format + { + return Err(anyhow!( + "history source plugin {} emitted source identity {}/{}/{} but manifest declares {}/{}/{}", + source.label(), + source_record.provider_key, + source_record.source_id, + source_record.source_format, + source.provider_key, + source.source_id, + source.source_format + )); + } + if let Some(source_machine_id) = source_record.machine_id { + if source_machine_id != machine_id { + return Err(anyhow!( + "history source plugin {} emitted machine_id `{source_machine_id}` but ctx is importing as `{machine_id}`; omit machine_id or set it to CTX_HISTORY_MACHINE_ID", + source.label() + )); + } + } + } + if !saw_source { + return Err(anyhow!( + "history source plugin {} emitted no source record", + source.label() + )); + } + Ok(()) +} + +fn history_source_plugin_import_failure( + source: &HistorySourcePluginSource, + summary: &ProviderImportSummary, +) -> anyhow::Error { + let detail = summary + .failures + .first() + .map(|failure| format!("line {}: {}", failure.line, failure.error)) + .unwrap_or_else(|| "unknown validation failure".to_owned()); + anyhow!( + "history source plugin {} import failed with {} failure(s); first failure: {detail}", + source.label(), + summary.failed + ) +} + fn validate_source_import_supported(source: &SourceInfo) -> Result<()> { match source.import_support { ProviderImportSupport::Native => Ok(()), @@ -5144,6 +5531,35 @@ fn import_record_for_custom_history(path: &Path, format: ImportFormatArg) -> His record } +fn import_record_for_history_source_plugin(source: &HistorySourcePluginSource) -> HistoryRecord { + let key = format!( + "history-source-plugin:{}:{}:{}:{}:{}", + source.plugin_name, source.id, source.provider_key, source.source_id, source.source_format + ); + 
let mut record = HistoryRecord::new( + format!("history source plugin {}", source.label()), + format!( + "Indexed custom agent history from history source plugin {} ({})", + source.label(), + source.source_format + ), + vec![ + "agent-history".into(), + "custom".into(), + "history-source-plugin".into(), + source.provider_key.clone(), + source.source_format.clone(), + ], + "agent_history", + source + .manifest_path + .parent() + .map(|path| path.display().to_string()), + ); + record.id = stable_capture_uuid(&key, "record"); + record +} + fn discovered_sources() -> Vec { home_dir() .as_deref() @@ -5180,6 +5596,43 @@ fn sources_json(sources: &[SourceInfo]) -> Vec { .collect() } +fn plugin_sources_json(sources: &[HistorySourcePluginSource]) -> Vec { + sources + .iter() + .map(|source| { + json!({ + "provider": CaptureProvider::Custom.as_str(), + "kind": "history_source_plugin", + "plugin": source.plugin_name, + "plugin_display_name": source.plugin_display_name, + "plugin_version": source.plugin_version, + "history_source": source.label(), + "history_source_id": source.id, + "display_name": source.display_name, + "provider_key": source.provider_key, + "source_id": source.source_id, + "source_format": source.source_format, + "manifest_path": source.manifest_path, + "enabled": source.enabled, + "refresh": history_source_plugin_refresh_json(source.refresh), + "status": "available", + "import_support": "history_source_plugin", + "native_import": false, + "importable": true, + "raw_retention": "metadata_only", + "unsupported_reason": null, + }) + }) + .collect() +} + +fn history_source_plugin_refresh_json(refresh: HistorySourcePluginRefresh) -> &'static str { + match refresh { + HistorySourcePluginRefresh::Manual => "manual", + HistorySourcePluginRefresh::Auto => "auto", + } +} + fn import_support_json(support: ProviderImportSupport) -> &'static str { match support { ProviderImportSupport::Native => "native", diff --git a/crates/ctx-cli/tests/cli.rs 
b/crates/ctx-cli/tests/cli.rs index e12d00bc..3a059ab3 100644 --- a/crates/ctx-cli/tests/cli.rs +++ b/crates/ctx-cli/tests/cli.rs @@ -35,6 +35,180 @@ fn custom_history_fixture(name: &str) -> String { materialized_fixture("custom-history-jsonl", name) } +#[derive(Debug)] +struct HistorySourcePluginFixture { + manifest_dir: PathBuf, + run_marker: PathBuf, +} + +fn write_history_source_plugin( + temp: &TempDir, + provider: &str, + enabled: bool, + cursor_log: Option<&Path>, +) -> HistorySourcePluginFixture { + write_history_source_plugin_at( + &temp.path().join("history-plugins"), + provider, + enabled, + cursor_log, + ) +} + +fn write_history_source_plugin_at( + root: &Path, + provider: &str, + enabled: bool, + cursor_log: Option<&Path>, +) -> HistorySourcePluginFixture { + let manifest_dir = root.join(provider); + fs::create_dir_all(&manifest_dir).unwrap(); + let script = manifest_dir.join("export.py"); + let run_marker = manifest_dir.join("ran"); + let run_marker_json = Value::String(run_marker.display().to_string()); + let cursor_log_py = cursor_log + .map(|path| { + serde_json::to_string(&path.display().to_string()) + .expect("cursor log path is JSON-serializable") + }) + .unwrap_or_else(|| "None".to_owned()); + let script_body = format!( + r#"#!/usr/bin/env python3 +import json +import os +import pathlib +import sys + +provider = sys.argv[1] +source_id = os.environ["CTX_HISTORY_SOURCE_ID"] +provider_key = os.environ["CTX_HISTORY_PROVIDER_KEY"] +source_format = os.environ["CTX_HISTORY_SOURCE_FORMAT"] +cursor_stream = os.environ["CTX_HISTORY_CURSOR_STREAM"] +cursor_json = os.environ.get("CTX_HISTORY_CURSOR_JSON") +cursor_file = os.environ.get("CTX_HISTORY_CURSOR_FILE") +pathlib.Path({run_marker_json}).write_text("ran\n") +cursor_log = {cursor_log_py} +cursor_text = cursor_json +if not cursor_text and cursor_file: + cursor_text = pathlib.Path(cursor_file).read_text() +if cursor_log and cursor_text: + file_text = pathlib.Path(cursor_file).read_text() if cursor_file 
else "" + with open(cursor_log, "a", encoding="utf-8") as handle: + handle.write(cursor_text + "\n") + handle.write("cursor_file=" + file_text + "\n") + +cursor_shapes = {{ + "dorkos": {{"files": {{"/tmp/dorkos.jsonl": {{"offset": 128, "size": 128, "mtimeMs": 1}}}}}}, + "disabled-dorkos": {{"files": {{"/tmp/disabled-dorkos.jsonl": {{"offset": 128, "size": 128, "mtimeMs": 1}}}}}}, + "openclaw": {{"backend": "openclaw-file", "transcripts": {{"/tmp/openclaw.jsonl": {{"offset": 256, "size": 256, "lastRecordId": "rec-1"}}}}}}, + "hermes": {{"message_id": 7}}, + "nanoclaw": {{"sessions": {{"sess-1": 42}}}}, +}} +next_cursor = cursor_shapes[provider] +if cursor_text: + if provider == "hermes": + next_cursor = {{"message_id": 8}} + elif provider == "nanoclaw": + next_cursor = {{"sessions": {{"sess-1": 44}}}} + elif provider == "openclaw": + next_cursor = {{"backend": "openclaw-file", "transcripts": {{"/tmp/openclaw.jsonl": {{"offset": 512, "size": 512, "lastRecordId": "rec-2"}}}}}} + else: + next_cursor = {{"files": {{"/tmp/" + provider + ".jsonl": {{"offset": 256, "size": 256, "mtimeMs": 2}}}}}} + +event_index = 1 if cursor_text else 0 +phase = "incremental" if cursor_text else "initial" +observed = "2026-07-01T12:00:00Z" +cursor = {{ + "after": {{ + "stream": cursor_stream, + "cursor": json.dumps(next_cursor, separators=(",", ":")), + "observed_at": observed, + }} +}} +if cursor_text: + cursor["before"] = {{ + "stream": cursor_stream, + "cursor": cursor_text, + "observed_at": observed, + }} + +records = [ + {{"record_type": "manifest", "schema_version": "ctx-history-jsonl-v1", "producer": provider + "-fixture"}}, + {{"record_type": "source", "source_id": source_id, "provider_key": provider_key, "source_format": source_format, "observed_at": observed, "cursor": cursor, "metadata": {{"fixture_provider": provider}}}}, + {{"record_type": "session", "source_id": source_id, "session_id": provider + "-session", "started_at": "2026-07-01T11:59:00Z", "cwd": "/workspace/" + 
provider, "agent_type": "primary", "is_primary": True, "status": "completed"}}, + {{"record_type": "event", "source_id": source_id, "session_id": provider + "-session", "event_index": event_index, "event_id": provider + "-event-" + str(event_index), "native_cursor": phase, "event_type": "message", "role": "assistant", "occurred_at": observed, "payload": {{"text": provider + " plugin " + phase + " marker"}}, "preview": provider + " plugin " + phase + " marker"}}, +] +for record in records: + print(json.dumps(record, separators=(",", ":"))) +"#, + run_marker_json = run_marker_json, + cursor_log_py = cursor_log_py + ); + fs::write(&script, script_body).unwrap(); + let manifest = json!({ + "schema_version": 1, + "name": provider, + "display_name": format!("{provider} history"), + "version": "0.1.0", + "history_sources": [{ + "id": "default", + "provider_key": provider, + "source_id": "default", + "source_format": format!("{provider}-history-v1"), + "enabled": enabled, + "command": [python_command(), script.display().to_string(), provider], + "timeout_seconds": 10 + }] + }); + fs::write( + manifest_dir.join("ctx-history-plugin.json"), + serde_json::to_vec_pretty(&manifest).unwrap(), + ) + .unwrap(); + HistorySourcePluginFixture { + manifest_dir, + run_marker, + } +} + +fn python_command() -> String { + std::env::var("PYTHON").unwrap_or_else(|_| "python3".to_owned()) +} + +fn write_raw_history_source_plugin( + temp: &TempDir, + provider: &str, + script_body: &str, +) -> HistorySourcePluginFixture { + let manifest_dir = temp.path().join("history-plugins").join(provider); + fs::create_dir_all(&manifest_dir).unwrap(); + let script = manifest_dir.join("export.py"); + let run_marker = manifest_dir.join("ran"); + fs::write(&script, script_body).unwrap(); + let manifest = json!({ + "schema_version": 1, + "name": provider, + "history_sources": [{ + "id": "default", + "provider_key": provider, + "source_id": "default", + "source_format": format!("{provider}-history-v1"), + 
"enabled": false, + "command": [python_command(), script.display().to_string()], + "timeout_seconds": 10 + }] + }); + fs::write( + manifest_dir.join("ctx-history-plugin.json"), + serde_json::to_vec_pretty(&manifest).unwrap(), + ) + .unwrap(); + HistorySourcePluginFixture { + manifest_dir, + run_marker, + } +} + fn redaction_fixture(name: &str) -> String { materialized_fixture("redaction", name) } @@ -844,6 +1018,335 @@ fn import_custom_history_format_is_not_a_native_provider_importer() { assert!(stderr.contains("--all"), "{stderr}"); } +#[test] +fn history_source_plugins_are_listed_without_running() { + let temp = tempdir(); + let plugin = write_history_source_plugin(&temp, "dorkos", false, None); + + let sources = json_output( + ctx(&temp) + .env("CTX_HISTORY_PLUGIN_PATH", &plugin.manifest_dir) + .args(["sources", "--json"]), + ); + let plugin_source = sources["sources"] + .as_array() + .unwrap() + .iter() + .find(|source| source["history_source"] == "dorkos/default") + .unwrap(); + assert_eq!(plugin_source["kind"], "history_source_plugin"); + assert_eq!(plugin_source["provider_key"], "dorkos"); + assert_eq!(plugin_source["enabled"], false); + assert!(!plugin.run_marker.exists()); +} + +#[test] +fn setup_does_not_execute_enabled_history_source_plugins() { + let temp = tempdir(); + let plugin = write_history_source_plugin(&temp, "dorkos", true, None); + + json_output( + ctx(&temp) + .env("CTX_HISTORY_PLUGIN_PATH", &plugin.manifest_dir) + .args(["setup", "--json", "--progress", "none"]), + ); + + assert!(!plugin.run_marker.exists()); +} + +#[test] +fn ambiguous_history_source_plugin_selector_fails_before_execution() { + let temp = tempdir(); + let plugin_root = temp.path().join("history-plugins"); + let dorkos = write_history_source_plugin_at(&plugin_root, "dorkos", false, None); + let hermes = write_history_source_plugin_at(&plugin_root, "hermes", false, None); + + let stderr = failure_stderr( + ctx(&temp) + .env("CTX_HISTORY_PLUGIN_PATH", &plugin_root) + .args([ + 
"import", + "--history-source", + "default", + "--progress", + "none", + ]), + ); + + assert!(stderr.contains("matched multiple sources"), "{stderr}"); + assert!(!dorkos.run_marker.exists()); + assert!(!hermes.run_marker.exists()); +} + +#[test] +fn explicit_history_source_manifest_reports_parse_errors() { + let temp = tempdir(); + let bad_manifest = temp.path().join("bad-plugin.json"); + fs::write(&bad_manifest, "{not-json").unwrap(); + + let stderr = failure_stderr(ctx(&temp).args([ + "import", + "--history-source-manifest", + bad_manifest.to_str().unwrap(), + "--progress", + "none", + ])); + + assert!( + stderr.contains("parse history source plugin manifest"), + "{stderr}" + ); +} + +#[test] +fn failed_history_source_plugin_import_does_not_leave_record_metadata() { + let temp = tempdir(); + let script = r#"#!/usr/bin/env python3 +import json +provider = "badplugin" +records = [ + {"record_type":"manifest","schema_version":"ctx-history-jsonl-v1"}, + {"record_type":"source","source_id":"default","provider_key":provider,"source_format":"badplugin-history-v1"}, + {"record_type":"event","source_id":"default","session_id":"missing","event_index":0,"event_type":"message","role":"assistant","occurred_at":"2026-07-01T12:00:00Z","preview":"should not import"} +] +for record in records: + print(json.dumps(record)) +"#; + let plugin = write_raw_history_source_plugin(&temp, "badplugin", script); + + let stderr = failure_stderr( + ctx(&temp) + .env("CTX_HISTORY_PLUGIN_PATH", &plugin.manifest_dir) + .args([ + "import", + "--history-source", + "badplugin/default", + "--progress", + "none", + ]), + ); + + assert!(stderr.contains("import failed"), "{stderr}"); + let conn = Connection::open(temp.path().join("work.sqlite")).unwrap(); + assert_eq!( + sqlite_count(&conn, "SELECT COUNT(*) FROM history_records"), + 0 + ); + assert_eq!(sqlite_count(&conn, "SELECT COUNT(*) FROM sessions"), 0); + assert_eq!(sqlite_count(&conn, "SELECT COUNT(*) FROM events"), 0); +} + +#[test] +fn 
history_source_plugin_rejects_mismatched_machine_id_before_import() { + let temp = tempdir(); + let script = r#"#!/usr/bin/env python3 +import json +records = [ + {"record_type":"manifest","schema_version":"ctx-history-jsonl-v1"}, + {"record_type":"source","source_id":"default","provider_key":"machineplugin","source_format":"machineplugin-history-v1","machine_id":"other-machine"}, + {"record_type":"session","source_id":"default","session_id":"run","started_at":"2026-07-01T12:00:00Z"}, +] +for record in records: + print(json.dumps(record)) +"#; + let plugin = write_raw_history_source_plugin(&temp, "machineplugin", script); + + let stderr = failure_stderr( + ctx(&temp) + .env("CTX_HISTORY_PLUGIN_PATH", &plugin.manifest_dir) + .args([ + "import", + "--history-source", + "machineplugin/default", + "--progress", + "none", + ]), + ); + + assert!(stderr.contains("machine_id"), "{stderr}"); + let conn = Connection::open(temp.path().join("work.sqlite")).unwrap(); + assert_eq!( + sqlite_count(&conn, "SELECT COUNT(*) FROM history_records"), + 0 + ); +} + +#[test] +fn large_history_source_plugin_cursor_uses_cursor_file_without_inline_env() { + let temp = tempdir(); + let log = temp.path().join("large-cursor.log"); + let log_json = serde_json::to_string(&log.display().to_string()).unwrap(); + let script = format!( + r#"#!/usr/bin/env python3 +import json +import os +import pathlib + +cursor_file = os.environ.get("CTX_HISTORY_CURSOR_FILE") +inline = os.environ.get("CTX_HISTORY_CURSOR_JSON") +cursor_text = pathlib.Path(cursor_file).read_text() if cursor_file else inline +if cursor_text: + with open({log_json}, "a", encoding="utf-8") as handle: + handle.write("inline=" + ("1" if inline else "0") + "\n") + handle.write("file_len=" + str(len(cursor_text)) + "\n") +next_cursor = "x" * 9000 if not cursor_text else "done" +observed = "2026-07-01T12:00:00Z" +records = [ + {{"record_type":"manifest","schema_version":"ctx-history-jsonl-v1"}}, + 
{{"record_type":"source","source_id":"default","provider_key":"largecursor","source_format":"largecursor-history-v1","cursor":{{"after":{{"stream":os.environ["CTX_HISTORY_CURSOR_STREAM"],"cursor":next_cursor,"observed_at":observed}}}}}}, + {{"record_type":"session","source_id":"default","session_id":"run","started_at":"2026-07-01T12:00:00Z"}}, + {{"record_type":"event","source_id":"default","session_id":"run","event_index":1 if cursor_text else 0,"event_type":"message","role":"assistant","occurred_at":observed,"preview":"large cursor marker"}}, +] +for record in records: + print(json.dumps(record)) +"# + ); + let plugin = write_raw_history_source_plugin(&temp, "largecursor", &script); + + json_output( + ctx(&temp) + .env("CTX_HISTORY_PLUGIN_PATH", &plugin.manifest_dir) + .args([ + "import", + "--history-source", + "largecursor/default", + "--json", + "--progress", + "none", + ]), + ); + json_output( + ctx(&temp) + .env("CTX_HISTORY_PLUGIN_PATH", &plugin.manifest_dir) + .args([ + "import", + "--history-source", + "largecursor/default", + "--json", + "--progress", + "none", + ]), + ); + + let log = fs::read_to_string(log).unwrap(); + assert!(log.contains("inline=0"), "{log}"); + assert!(log.contains("file_len=9000"), "{log}"); +} + +#[test] +fn import_history_source_plugin_is_searchable_and_receives_cursor() { + let temp = tempdir(); + let cursor_log = temp.path().join("cursor-log.txt"); + let plugin = write_history_source_plugin(&temp, "hermes", false, Some(&cursor_log)); + + let first = json_output( + ctx(&temp) + .env("CTX_HISTORY_PLUGIN_PATH", &plugin.manifest_dir) + .args([ + "import", + "--history-source", + "hermes", + "--json", + "--progress", + "none", + ]), + ); + assert_eq!(first["totals"]["imported_sessions"], 1); + assert_eq!(first["totals"]["imported_events"], 1); + assert_eq!(first["sources"][0]["history_source"], "hermes/default"); + + let initial = json_output(ctx(&temp).args([ + "search", + "hermes plugin initial marker", + "--provider", + "custom", 
+ "--refresh", + "off", + "--json", + ])); + assert!( + !initial["results"].as_array().unwrap().is_empty(), + "initial plugin import was not searchable: {initial:#}" + ); + + let second = json_output( + ctx(&temp) + .env("CTX_HISTORY_PLUGIN_PATH", &plugin.manifest_dir) + .args([ + "import", + "--history-source", + "hermes", + "--json", + "--progress", + "none", + ]), + ); + assert_eq!(second["totals"]["imported_sessions"], 0); + assert_eq!(second["totals"]["imported_events"], 1); + + let incremental = json_output(ctx(&temp).args([ + "search", + "hermes plugin incremental marker", + "--provider", + "custom", + "--refresh", + "off", + "--json", + ])); + assert!( + !incremental["results"].as_array().unwrap().is_empty(), + "incremental plugin import was not searchable: {incremental:#}" + ); + let cursor_log = fs::read_to_string(cursor_log).unwrap(); + assert!(cursor_log.contains(r#""message_id":7"#), "{cursor_log}"); + assert!(cursor_log.contains("cursor_file="), "{cursor_log}"); +} + +#[test] +fn import_all_runs_enabled_history_source_plugins_for_external_shapes() { + let temp = tempdir(); + let plugin_root = temp.path().join("history-plugins"); + let providers = ["dorkos", "openclaw", "hermes", "nanoclaw"]; + for provider in providers { + write_history_source_plugin_at(&plugin_root, provider, true, None); + } + write_history_source_plugin_at(&plugin_root, "disabled-dorkos", false, None); + + let imported = json_output( + ctx(&temp) + .env("CTX_HISTORY_PLUGIN_PATH", &plugin_root) + .args(["import", "--all", "--json", "--progress", "none"]), + ); + assert_eq!(imported["totals"]["imported_sources"], 4); + assert_eq!(imported["totals"]["imported_sessions"], 4); + assert_eq!(imported["totals"]["imported_events"], 4); + let sources = imported["sources"].as_array().unwrap(); + for provider in providers { + assert!( + sources + .iter() + .any(|source| source["history_source"] == format!("{provider}/default")), + "missing import source for {provider}: {sources:#?}" + ); + let 
search = json_output(ctx(&temp).args([ + "search", + &format!("{provider} plugin initial marker"), + "--provider", + "custom", + "--refresh", + "off", + "--json", + ])); + assert!( + !search["results"].as_array().unwrap().is_empty(), + "{provider} plugin result was not searchable: {search:#}" + ); + } + assert!(!sources + .iter() + .any(|source| source["history_source"] == "disabled-dorkos/default")); +} + #[test] fn import_all_discovers_and_imports_providers_together() { let temp = tempdir(); diff --git a/crates/ctx-history-capture/src/lib.rs b/crates/ctx-history-capture/src/lib.rs index 41f33c73..e65061c4 100644 --- a/crates/ctx-history-capture/src/lib.rs +++ b/crates/ctx-history-capture/src/lib.rs @@ -1741,6 +1741,49 @@ pub fn import_custom_history_jsonl_v1( options.allow_partial_failures, &mut summary, )?; + import_custom_history_source_cursors(store, &normalization.source_cursors)?; + Ok(summary) +} + +pub fn import_custom_history_jsonl_v1_reader( + reader: impl BufRead, + store: &mut Store, + options: CustomHistoryJsonlV1ImportOptions, +) -> Result { + let normalization = normalize_custom_history_jsonl_v1_reader( + reader, + &ProviderAdapterContext { + machine_id: options.machine_id, + source_path: options.source_path, + imported_at: options.imported_at, + tool_output_mode: CodexToolOutputMode::Full, + event_mode: CodexEventImportMode::Rich, + include_notices: true, + }, + )?; + if normalization.provider.summary.failed > 0 && !options.allow_partial_failures { + return Ok(normalization.provider.summary); + } + + let mut summary = import_normalized_provider_captures( + store, + normalization.provider, + NormalizedProviderImportOptions { + history_record_id: options.history_record_id, + allow_partial_failures: options.allow_partial_failures, + persist_cursors: true, + wrap_transaction: true, + fast_event_inserts: true, + }, + )?; + import_custom_history_edges( + store, + &normalization.edges, + options.history_record_id, + options.allow_partial_failures, + &mut 
summary, + )?; + import_custom_history_source_cursors(store, &normalization.source_cursors)?; Ok(summary) } @@ -1756,6 +1799,14 @@ pub fn validate_custom_history_jsonl_v1(path: impl AsRef) -> Result Result { + let normalization = + normalize_custom_history_jsonl_v1_reader(reader, &ProviderAdapterContext::default())?; + Ok(normalization.provider.summary) +} + pub fn import_codex_history_jsonl( path: impl AsRef, store: &mut Store, @@ -3327,6 +3378,13 @@ const CODEX_FAST_IMPORT_PASSIVE_CHECKPOINT_MIN_BYTES: u64 = 2 * 1024 * 1024 * 10 struct CustomHistoryJsonlV1NormalizationResult { provider: ProviderNormalizationResult, edges: Vec<(usize, CustomHistoryJsonlV1EdgeImport)>, + source_cursors: Vec, +} + +#[derive(Debug, Clone)] +struct CustomHistoryJsonlV1SourceCursorImport { + machine_id: String, + checkpoint: ProviderCursorCheckpoint, } #[derive(Debug, Clone)] @@ -3350,6 +3408,13 @@ fn normalize_custom_history_jsonl_v1( ensure_regular_provider_transcript_file(path)?; let file = File::open(path)?; let reader = BufReader::new(file); + normalize_custom_history_jsonl_v1_reader(reader, context) +} + +fn normalize_custom_history_jsonl_v1_reader( + reader: impl BufRead, + context: &ProviderAdapterContext, +) -> Result { let mut summary = ProviderImportSummary::default(); let mut records = Vec::new(); @@ -3559,6 +3624,23 @@ fn normalize_custom_history_jsonl_v1( summary, ..ProviderNormalizationResult::default() }; + let mut source_cursors = Vec::new(); + for (_, source) in sources.values() { + let machine_id = source + .machine_id + .clone() + .unwrap_or_else(|| context.machine_id.clone()); + if let Some(after) = source + .cursor + .as_ref() + .and_then(|cursor| custom_history_normalized_cursor_range(source, cursor).after) + { + source_cursors.push(CustomHistoryJsonlV1SourceCursorImport { + machine_id, + checkpoint: after, + }); + } + } for (line_number, session) in sessions.values() { let source = &sources .get(&session.source_id) @@ -3609,6 +3691,7 @@ fn 
normalize_custom_history_jsonl_v1( Ok(CustomHistoryJsonlV1NormalizationResult { provider: result, edges: custom_edges, + source_cursors, }) } @@ -3621,6 +3704,7 @@ fn custom_history_failed_normalization( ..ProviderNormalizationResult::default() }, edges: Vec::new(), + source_cursors: Vec::new(), } } @@ -4047,15 +4131,27 @@ fn custom_history_internal_session_id( } fn custom_history_cursor_stream(source: &CtxHistoryJsonlSourceRecord) -> String { + custom_history_jsonl_v1_cursor_stream( + &source.provider_key, + &source.source_id, + &source.source_format, + ) +} + +pub fn custom_history_jsonl_v1_cursor_stream( + provider_key: &str, + source_id: &str, + source_format: &str, +) -> String { let key = custom_history_key(json!({ "schema": CTX_HISTORY_JSONL_V1_SCHEMA_VERSION, "kind": "cursor_stream", - "provider_key": source.provider_key, - "source_id": source.source_id, - "source_format": source.source_format, + "provider_key": provider_key, + "source_id": source_id, + "source_format": source_format, })); let stream_id = stable_capture_uuid(&key, "custom-cursor-stream"); - format!("provider:custom:{}:{stream_id}", source.provider_key) + format!("provider:custom:{provider_key}:{stream_id}") } fn custom_history_normalized_cursor_range( @@ -4198,6 +4294,32 @@ fn import_custom_history_edges( Ok(()) } +fn import_custom_history_source_cursors( + store: &mut Store, + cursors: &[CustomHistoryJsonlV1SourceCursorImport], +) -> Result<()> { + for cursor in cursors { + store.upsert_sync_cursor(&SyncCursor { + id: stable_capture_uuid( + &format!( + "provider-cursor:{}:{}:{}", + CaptureProvider::Custom.as_str(), + cursor.machine_id, + cursor.checkpoint.stream + ), + "provider-sync-cursor", + ), + team_id: None, + device_id: cursor.machine_id.clone(), + stream: cursor.checkpoint.stream.clone(), + cursor: cursor.checkpoint.cursor.clone(), + last_synced_at: Some(cursor.checkpoint.observed_at), + timestamps: timestamps(cursor.checkpoint.observed_at), + })?; + } + Ok(()) +} + fn 
collect_jsonl_paths(root: &Path, paths: &mut Vec) -> Result<()> { let metadata = fs::symlink_metadata(root)?; let file_type = metadata.file_type(); @@ -11493,6 +11615,77 @@ mod tests { assert_eq!(second.skipped_edges, 2); } + #[test] + fn custom_history_jsonl_reader_import_persists_normalized_cursor() { + let temp = tempdir(); + let mut store = Store::open(temp.path().join("work.sqlite")).unwrap(); + let input = [ + r#"{"record_type":"manifest","schema_version":"ctx-history-jsonl-v1"}"#, + r#"{"record_type":"source","source_id":"src","provider_key":"stream-agent","source_format":"stream-v1","cursor":{"after":{"stream":"native-stream","cursor":"{\"message_id\":7}","observed_at":"2026-07-01T12:00:00Z"}}}"#, + r#"{"record_type":"session","source_id":"src","session_id":"run","started_at":"2026-07-01T11:59:00Z"}"#, + r#"{"record_type":"event","source_id":"src","session_id":"run","event_index":0,"event_type":"message","role":"assistant","occurred_at":"2026-07-01T12:00:00Z","preview":"stream import marker"}"#, + ] + .join("\n"); + + let summary = import_custom_history_jsonl_v1_reader( + std::io::Cursor::new(input.into_bytes()), + &mut store, + CustomHistoryJsonlV1ImportOptions { + source_path: Some(PathBuf::from("plugin://stream-agent/default")), + imported_at: "2026-07-01T12:01:00Z".parse().unwrap(), + ..CustomHistoryJsonlV1ImportOptions::default() + }, + ) + .unwrap(); + + assert_eq!(summary.failed, 0, "{:?}", summary.failures); + assert_eq!(summary.imported_sessions, 1); + assert_eq!(summary.imported_events, 1); + let cursor = store + .get_sync_cursor( + None, + &CustomHistoryJsonlV1ImportOptions::default().machine_id, + &custom_history_jsonl_v1_cursor_stream("stream-agent", "src", "stream-v1"), + ) + .unwrap() + .unwrap(); + assert_eq!(cursor.cursor, r#"{"message_id":7}"#); + } + + #[test] + fn custom_history_jsonl_reader_persists_source_only_cursor() { + let temp = tempdir(); + let mut store = Store::open(temp.path().join("work.sqlite")).unwrap(); + let input = [ + 
r#"{"record_type":"manifest","schema_version":"ctx-history-jsonl-v1"}"#, + r#"{"record_type":"source","source_id":"src","provider_key":"stream-agent","source_format":"stream-v1","cursor":{"after":{"stream":"native-stream","cursor":"{\"message_id\":9}","observed_at":"2026-07-01T12:02:00Z"}}}"#, + ] + .join("\n"); + + let summary = import_custom_history_jsonl_v1_reader( + std::io::Cursor::new(input.into_bytes()), + &mut store, + CustomHistoryJsonlV1ImportOptions { + imported_at: "2026-07-01T12:03:00Z".parse().unwrap(), + ..CustomHistoryJsonlV1ImportOptions::default() + }, + ) + .unwrap(); + + assert_eq!(summary.failed, 0, "{:?}", summary.failures); + assert_eq!(summary.imported_sessions, 0); + assert_eq!(summary.imported_events, 0); + let cursor = store + .get_sync_cursor( + None, + &CustomHistoryJsonlV1ImportOptions::default().machine_id, + &custom_history_jsonl_v1_cursor_stream("stream-agent", "src", "stream-v1"), + ) + .unwrap() + .unwrap(); + assert_eq!(cursor.cursor, r#"{"message_id":9}"#); + } + #[test] fn custom_history_jsonl_malformed_import_is_atomic_by_default() { let temp = tempdir(); diff --git a/docs/cli-reference.md b/docs/cli-reference.md index 4682ae5e..c5f67d1c 100644 --- a/docs/cli-reference.md +++ b/docs/cli-reference.md @@ -28,8 +28,9 @@ ctx doctor --json - `setup` creates the data root, opens or creates `work.sqlite`, writes `config.toml` when needed, discovers known provider history locations, - catalogs Codex sessions, imports all discovered importable sources, optimizes - the local search index, and prints next steps. + catalogs Codex sessions, imports discovered native provider sources, optimizes + the local search index, and prints next steps. It does not execute + history-source plugin commands. - `setup --catalog-only` stops after discovery/cataloging. It is useful for fast inventory or troubleshooting, but it does not make history searchable. 
- `status` reports the ctx root, database path, config path, indexed item @@ -59,11 +60,16 @@ machine. Current rows include: - Pi session JSONL at `~/.pi/sessions.jsonl`; - native rows for supported Antigravity, Claude, OpenCode, Gemini, Cursor, Copilot CLI, and Factory AI Droid local history locations. +- local history-source plugin manifests under `$CTX_DATA_ROOT/plugins` or + `CTX_HISTORY_PLUGIN_PATH`. -Each JSON row includes `provider`, `path`, `exists`, `source_format`, `status`, -`import_support`, `native_import`, `importable`, `raw_retention`, and any -`unsupported_reason`. `sources` reads home-directory path metadata and writes -nothing to provider files or source repositories. +Native JSON rows include `provider`, `path`, `exists`, `source_format`, +`status`, `import_support`, `native_import`, `importable`, `raw_retention`, and +any `unsupported_reason`. Plugin JSON rows use +`kind: "history_source_plugin"` and include `plugin`, `history_source`, +`provider_key`, `source_id`, `manifest_path`, and `enabled`. `sources` reads +path metadata and plugin manifests, writes nothing to provider files or source +repositories, and does not execute plugin commands. ## Import @@ -82,13 +88,19 @@ ctx import --provider factory-ai-droid ctx import --path ~/.codex/sessions ctx import --provider pi --path ~/.pi/sessions.jsonl ctx import --format ctx-history-jsonl-v1 --path ./history.jsonl +ctx import --history-source dorkos +ctx import --plugin dorkos/default +ctx import --history-source-manifest ./ctx-history-plugin.json +ctx import --plugin-manifest ./ctx-history-plugin.json +ctx import --history-source hermes --reset-cursor ctx import --resume ctx import --json ctx import --progress json --json ``` `import` explicitly indexes provider history into the local SQLite store. The -normal first-run path is `ctx setup`, which already imports discovered sources. +normal first-run path is `ctx setup`, which already imports discovered native +provider sources. 
Use `import` to repair, re-run, resume, or target a specific provider/path. It creates the data root and default config if needed, reads provider transcript files, and writes indexed source metadata, sessions, events, searchable text, @@ -99,12 +111,23 @@ Custom history can be imported from an explicit JSONL file with remembered as a provider home; see `docs/custom-history-import-format.md` for the schema and incremental semantics. +History-source plugins are local command adapters that stream +`ctx-history-jsonl-v1` to stdout. Use `--history-source ` for an +explicit plugin import, or `--history-source-manifest ` to test a manifest +without installing it. `--plugin` and `--plugin-manifest` are aliases. +`--reset-cursor` withholds the previous plugin cursor for that run and asks the +plugin to perform a full rescan. See `docs/history-source-plugins.md`. + Import selection rules: -- with no arguments or with `--all`, import all discovered sources that exist; +- with no arguments, import discovered native sources that exist; +- with `--all`, import discovered native sources that exist and enabled + history-source plugin sources; - with `--provider`, import discovered sources for that provider; - with `--format ctx-history-jsonl-v1 --path `, import that custom history JSONL file; +- with `--history-source`, import matching local plugin sources; +- with `--history-source-manifest`, import sources from that manifest path; - with `--path`, import exactly that path; - with `--path` and no provider, parse the path as Codex format. @@ -168,11 +191,12 @@ ctx search "this current task" --include-current-session `search` defaults to `--refresh auto`, which quietly refreshes discovered native provider sources before querying indexed sessions and events. The refresh is -best-effort and keeps JSON stdout reserved for the search result object. 
On -large discovered sources or already-cataloged indexes, `auto` serves current -results without a foreground catch-up scan; use `--refresh strict` or -`ctx import --all` when you need a full catch-up before querying. Use -`--refresh off` to search the existing index without refreshing, or +best-effort and keeps JSON stdout reserved for the search result object. +History-source plugin commands are not executed by search refresh. On large +discovered sources or already-cataloged indexes, `auto` serves current results +without a foreground catch-up scan; use `--refresh strict` or `ctx import --all` +when you need a full catch-up before querying. Use `--refresh off` to search the +existing index without refreshing, or `--refresh strict` to fail when the pre-search refresh cannot run or import successfully. Search-only sources without native import support are searched from the existing index until they are explicitly imported through a supported diff --git a/docs/custom-history-import-format.md b/docs/custom-history-import-format.md index 70336075..704209db 100644 --- a/docs/custom-history-import-format.md +++ b/docs/custom-history-import-format.md @@ -3,16 +3,23 @@ `ctx-history-jsonl-v1` is the public JSONL format for importing session history from tools without a built-in local-history adapter. -## Transport +## Transports -Version 1 uses an explicit local file path: +The same JSONL schema can be imported from an explicit local file path: ```bash ctx import --format ctx-history-jsonl-v1 --path ./history.jsonl ``` -ctx does not discover a fixed storage location for this format. The file is not -read from stdin, and ctx does not execute exporter commands in v1. +or from a local history-source plugin command: + +```bash +ctx import --history-source my-agent +``` + +ctx does not discover a fixed storage location for this format. File imports +are explicit paths. 
Plugin imports are explicit local command adapters declared +by a local manifest; see `docs/history-source-plugins.md`. Each line is one JSON object. Every object has a `record_type` field with one of: @@ -203,19 +210,24 @@ Example: ## Incremental Semantics -v1 imports are explicit, local, and idempotent. On each -`ctx import --format ctx-history-jsonl-v1 --path `, ctx rescans the file -and upserts equivalent records instead of appending duplicates. +v1 imports are explicit, local, and idempotent. On each file import, ctx rescans +the file and upserts equivalent records instead of appending duplicates. On each +plugin import, ctx invokes the plugin, validates stdout atomically, and upserts +the emitted records. When a source record supplies `cursor`, ctx rewrites its storage stream under a `provider:custom::` namespace and also preserves the exporter-supplied cursor object in source metadata. Event `native_cursor` values -are also preserved. ctx does not negotiate with external exporters in v1, does -not call exporter commands, and does not request a delta range; exporter -negotiation is a follow-up capability. +are also preserved. + +For plugin imports, ctx passes the previously stored source cursor to the next +command through `CTX_HISTORY_CURSOR_JSON` and `CTX_HISTORY_CURSOR_FILE`. The +cursor string remains exporter-owned, so it can encode byte offsets, SQLite row +ids, session sequence maps, or another native high-water mark. -If an import is interrupted, run the same command again. The expected behavior -is another idempotent rescan of the same JSONL file. +If an import is interrupted, run the same command again. File imports perform +another idempotent rescan. Plugin imports receive the last successfully stored +cursor; failed plugin runs do not advance it. 
## Compact Example diff --git a/docs/first-10-minutes.md b/docs/first-10-minutes.md index 0eddca2d..57b96635 100644 --- a/docs/first-10-minutes.md +++ b/docs/first-10-minutes.md @@ -26,8 +26,9 @@ ctx status --json ``` `ctx setup` creates local storage, discovers supported provider history, -catalogs Codex sessions, imports discovered sources, and optimizes the local -search index. The default root is `~/.ctx`. Use a temporary root for trials: +catalogs Codex sessions, imports discovered native provider sources, and +optimizes the local search index. It does not execute history-source plugin +commands. The default root is `~/.ctx`. Use a temporary root for trials: ```bash ctx --data-root /tmp/ctx-first-10 setup diff --git a/docs/getting-started.md b/docs/getting-started.md index 5bfd7372..4b958420 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -28,8 +28,9 @@ ctx status Setup creates the configured ctx data root, initializes SQLite, writes `config.toml` when missing, discovers known provider history paths, catalogs -Codex sessions, imports discovered sources, optimizes the local search index, -and prints next steps. The default data root is `~/.ctx`. +Codex sessions, imports discovered native provider sources, optimizes the local +search index, and prints next steps. It does not execute history-source plugin +commands. The default data root is `~/.ctx`. Use a different root when testing: diff --git a/docs/history-source-plugins.md b/docs/history-source-plugins.md new file mode 100644 index 00000000..66274bb4 --- /dev/null +++ b/docs/history-source-plugins.md @@ -0,0 +1,225 @@ +# History Source Plugins + +History source plugins let third-party tools make their local histories +searchable in ctx without ctx owning their storage schemas. + +The narrow waist is: + +1. A local manifest declares one or more history sources. +2. ctx invokes the declared command only during explicit import. +3. 
The command writes `ctx-history-jsonl-v1` records to stdout. +4. ctx validates and imports that stream atomically. +5. ctx passes the previous source cursor back on the next run. + +Plugins are command-line adapters, not an in-process ABI and not a hosted plugin +store. Plugin authors own their native JSONL, SQLite, or API reads. ctx owns the +manifest, cursor handoff, validation, import, and search index. + +## Install And Discover + +Put a manifest at one of: + +- `$CTX_DATA_ROOT/plugins//ctx-history-plugin.json`; +- any directory or manifest file listed in `CTX_HISTORY_PLUGIN_PATH`; +- any directory or manifest file listed in `CTX_PLUGIN_PATH`. + +`ctx sources` and `ctx sources --json` list plugin sources without executing +their commands. + +Manifest example: + +```json +{ + "schema_version": 1, + "name": "dorkos", + "display_name": "DorkOS history", + "version": "0.1.0", + "history_sources": [ + { + "id": "default", + "provider_key": "dorkos", + "source_id": "default", + "source_format": "dorkos-claude-jsonl-v1", + "enabled": true, + "command": ["ctx-history-source-dorkos", "export"], + "timeout_seconds": 300 + } + ] +} +``` + +`name`, `id`, `provider_key`, and `source_id` must be stable lowercase ASCII +identifiers. `command` is an argv array; ctx does not run it through a shell. + +`enabled: true` means `ctx import --all` may run that source. Explicit imports +can run a discovered source even when it is not enabled. + +## Import + +```bash +ctx import --history-source dorkos +ctx import --plugin dorkos +ctx import --history-source dorkos/default +ctx import --history-source-manifest ./ctx-history-plugin.json +ctx import --plugin-manifest ./ctx-history-plugin.json +ctx import --all +ctx import --history-source hermes --reset-cursor +``` + +Selectors can match plugin name, source id, `plugin/source`, `provider_key`, or +`provider_key/source_id`, but they must resolve to exactly one source before ctx +executes a command. 
Prefer `plugin/source` when a machine has multiple plugins. + +`--history-source-manifest` is a development path: it adds that manifest for the +current command without installing it. With no selector, ctx imports sources +from the supplied manifest path. + +`--reset-cursor` withholds the previous cursor and sets +`CTX_HISTORY_FULL_RESCAN=1`. The plugin should emit a fresh `source.cursor.after` +checkpoint if the rescan succeeds. + +`ctx setup` and search refresh do not execute plugins in this version. Run +`ctx import --history-source ` or `ctx import --all` to catch up +plugin-backed sources before searching. + +## Runtime Environment + +ctx sets these variables before invoking a plugin command: + +- `CTX_DATA_ROOT` +- `CTX_HISTORY_PLUGIN=1` +- `CTX_HISTORY_PLUGIN_NAME` +- `CTX_HISTORY_PLUGIN_MANIFEST` +- `CTX_HISTORY_SOURCE`, such as `dorkos/default` +- `CTX_HISTORY_SOURCE_ID` +- `CTX_HISTORY_PROVIDER_KEY` +- `CTX_HISTORY_SOURCE_FORMAT` +- `CTX_HISTORY_CURSOR_STREAM` +- `CTX_HISTORY_MACHINE_ID` +- `CTX_HISTORY_FULL_RESCAN`, `1` or `0` +- `CTX_HISTORY_CURSOR`, when a previous cursor exists and is small enough for + inline environment handoff +- `CTX_HISTORY_CURSOR_JSON`, same value as `CTX_HISTORY_CURSOR` when set +- `CTX_HISTORY_CURSOR_FILE`, a temporary file containing the cursor + +Use `CTX_HISTORY_CURSOR_FILE` for large native cursor maps. The file exists only +while the plugin process runs and is the reliable cursor handoff path. + +The plugin must write only `ctx-history-jsonl-v1` JSONL to stdout. Progress and +diagnostics belong on stderr. If the command exits nonzero or stdout is invalid, +ctx imports nothing from that run and does not advance the cursor. + +Plugin commands receive a limited inherited environment by default: `PATH`, +`HOME`, basic locale variables, temporary-directory variables, and XDG data or +config homes. Put provider-specific environment values in the manifest `env` +object instead of relying on the parent shell. 
+ +## Cursor Contract + +The plugin controls the cursor string. It may be a number, an opaque token, or a +JSON string. ctx stores it under a stable custom stream derived from: + +- `provider_key` +- `source_id` +- `source_format` +- local machine id + +On the next import, ctx passes the stored `cursor.after.cursor` value back in +the runtime environment. This keeps native cursor design inside the provider +adapter: + +- file appenders can use byte offsets; +- SQLite stores can use row ids; +- split stores can use JSON maps keyed by session id, file path, or direction. + +Every plugin run should emit a `source` record matching the manifest +`provider_key`, `source_id`, and `source_format`. ctx rejects mismatches before +writing imported rows. + +## Adapter Shapes + +The local research checkouts showed four different storage models, which is why +ctx should not maintain native adapters for them. + +### DorkOS + +DorkOS currently derives history from Claude SDK JSONL files under +`~/.claude/projects//*.jsonl`. A DorkOS plugin should read those files by +byte offset and use a cursor like: + +```json +{"files":{"/home/me/.claude/projects/x/session.jsonl":{"offset":12345,"size":13000,"mtimeMs":1780000000000}}} +``` + +The plugin can enrich events with DorkOS metadata from `~/.dork/dork.db`, but +the transcript source is still the Claude JSONL file. + +### OpenClaw + +OpenClaw currently has session metadata under +`~/.openclaw/agents//sessions/sessions.json` and transcript JSONL +files beside it. A plugin should use OpenClaw's session accessor where possible, +resolve transcript paths, and cursor by byte offset: + +```json +{"backend":"openclaw-file","transcripts":{"/home/me/.openclaw/agents/a/sessions/s.jsonl":{"offset":456,"size":900,"lastRecordId":"rec-2"}}} +``` + +If OpenClaw flips storage to SQLite, the OpenClaw-owned plugin can keep the same +ctx stdout contract while changing its native reader. 
+ +### Hermes + +Hermes Agent stores canonical history in `~/.hermes/state.db`. A Hermes plugin +should read `sessions` and `messages` read-only, order by `messages.id`, and +cursor by the maximum message row id: + +```json +{"message_id":1234} +``` + +Session metadata-only changes may need a second cursor if Hermes exposes a +reliable session update high-water mark. + +### NanoClaw + +NanoClaw uses a central `data/v2.db` plus per-session inbound and outbound +SQLite databases under `data/v2-sessions///`. +Inbound messages use even `seq` values and outbound messages use odd `seq` +values. A generic NanoClaw plugin can cursor by per-session sequence: + +```json +{"sessions":{"sess-abc":42,"sess-def":8}} +``` + +Provider-specific NanoClaw plugins can instead read mounted provider state, such +as Claude JSONL, when they need full internal tool/thinking events. + +## Minimal Plugin Pseudocode + +```python +import json, os, sqlite3, sys + +cursor = json.loads(os.environ.get("CTX_HISTORY_CURSOR_JSON") or "{}") +after_message_id = cursor.get("message_id", 0) +db = sqlite3.connect(os.path.expanduser("~/.hermes/state.db")) + +print(json.dumps({"record_type": "manifest", "schema_version": "ctx-history-jsonl-v1"})) +print(json.dumps({ + "record_type": "source", + "source_id": os.environ["CTX_HISTORY_SOURCE_ID"], + "provider_key": os.environ["CTX_HISTORY_PROVIDER_KEY"], + "source_format": os.environ["CTX_HISTORY_SOURCE_FORMAT"], + "cursor": { + "after": { + "stream": os.environ["CTX_HISTORY_CURSOR_STREAM"], + "cursor": json.dumps({"message_id": after_message_id}), + "observed_at": "2026-07-01T12:00:00Z" + } + } +})) + +for row in db.execute("SELECT id, session_id, role, content, timestamp FROM messages WHERE id > ? ORDER BY id", (after_message_id,)): + # Emit session records as needed, then event records with stable event_index. 
+ pass +``` diff --git a/docs/providers.md b/docs/providers.md index 59f69cee..b184c881 100644 --- a/docs/providers.md +++ b/docs/providers.md @@ -26,10 +26,12 @@ The current CLI imports local history for: These are built-in provider adapters for native local history. The custom history format is separate: `ctx import --format ctx-history-jsonl-v1 --path -` reads an explicit JSONL interchange file from any exporter. It is -stored internally under the bounded provider `custom` while preserving the -exporter's `provider_key`, `source_id`, and `session_id` as metadata and ID -namespace components. It is not auto-discovered by `ctx sources`. +` reads an explicit JSONL interchange file from any exporter, and +history-source plugins can stream the same format from local adapter commands. +Custom history is stored internally under the bounded provider `custom` while +preserving the exporter's `provider_key`, `source_id`, and `session_id` as +metadata and ID namespace components. File imports are not auto-discovered; +local plugin manifests are listed by `ctx sources`. Use `ctx sources` for the truth on the current machine: diff --git a/docs/search.md b/docs/search.md index 95666834..52c24265 100644 --- a/docs/search.md +++ b/docs/search.md @@ -103,13 +103,14 @@ are intentionally looking for material from the active session tree. `--refresh` defaults to `auto`. `auto` attempts a best-effort pre-search import of discovered native provider sources and serves the existing index if that -refresh fails. On large discovered sources or already-cataloged indexes, `auto` -serves current results without a foreground catch-up scan; use -`--refresh strict` or `ctx import --all` when you need a full catch-up before -querying. `off` skips the pre-search refresh. `strict` fails the search if the -refresh cannot run or import successfully. Search-only sources without native -import support are searched from the existing index until they are explicitly -imported through a supported path. 
+refresh fails. Search refresh does not execute history-source plugin commands. +On large discovered sources or already-cataloged indexes, `auto` serves current +results without a foreground catch-up scan; use `--refresh strict` or +`ctx import --all` when you need a full catch-up before querying. `off` skips +the pre-search refresh. `strict` fails the search if the refresh cannot run or +import successfully. Search-only sources without native import support are +searched from the existing index until they are explicitly imported through a +supported path. Use `--refresh off` for a strictly read-only search over the existing ctx index. This avoids provider imports and avoids updating the ctx SQLite store. diff --git a/docs/storage.md b/docs/storage.md index 300e5f11..cb41c0ec 100644 --- a/docs/storage.md +++ b/docs/storage.md @@ -76,8 +76,8 @@ analytics marker described under network behavior. | --- | --- | --- | | `ctx setup` | provider transcript files and home path metadata for source discovery | data root, `work.sqlite`, `config.toml`, and SQLite index | | `ctx status` | data root metadata and existing SQLite store | none | -| `ctx sources` | known provider paths under the user's home | none | -| `ctx import` | provider transcript files and path metadata, or the explicit custom history JSONL file passed with `--format ctx-history-jsonl-v1 --path` | data root, `config.toml` if missing, and SQLite index | +| `ctx sources` | known provider paths under the user's home and local history-source plugin manifests | none | +| `ctx import` | provider transcript files and path metadata, the explicit custom history JSONL file passed with `--format ctx-history-jsonl-v1 --path`, or stdout from an explicit history-source plugin command | data root, `config.toml` if missing, and SQLite index | | `ctx show` | SQLite index | selected `--out` path for `show session` when provided | | `ctx locate` | SQLite index and raw source path metadata | none | | `ctx search` | native 
provider transcript files, path metadata, and SQLite index | SQLite index for newly discovered native provider history | @@ -127,6 +127,7 @@ ctx import --all ctx import --resume ctx import --path ~/.codex/sessions ctx import --format ctx-history-jsonl-v1 --path ./history.jsonl +ctx import --history-source dorkos ``` Current adapters are safe to re-run. They rescan sources idempotently and keep @@ -134,8 +135,10 @@ source paths or cursors when available. Custom history JSONL imports follow the same v1 lifecycle: ctx rescans the explicit file, upserts already-imported records, stores supplied source cursor metadata under ctx-owned custom cursor streams, and preserves event native -cursors. The path is not added to `config.toml` or treated as a fixed provider -location. +cursors. History-source plugins receive the previous stored cursor on each +explicit import and stream the same JSONL format to stdout. Failed plugin runs +do not advance cursors. Explicit file paths and plugin manifests are not added +to `config.toml` or treated as fixed provider homes. 
## Upgrade Reindexing From b10b048ded5d6d5130bc87f42df2bdcedf1f9864 Mon Sep 17 00:00:00 2001 From: luca-ctx <216224554+luca-ctx@users.noreply.github.com> Date: Wed, 1 Jul 2026 15:12:19 -0500 Subject: [PATCH 2/2] Add search refresh for history source plugins --- crates/ctx-cli/src/history_source_plugins.rs | 235 ++++++++++-- crates/ctx-cli/src/main.rs | 58 ++- crates/ctx-cli/tests/cli.rs | 368 ++++++++++++++++++- docs/cli-reference.md | 20 +- docs/first-10-minutes.md | 5 +- docs/history-source-plugins.md | 21 +- docs/product-contract.md | 9 +- docs/search.md | 21 +- 8 files changed, 653 insertions(+), 84 deletions(-) diff --git a/crates/ctx-cli/src/history_source_plugins.rs b/crates/ctx-cli/src/history_source_plugins.rs index 50ed1452..530c8262 100644 --- a/crates/ctx-cli/src/history_source_plugins.rs +++ b/crates/ctx-cli/src/history_source_plugins.rs @@ -2,15 +2,15 @@ use std::{ collections::{BTreeMap, BTreeSet}, env, fs::{self, OpenOptions}, - io::{Read, Write}, + io::{ErrorKind, Read, Write}, path::{Path, PathBuf}, - process::{Command, Stdio}, + process::{Child, ChildStderr, ChildStdout, Command, ExitStatus, Stdio}, thread, time::{Duration, Instant}, }; #[cfg(unix)] -use std::os::unix::fs::OpenOptionsExt; +use std::os::unix::{fs::OpenOptionsExt, io::AsRawFd}; use anyhow::{anyhow, Context, Result}; use serde::Deserialize; @@ -19,6 +19,8 @@ use uuid::Uuid; const PLUGIN_MANIFEST_FILE: &str = "ctx-history-plugin.json"; const LEGACY_PLUGIN_MANIFEST_FILE: &str = "plugin.json"; const DEFAULT_PLUGIN_TIMEOUT_SECONDS: u64 = 300; +const MAX_PLUGIN_STDOUT_BYTES: usize = 64 * 1024 * 1024; +const MAX_PLUGIN_STDERR_BYTES: usize = 256 * 1024; const MAX_PLUGIN_STDERR_SNIPPET_BYTES: usize = 4096; const MAX_INLINE_CURSOR_ENV_BYTES: usize = 8192; const SAFE_PLUGIN_ENV: &[&str] = &[ @@ -231,21 +233,121 @@ pub fn run_history_source_plugin( }); } }; - let mut stdout = child + let stdout = child .stdout .take() .context("history source plugin stdout was not piped")?; - let mut stderr = 
child + let stderr = child .stderr .take() .context("history source plugin stderr was not piped")?; + let run_result = collect_child_output_with_timeout( + &mut child, + stdout, + stderr, + source.timeout, + &source.label(), + ); + cleanup_cursor_file(cursor_file.as_ref()); + let (status, stdout, stderr) = run_result?; + let stderr = String::from_utf8_lossy(&stderr).trim().to_owned(); + if !status.success() { + let detail = if stderr.is_empty() { + format!("exit status {status}") + } else { + format!("exit status {status}: {}", stderr_snippet(&stderr)) + }; + return Err(anyhow!( + "history source plugin {} failed: {detail}", + source.label() + )); + } + Ok(HistorySourcePluginRun { stdout, stderr }) +} + +#[cfg(unix)] +fn collect_child_output_with_timeout( + child: &mut Child, + mut stdout: ChildStdout, + mut stderr: ChildStderr, + timeout: Duration, + source_label: &str, +) -> Result<(ExitStatus, Vec, Vec)> { + set_nonblocking(stdout.as_raw_fd())?; + set_nonblocking(stderr.as_raw_fd())?; + + let started = Instant::now(); + let mut status = None; + let mut stdout_open = true; + let mut stderr_open = true; + let mut stdout_bytes = Vec::new(); + let mut stderr_bytes = Vec::new(); + loop { + if stdout_open { + read_available_with_limit( + &mut stdout, + &mut stdout_bytes, + &mut stdout_open, + MAX_PLUGIN_STDOUT_BYTES, + "stdout", + source_label, + ) + .inspect_err(|_| { + let _ = child.kill(); + let _ = child.wait(); + })?; + } + if stderr_open { + read_available_with_limit( + &mut stderr, + &mut stderr_bytes, + &mut stderr_open, + MAX_PLUGIN_STDERR_BYTES, + "stderr", + source_label, + ) + .inspect_err(|_| { + let _ = child.kill(); + let _ = child.wait(); + })?; + } + if status.is_none() { + status = child.try_wait()?; + } + if let Some(status) = status { + if !stdout_open && !stderr_open { + return Ok((status, stdout_bytes, stderr_bytes)); + } + } + if started.elapsed() >= timeout { + if status.is_none() { + let _ = child.kill(); + let _ = child.wait(); + } + return 
Err(anyhow!( + "history source plugin {source_label} timed out after {}s", + timeout.as_secs() + )); + } + thread::sleep(Duration::from_millis(25)); + } +} + +#[cfg(not(unix))] +fn collect_child_output_with_timeout( + child: &mut Child, + stdout: ChildStdout, + stderr: ChildStderr, + timeout: Duration, + source_label: &str, +) -> Result<(ExitStatus, Vec, Vec)> { + let stdout_source = source_label.to_owned(); let stdout_handle = thread::spawn(move || { - let mut bytes = Vec::new(); - stdout.read_to_end(&mut bytes).map(|_| bytes) + read_pipe_with_limit(stdout, MAX_PLUGIN_STDOUT_BYTES, "stdout", &stdout_source) }); + let stderr_source = source_label.to_owned(); let stderr_handle = thread::spawn(move || { - let mut bytes = Vec::new(); - stderr.read_to_end(&mut bytes).map(|_| bytes) + read_pipe_with_limit(stderr, MAX_PLUGIN_STDERR_BYTES, "stderr", &stderr_source) }); let started = Instant::now(); @@ -253,14 +355,12 @@ pub fn run_history_source_plugin( if let Some(status) = child.try_wait()? { break status; } - if started.elapsed() >= source.timeout { + if started.elapsed() >= timeout { let _ = child.kill(); let _ = child.wait(); - cleanup_cursor_file(cursor_file.as_ref()); return Err(anyhow!( - "history source plugin {} timed out after {}s", - source.label(), - source.timeout.as_secs() + "history source plugin {source_label} timed out after {}s", + timeout.as_secs() )); } thread::sleep(Duration::from_millis(25)); @@ -272,20 +372,77 @@ pub fn run_history_source_plugin( let stderr = stderr_handle .join() .map_err(|_| anyhow!("history source plugin stderr reader panicked"))??; - cleanup_cursor_file(cursor_file.as_ref()); - let stderr = String::from_utf8_lossy(&stderr).trim().to_owned(); - if !status.success() { - let detail = if stderr.is_empty() { - format!("exit status {status}") - } else { - format!("exit status {status}: {}", stderr_snippet(&stderr)) - }; - return Err(anyhow!( - "history source plugin {} failed: {detail}", - source.label() - )); + Ok((status, stdout, 
stderr)) +} + +#[cfg(unix)] +fn set_nonblocking(fd: std::os::fd::RawFd) -> Result<()> { + let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) }; + if flags < 0 { + return Err(std::io::Error::last_os_error()).context("read plugin pipe flags"); + } + let result = unsafe { libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) }; + if result < 0 { + return Err(std::io::Error::last_os_error()).context("set plugin pipe nonblocking"); + } + Ok(()) +} + +#[cfg(unix)] +fn read_available_with_limit( + reader: &mut R, + bytes: &mut Vec, + open: &mut bool, + max_bytes: usize, + name: &str, + source_label: &str, +) -> Result<()> { + let mut buffer = [0u8; 8192]; + loop { + match reader.read(&mut buffer) { + Ok(0) => { + *open = false; + return Ok(()); + } + Ok(count) => { + if bytes.len().saturating_add(count) > max_bytes { + return Err(anyhow!( + "history source plugin {source_label} {name} exceeded {max_bytes} byte limit" + )); + } + bytes.extend_from_slice(&buffer[..count]); + } + Err(err) if err.kind() == ErrorKind::WouldBlock => return Ok(()), + Err(err) if err.kind() == ErrorKind::Interrupted => continue, + Err(err) => { + return Err(err) + .with_context(|| format!("read history source plugin {source_label} {name}")) + } + } + } +} + +#[cfg(any(test, not(unix)))] +fn read_pipe_with_limit( + mut reader: R, + max_bytes: usize, + name: &str, + source_label: &str, +) -> Result> { + let mut bytes = Vec::new(); + let mut buffer = [0u8; 8192]; + loop { + let count = reader.read(&mut buffer)?; + if count == 0 { + return Ok(bytes); + } + if bytes.len().saturating_add(count) > max_bytes { + return Err(anyhow!( + "history source plugin {source_label} {name} exceeded {max_bytes} byte limit" + )); + } + bytes.extend_from_slice(&buffer[..count]); } - Ok(HistorySourcePluginRun { stdout, stderr }) } fn inherit_safe_plugin_env(command: &mut Command) { @@ -504,3 +661,27 @@ fn stderr_snippet(value: &str) -> String { } snippet } + +#[cfg(test)] +mod tests { + use super::*; + use 
std::io::Cursor; + + #[test] + fn read_pipe_with_limit_accepts_output_at_limit() { + let bytes = read_pipe_with_limit(Cursor::new(b"abcd"), 4, "stdout", "plugin/default") + .expect("output at limit should pass"); + assert_eq!(bytes, b"abcd"); + } + + #[test] + fn read_pipe_with_limit_rejects_output_over_limit() { + let err = read_pipe_with_limit(Cursor::new(b"abcde"), 4, "stdout", "plugin/default") + .expect_err("output over limit should fail"); + assert!( + err.to_string() + .contains("history source plugin plugin/default stdout exceeded 4 byte limit"), + "{err}" + ); + } +} diff --git a/crates/ctx-cli/src/main.rs b/crates/ctx-cli/src/main.rs index 0c9a5e10..19233750 100644 --- a/crates/ctx-cli/src/main.rs +++ b/crates/ctx-cli/src/main.rs @@ -312,7 +312,7 @@ struct SearchArgs { value_enum, default_value_t = RefreshArg::Auto, help = "Pre-search refresh behavior: auto, off, or strict", - long_help = "Pre-search refresh behavior. auto best-effort refreshes discovered native provider sources and serves the existing index if refresh fails; off searches the existing index only; strict fails if the refresh cannot run or import successfully." + long_help = "Pre-search refresh behavior. auto best-effort refreshes discovered native provider sources and enabled auto history-source plugins, then serves the existing index if refresh fails; off searches the existing index only; strict fails if the refresh cannot run or import successfully." 
)] refresh: RefreshArg, #[arg( @@ -4296,16 +4296,27 @@ fn refresh_before_search(args: &SearchArgs, data_root: &Path) -> Result sources, + Err(err) if args.refresh == RefreshArg::Auto => { + return Ok(SearchRefreshReport::failed( + RefreshArg::Auto, + sources.len(), + error_summary(&err), + )); + } + Err(err) => return Err(err.context("search refresh failed")), + }; + if sources.is_empty() && plugin_sources.is_empty() { if args.refresh == RefreshArg::Strict { return Err(anyhow!( - "strict search refresh found no supported discovered native provider sources; use --refresh off to search the existing index" + "strict search refresh found no supported discovered native provider or enabled auto history-source plugin sources; use --refresh off to search the existing index" )); } return Ok(SearchRefreshReport::skipped(args.refresh, "no_sources")); } - let source_count = sources.len(); - match refresh_sources_for_search(data_root, sources, args.refresh, args.json) { + let source_count = sources.len().saturating_add(plugin_sources.len()); + match refresh_sources_for_search(data_root, sources, plugin_sources, args.refresh, args.json) { Ok(totals) => Ok(SearchRefreshReport::completed( args.refresh, source_count, @@ -4340,9 +4351,23 @@ fn search_refresh_sources(provider: Option) -> Vec { .collect() } +fn search_refresh_plugin_sources( + data_root: &Path, + provider: Option, +) -> Result> { + if !matches!(provider, None | Some(ProviderArg::Custom)) { + return Ok(Vec::new()); + } + Ok(discover_history_source_plugins(data_root, &[])? 
+ .into_iter() + .filter(|source| source.enabled && source.refresh == HistorySourcePluginRefresh::Auto) + .collect()) +} + fn refresh_sources_for_search( data_root: &Path, sources: Vec, + plugin_sources: Vec, refresh: RefreshArg, json_output: bool, ) -> Result { @@ -4353,7 +4378,7 @@ fn refresh_sources_for_search( .into_iter() .map(|source| (source, SourceStats::default())) .collect::>(); - if planned_sources.is_empty() { + if planned_sources.is_empty() && plugin_sources.is_empty() { return Ok(ImportTotals::default()); } @@ -4438,6 +4463,27 @@ fn refresh_sources_for_search( } } + if !plugin_sources.is_empty() { + let mut store = Store::open(&db_path)?; + for plugin_source in plugin_sources { + progress.message( + "refreshing", + format!("running history source plugin {}", plugin_source.label()), + ); + let (summary, stats) = + import_history_source_plugin(&mut store, &plugin_source, data_root, false) + .with_context(|| { + format!("refresh history source plugin {}", plugin_source.label()) + })?; + totals.add(&summary, &stats); + progress.done( + "refreshing", + format!("refreshed history source plugin {}", plugin_source.label()), + 0, + ); + } + } + Store::open(&db_path)?.checkpoint_wal_truncate_if_larger_than(WAL_TRUNCATE_MIN_BYTES)?; Ok(totals) } diff --git a/crates/ctx-cli/tests/cli.rs b/crates/ctx-cli/tests/cli.rs index 3a059ab3..c49d1026 100644 --- a/crates/ctx-cli/tests/cli.rs +++ b/crates/ctx-cli/tests/cli.rs @@ -47,10 +47,21 @@ fn write_history_source_plugin( enabled: bool, cursor_log: Option<&Path>, ) -> HistorySourcePluginFixture { - write_history_source_plugin_at( + write_history_source_plugin_with_refresh(temp, provider, enabled, None, cursor_log) +} + +fn write_history_source_plugin_with_refresh( + temp: &TempDir, + provider: &str, + enabled: bool, + refresh: Option<&str>, + cursor_log: Option<&Path>, +) -> HistorySourcePluginFixture { + write_history_source_plugin_at_with_refresh( &temp.path().join("history-plugins"), provider, enabled, + refresh, 
cursor_log, ) } @@ -60,6 +71,16 @@ fn write_history_source_plugin_at( provider: &str, enabled: bool, cursor_log: Option<&Path>, +) -> HistorySourcePluginFixture { + write_history_source_plugin_at_with_refresh(root, provider, enabled, None, cursor_log) +} + +fn write_history_source_plugin_at_with_refresh( + root: &Path, + provider: &str, + enabled: bool, + refresh: Option<&str>, + cursor_log: Option<&Path>, ) -> HistorySourcePluginFixture { let manifest_dir = root.join(provider); fs::create_dir_all(&manifest_dir).unwrap(); @@ -145,20 +166,24 @@ for record in records: cursor_log_py = cursor_log_py ); fs::write(&script, script_body).unwrap(); + let mut source_manifest = json!({ + "id": "default", + "provider_key": provider, + "source_id": "default", + "source_format": format!("{provider}-history-v1"), + "enabled": enabled, + "command": [python_command(), script.display().to_string(), provider], + "timeout_seconds": 10 + }); + if let Some(refresh) = refresh { + source_manifest["refresh"] = json!(refresh); + } let manifest = json!({ "schema_version": 1, "name": provider, "display_name": format!("{provider} history"), "version": "0.1.0", - "history_sources": [{ - "id": "default", - "provider_key": provider, - "source_id": "default", - "source_format": format!("{provider}-history-v1"), - "enabled": enabled, - "command": [python_command(), script.display().to_string(), provider], - "timeout_seconds": 10 - }] + "history_sources": [source_manifest] }); fs::write( manifest_dir.join("ctx-history-plugin.json"), @@ -179,24 +204,56 @@ fn write_raw_history_source_plugin( temp: &TempDir, provider: &str, script_body: &str, +) -> HistorySourcePluginFixture { + write_raw_history_source_plugin_with_options(temp, provider, script_body, false, None) +} + +fn write_raw_history_source_plugin_with_options( + temp: &TempDir, + provider: &str, + script_body: &str, + enabled: bool, + refresh: Option<&str>, +) -> HistorySourcePluginFixture { + 
write_raw_history_source_plugin_with_options_and_timeout( + temp, + provider, + script_body, + enabled, + refresh, + 10, + ) +} + +fn write_raw_history_source_plugin_with_options_and_timeout( + temp: &TempDir, + provider: &str, + script_body: &str, + enabled: bool, + refresh: Option<&str>, + timeout_seconds: u64, ) -> HistorySourcePluginFixture { let manifest_dir = temp.path().join("history-plugins").join(provider); fs::create_dir_all(&manifest_dir).unwrap(); let script = manifest_dir.join("export.py"); let run_marker = manifest_dir.join("ran"); fs::write(&script, script_body).unwrap(); + let mut source_manifest = json!({ + "id": "default", + "provider_key": provider, + "source_id": "default", + "source_format": format!("{provider}-history-v1"), + "enabled": enabled, + "command": [python_command(), script.display().to_string()], + "timeout_seconds": timeout_seconds + }); + if let Some(refresh) = refresh { + source_manifest["refresh"] = json!(refresh); + } let manifest = json!({ "schema_version": 1, "name": provider, - "history_sources": [{ - "id": "default", - "provider_key": provider, - "source_id": "default", - "source_format": format!("{provider}-history-v1"), - "enabled": false, - "command": [python_command(), script.display().to_string()], - "timeout_seconds": 10 - }] + "history_sources": [source_manifest] }); fs::write( manifest_dir.join("ctx-history-plugin.json"), @@ -3228,6 +3285,279 @@ fn search_refresh_off_serves_existing_index_without_importing() { assert_search_provider_oracle(&fresh, "codex", "onboarding", 1, "message"); } +#[test] +fn search_refresh_auto_runs_enabled_auto_history_source_plugins_incrementally() { + let temp = tempdir(); + let cursor_log = temp.path().join("cursor-log.txt"); + let plugin = write_history_source_plugin_with_refresh( + &temp, + "hermes", + true, + Some("auto"), + Some(&cursor_log), + ); + + let initial = json_output( + ctx(&temp) + .env("CTX_HISTORY_PLUGIN_PATH", &plugin.manifest_dir) + .args([ + "search", + "hermes plugin 
initial marker", + "--provider", + "custom", + "--json", + ]), + ); + assert_eq!(initial["freshness"]["mode"], "auto"); + assert_eq!(initial["freshness"]["status"], "completed"); + assert_eq!(initial["freshness"]["source_count"], 1); + assert_eq!(initial["freshness"]["totals"]["imported_sources"], 1); + assert_eq!(initial["freshness"]["totals"]["imported_sessions"], 1); + assert_eq!(initial["freshness"]["totals"]["imported_events"], 1); + assert!( + !initial["results"].as_array().unwrap().is_empty(), + "initial plugin refresh was not searchable before query: {initial:#}" + ); + assert!(plugin.run_marker.exists()); + + fs::remove_file(&plugin.run_marker).unwrap(); + let incremental = json_output( + ctx(&temp) + .env("CTX_HISTORY_PLUGIN_PATH", &plugin.manifest_dir) + .args([ + "search", + "hermes plugin incremental marker", + "--provider", + "custom", + "--json", + ]), + ); + assert_eq!(incremental["freshness"]["mode"], "auto"); + assert_eq!(incremental["freshness"]["status"], "completed"); + assert_eq!(incremental["freshness"]["source_count"], 1); + assert_eq!(incremental["freshness"]["totals"]["imported_sources"], 1); + assert_eq!(incremental["freshness"]["totals"]["imported_events"], 1); + assert!( + !incremental["results"].as_array().unwrap().is_empty(), + "incremental plugin refresh was not searchable before query: {incremental:#}" + ); + assert!(plugin.run_marker.exists()); + + let cursor_log = fs::read_to_string(cursor_log).unwrap(); + assert!(cursor_log.contains(r#""message_id":7"#), "{cursor_log}"); + assert!(cursor_log.contains("cursor_file="), "{cursor_log}"); +} + +#[test] +fn search_refresh_auto_combines_native_sources_and_auto_history_source_plugins() { + let temp = tempdir(); + let fixture = PathBuf::from(provider_history_fixture("codex-sessions")); + copy_dir_all(&fixture, &temp.path().join(".codex").join("sessions")); + let plugin = + write_history_source_plugin_with_refresh(&temp, "hermes", true, Some("auto"), None); + + let search = json_output( + 
ctx(&temp) + .env("CTX_HISTORY_PLUGIN_PATH", &plugin.manifest_dir) + .args(["search", "hermes plugin initial marker", "--json"]), + ); + + assert_eq!(search["freshness"]["mode"], "auto"); + assert_eq!(search["freshness"]["status"], "completed"); + assert_eq!(search["freshness"]["source_count"], 2); + assert!( + search["freshness"]["totals"]["imported_sessions"] + .as_u64() + .unwrap() + >= 3 + ); + assert!( + !search["results"].as_array().unwrap().is_empty(), + "combined refresh did not make plugin history searchable: {search:#}" + ); + assert!(plugin.run_marker.exists()); +} + +#[test] +fn search_refresh_provider_filter_does_not_execute_history_source_plugins() { + let temp = tempdir(); + let fixture = PathBuf::from(provider_history_fixture("codex-sessions")); + copy_dir_all(&fixture, &temp.path().join(".codex").join("sessions")); + let plugin = + write_history_source_plugin_with_refresh(&temp, "hermes", true, Some("auto"), None); + + let search = json_output( + ctx(&temp) + .env("CTX_HISTORY_PLUGIN_PATH", &plugin.manifest_dir) + .args(["search", "onboarding", "--provider", "codex", "--json"]), + ); + + assert_eq!(search["freshness"]["mode"], "auto"); + assert_eq!(search["freshness"]["status"], "completed"); + assert_eq!(search["freshness"]["source_count"], 1); + assert_search_provider_oracle(&search, "codex", "onboarding", 1, "message"); + assert!(!plugin.run_marker.exists()); +} + +#[test] +fn search_refresh_off_does_not_execute_history_source_plugins() { + let temp = tempdir(); + let plugin = + write_history_source_plugin_with_refresh(&temp, "hermes", true, Some("auto"), None); + + let search = json_output( + ctx(&temp) + .env("CTX_HISTORY_PLUGIN_PATH", &plugin.manifest_dir) + .args([ + "search", + "hermes plugin initial marker", + "--provider", + "custom", + "--refresh", + "off", + "--json", + ]), + ); + + assert_eq!(search["freshness"]["mode"], "off"); + assert_eq!(search["freshness"]["status"], "skipped"); + 
assert!(search["results"].as_array().unwrap().is_empty()); + assert!(!plugin.run_marker.exists()); +} + +#[test] +fn search_refresh_auto_skips_disabled_or_manual_history_source_plugins() { + let temp = tempdir(); + let plugin_root = temp.path().join("history-plugins"); + let manual = write_history_source_plugin_at_with_refresh( + &plugin_root, + "hermes", + true, + Some("manual"), + None, + ); + let disabled = write_history_source_plugin_at_with_refresh( + &plugin_root, + "dorkos", + false, + Some("auto"), + None, + ); + + let search = json_output( + ctx(&temp) + .env("CTX_HISTORY_PLUGIN_PATH", &plugin_root) + .args([ + "search", + "plugin initial marker", + "--provider", + "custom", + "--json", + ]), + ); + + assert_eq!(search["freshness"]["mode"], "auto"); + assert_eq!(search["freshness"]["status"], "no_sources"); + assert_eq!(search["freshness"]["source_count"], 0); + assert!(search["results"].as_array().unwrap().is_empty()); + assert!(!manual.run_marker.exists()); + assert!(!disabled.run_marker.exists()); +} + +#[test] +fn search_refresh_strict_fails_on_history_source_plugin_failure() { + let temp = tempdir(); + let script = r#"#!/usr/bin/env python3 +import sys +print("plugin exploded", file=sys.stderr) +sys.exit(23) +"#; + let plugin = write_raw_history_source_plugin_with_options( + &temp, + "badplugin", + script, + true, + Some("auto"), + ); + + let stderr = failure_stderr( + ctx(&temp) + .env("CTX_HISTORY_PLUGIN_PATH", &plugin.manifest_dir) + .args([ + "search", + "anything", + "--provider", + "custom", + "--refresh", + "strict", + "--json", + ]), + ); + + assert!(stderr.contains("search refresh failed"), "{stderr}"); + assert!( + stderr.contains("history source plugin badplugin/default failed"), + "{stderr}" + ); + assert!(stderr.contains("plugin exploded"), "{stderr}"); +} + +#[test] +fn search_refresh_strict_times_out_when_plugin_helper_keeps_stdout_open() { + let temp = tempdir(); + let script = r#"#!/usr/bin/env python3 +import json +import os +import 
subprocess + +observed = "2026-07-01T12:00:00Z" +source_id = os.environ["CTX_HISTORY_SOURCE_ID"] +provider_key = os.environ["CTX_HISTORY_PROVIDER_KEY"] +source_format = os.environ["CTX_HISTORY_SOURCE_FORMAT"] +cursor_stream = os.environ["CTX_HISTORY_CURSOR_STREAM"] +records = [ + {"record_type": "manifest", "schema_version": "ctx-history-jsonl-v1"}, + {"record_type": "source", "source_id": source_id, "provider_key": provider_key, "source_format": source_format, "observed_at": observed, "cursor": {"after": {"stream": cursor_stream, "cursor": json.dumps({"seq": 1}), "observed_at": observed}}}, + {"record_type": "session", "source_id": source_id, "session_id": "hanging-session", "started_at": observed, "agent_type": "primary", "is_primary": True, "status": "completed"}, + {"record_type": "event", "source_id": source_id, "session_id": "hanging-session", "event_index": 0, "event_type": "message", "role": "assistant", "occurred_at": observed, "payload": {"text": "hanging plugin marker"}, "preview": "hanging plugin marker"}, +] +for record in records: + print(json.dumps(record, separators=(",", ":")), flush=True) +subprocess.Popen(["sh", "-c", "sleep 5"]) +"#; + let plugin = write_raw_history_source_plugin_with_options_and_timeout( + &temp, + "hanging", + script, + true, + Some("auto"), + 1, + ); + + let started = Instant::now(); + let stderr = failure_stderr( + ctx(&temp) + .env("CTX_HISTORY_PLUGIN_PATH", &plugin.manifest_dir) + .args([ + "search", + "hanging plugin marker", + "--provider", + "custom", + "--refresh", + "strict", + "--json", + ]), + ); + assert!( + started.elapsed() < Duration::from_secs(3), + "plugin timeout did not bound pipe draining: {stderr}" + ); + assert!( + stderr.contains("history source plugin hanging/default timed out after 1s"), + "{stderr}" + ); +} + #[test] fn search_refresh_auto_imports_fresh_work_despite_large_existing_catalog() { let temp = tempdir(); diff --git a/docs/cli-reference.md b/docs/cli-reference.md index c5f67d1c..d0cf95f0 
100644 --- a/docs/cli-reference.md +++ b/docs/cli-reference.md @@ -190,13 +190,12 @@ ctx search "this current task" --include-current-session ``` `search` defaults to `--refresh auto`, which quietly refreshes discovered native -provider sources before querying indexed sessions and events. The refresh is -best-effort and keeps JSON stdout reserved for the search result object. -History-source plugin commands are not executed by search refresh. On large -discovered sources or already-cataloged indexes, `auto` serves current results -without a foreground catch-up scan; use `--refresh strict` or `ctx import --all` -when you need a full catch-up before querying. Use `--refresh off` to search the -existing index without refreshing, or +provider sources and enabled auto history-source plugins before querying indexed +sessions and events. The refresh is best-effort and keeps JSON stdout reserved +for the search result object. On large discovered sources or already-cataloged +indexes, `auto` serves current results without a foreground catch-up scan; use +`--refresh strict` or `ctx import --all` when you need a full catch-up before +querying. Use `--refresh off` to search the existing index without refreshing, or `--refresh strict` to fail when the pre-search refresh cannot run or import successfully. Search-only sources without native import support are searched from the existing index until they are explicitly imported through a supported @@ -254,9 +253,10 @@ CLI provider filters use kebab-case names. JSON output and stable SQL views use provider IDs in ctx output; multiword IDs may be snake_case, such as `copilot_cli` or `factory_ai_droid`. -`search` reads discovered native provider files for pre-search refresh plus -SQLite, and may write newly discovered native provider history into the local -index before querying. +`search` reads discovered native provider files and runs enabled auto +history-source plugin commands for pre-search refresh, then queries SQLite. 
It +may write newly discovered provider or plugin history into the local index before +querying. ## SQL diff --git a/docs/first-10-minutes.md b/docs/first-10-minutes.md index 57b96635..c548670b 100644 --- a/docs/first-10-minutes.md +++ b/docs/first-10-minutes.md @@ -75,8 +75,9 @@ ctx search "build failure" --term checksum --term release --limit 5 ``` `--limit` is capped at `200`. Search defaults to `--refresh auto`, which -best-effort refreshes discovered native provider sources before querying; use -`--refresh off` to search only the existing index. +best-effort refreshes discovered native provider sources and enabled auto +history-source plugins before querying; use `--refresh off` to search only the +existing index. Inside Codex, ctx excludes the active session tree by default when it can identify it, so your current prompt and subagents do not dominate results. Add diff --git a/docs/history-source-plugins.md b/docs/history-source-plugins.md index 66274bb4..be81fbda 100644 --- a/docs/history-source-plugins.md +++ b/docs/history-source-plugins.md @@ -6,7 +6,8 @@ searchable in ctx without ctx owning their storage schemas. The narrow waist is: 1. A local manifest declares one or more history sources. -2. ctx invokes the declared command only during explicit import. +2. ctx invokes enabled auto-refresh commands during search refresh, or any + selected source during explicit import. 3. The command writes `ctx-history-jsonl-v1` records to stdout. 4. ctx validates and imports that stream atomically. 5. ctx passes the previous source cursor back on the next run. @@ -41,6 +42,7 @@ Manifest example: "source_id": "default", "source_format": "dorkos-claude-jsonl-v1", "enabled": true, + "refresh": "auto", "command": ["ctx-history-source-dorkos", "export"], "timeout_seconds": 300 } @@ -51,8 +53,10 @@ Manifest example: `name`, `id`, `provider_key`, and `source_id` must be stable lowercase ASCII identifiers. `command` is an argv array; ctx does not run it through a shell. 
-`enabled: true` means `ctx import --all` may run that source. Explicit imports -can run a discovered source even when it is not enabled. +`enabled: true` means `ctx import --all` may run that source. `refresh: auto` +lets `ctx search` run an enabled source during pre-search refresh. Explicit +imports can run a discovered source even when it is not enabled or is marked +`refresh: manual`. ## Import @@ -78,9 +82,11 @@ from the supplied manifest path. `CTX_HISTORY_FULL_RESCAN=1`. The plugin should emit a fresh `source.cursor.after` checkpoint if the rescan succeeds. -`ctx setup` and search refresh do not execute plugins in this version. Run -`ctx import --history-source ` or `ctx import --all` to catch up -plugin-backed sources before searching. +`ctx setup` does not execute plugins. `ctx search` defaults to `--refresh auto` +and runs discovered plugin sources only when they are both `enabled: true` and +`refresh: auto`; `--refresh off` never runs plugins, and `--refresh strict` +fails if an auto plugin refresh fails. Plugin refresh is incremental because ctx +passes the previously stored source cursor to the command on each run. ## Runtime Environment @@ -108,6 +114,9 @@ while the plugin process runs and is the reliable cursor handoff path. The plugin must write only `ctx-history-jsonl-v1` JSONL to stdout. Progress and diagnostics belong on stderr. If the command exits nonzero or stdout is invalid, ctx imports nothing from that run and does not advance the cursor. +Plugin stdout is capped at 64 MiB per run and stderr at 256 KiB, so plugins +should emit incremental batches from the supplied cursor instead of full +historical dumps during normal refresh.
Plugin commands receive a limited inherited environment by default: `PATH`, `HOME`, basic locale variables, temporary-directory variables, and XDG data or diff --git a/docs/product-contract.md b/docs/product-contract.md index 86d05b78..421b1f4d 100644 --- a/docs/product-contract.md +++ b/docs/product-contract.md @@ -14,10 +14,11 @@ product boundary is retrieval, not interpretation. transcript formats. - `ctx sources` reports known local provider history paths, including whether a native source is currently importable. -- `ctx import` indexes supported local transcript formats. -- `ctx search` can refresh discovered native provider sources before returning - ranked local hits from the local index, with event IDs when a hit maps to an - indexed event. +- `ctx import` indexes supported local transcript formats and selected local + history-source plugins. +- `ctx search` can refresh discovered native provider sources and enabled auto + history-source plugins before returning ranked local hits from the local + index, with event IDs when a hit maps to an indexed event. - `ctx show session` and `ctx show event` render transcripts, hits, and context windows using ctx-owned IDs, and `ctx show session --out` writes transcript artifacts. diff --git a/docs/search.md b/docs/search.md index 52c24265..e8a7bd29 100644 --- a/docs/search.md +++ b/docs/search.md @@ -3,8 +3,8 @@ `ctx search` finds matching indexed history. Default results are session-diverse: ctx shows the strongest matching span from each session, then lets you drill into dense event-level results when needed. By default it first performs a quiet -best-effort refresh of discovered native provider sources, then queries the -local SQLite store. +best-effort refresh of discovered native provider sources and enabled auto +history-source plugins, then queries the local SQLite store. ## Search @@ -102,18 +102,19 @@ work do not dominate history research. 
Use `--include-current-session` when you are intentionally looking for material from the active session tree. `--refresh` defaults to `auto`. `auto` attempts a best-effort pre-search import -of discovered native provider sources and serves the existing index if that -refresh fails. Search refresh does not execute history-source plugin commands. -On large discovered sources or already-cataloged indexes, `auto` serves current -results without a foreground catch-up scan; use `--refresh strict` or -`ctx import --all` when you need a full catch-up before querying. `off` skips -the pre-search refresh. `strict` fails the search if the refresh cannot run or -import successfully. Search-only sources without native import support are +of discovered native provider sources and enabled auto history-source plugins, +and falls back to the existing index if that refresh fails. On large discovered +sources or already-cataloged indexes, `auto` serves current results without a +foreground catch-up scan; use `--refresh strict` or `ctx import --all` when you +need a full catch-up before querying. `off` skips the pre-search refresh and +never runs plugin commands. `strict` fails the search if the refresh cannot run +or import successfully. Search-only sources without native import support are searched from the existing index until they are explicitly imported through a supported path. Use `--refresh off` for a strictly read-only search over the existing ctx index. -This avoids provider imports and avoids updating the ctx SQLite store. +This avoids provider imports, plugin execution, and updates to the ctx SQLite +store. ## History Reports