Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions crates/jp_cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,11 +358,20 @@ fn run_inner(cli: Cli, format: OutputFormat) -> Result<()> {
}
}

// Load conversation IDs and metadata from disk so that
// `apply_conversation_config` (called inside `load_partial_config`) can
// access conversation events via lazy loading.
if let Err(error) = workspace.load_conversation_index() {
tracing::error!(error = ?error, "Failed to load conversation index.");
}

let partial = load_partial_config(&cli.command, Some(&workspace), &cli.globals.config)?;
let config = Arc::new(build(partial)?);

if let Err(error) = workspace.load_conversations_from_disk(config.clone()) {
tracing::error!(error = ?error, "Failed to load workspace.");
// For fresh workspaces, create a default stream now that we have the
// final config.
if let Err(error) = workspace.ensure_active_conversation_stream(config.clone()) {
tracing::error!(error = ?error, "Failed to ensure active conversation stream.");
}

let runtime = build_runtime(cli.root.threads, "jp-worker")?;
Expand Down
65 changes: 49 additions & 16 deletions crates/jp_workspace/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,31 +154,36 @@ impl Workspace {
self.storage.as_ref().and_then(Storage::user_storage_path)
}

/// Load the workspace state from the persisted storage.
/// Load conversation IDs, metadata, and `TombMap` entries from disk.
///
/// If the workspace is not persisted, this method will return an error.
/// This populates the workspace state so that conversation events can be
/// accessed via [`get_events`](Self::get_events) (which triggers lazy
/// loading from storage). For the active conversation, the stream is
/// eagerly loaded for performance.
///
/// For fresh workspaces (no conversations on disk), this registers the
/// active conversation ID in the events `TombMap` but does **not** create a
/// default stream — call [`ensure_active_conversation_stream`] after
/// the final [`AppConfig`] is available for that.
///
/// Call [`sanitize`](Self::sanitize) before this method to ensure the
/// filesystem is in a consistent state.
pub fn load_conversations_from_disk(&mut self, config: Arc<AppConfig>) -> Result<()> {
trace!("Loading state.");
///
/// [`ensure_active_conversation_stream`]: Self::ensure_active_conversation_stream
pub fn load_conversation_index(&mut self) -> Result<()> {
trace!("Loading conversation index.");

let storage = self.storage.as_mut().ok_or(Error::MissingStorage)?;
let storage = self.storage.as_ref().ok_or(Error::MissingStorage)?;
let conversation_ids = storage.load_all_conversation_ids();

if conversation_ids.is_empty() {
debug!("No conversations found, workspace is fresh.");

// Ensure the active conversation has an events entry so the
// invariant (active_conversation_id → events) always holds.
// Register the active conversation ID so that `get_events` can
// find the entry (lazy loading will return `None` since there is
// no file on disk, which is expected for fresh workspaces).
let active_id = self.active_conversation_id();
let _err = self
.state
.local
.events
.entry(active_id)
.or_default()
.set(ConversationStream::new(config).with_created_at(active_id.timestamp()));
self.state.local.events.entry(active_id).or_default();

return Ok(());
}
Expand Down Expand Up @@ -206,8 +211,7 @@ impl Workspace {
.map(|id| (id, OnceCell::new()))
.collect();

// We can `set` without checking if the cell is already initialized, as
// we just initialized it above.let _err = events
// Eagerly load the active conversation stream for performance.
let _err = events
.entry(metadata.active_conversation_id)
.or_default()
Expand All @@ -227,6 +231,35 @@ impl Workspace {
Ok(())
}

/// Ensure the active conversation has an event stream.
///
/// For fresh workspaces where no stream exists on disk, this creates a
/// default [`ConversationStream`] with the given config as its base.
///
/// For existing workspaces where the stream was already loaded (eagerly
/// by [`load_conversation_index`] or lazily via [`get_events`]), this is
/// a no-op.
///
/// [`load_conversation_index`]: Self::load_conversation_index
/// [`get_events`]: Self::get_events
pub fn ensure_active_conversation_stream(&mut self, config: Arc<AppConfig>) -> Result<()> {
let active_id = self.active_conversation_id();

if self.get_events(&active_id).is_some() {
return Ok(());
}

let _err = self
.state
.local
.events
.entry(active_id)
.or_default()
.set(ConversationStream::new(config).with_created_at(active_id.timestamp()));

Ok(())
}

/// Persists the current in-memory workspace state back to disk atomically.
///
/// This is a no-op if persistence is disabled.
Expand Down
141 changes: 134 additions & 7 deletions crates/jp_workspace/src/lib_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@ use std::{collections::HashMap, fs, time::Duration};

use camino_tempfile::tempdir;
use datetime_literal::datetime;
use jp_config::{
PartialConfig as _,
fs::load_partial,
model::id::{ModelIdOrAliasConfig, PartialModelIdOrAliasConfig, ProviderId},
util::build,
};
use jp_conversation::ConversationsMetadata;
use jp_storage::{
CONVERSATIONS_DIR, METADATA_FILE,
Expand Down Expand Up @@ -234,7 +240,7 @@ fn test_workspace_cannot_remove_active_conversation() {
}

#[test]
fn test_load_succeeds_when_no_conversations_exist() {
fn test_load_index_fresh_workspace_then_ensure_stream() {
let tmp = tempdir().unwrap();
let root = tmp.path().join("root");
let storage = root.join("storage");
Expand All @@ -247,16 +253,137 @@ fn test_load_succeeds_when_no_conversations_exist() {
let mut workspace = Workspace::new(&root).persisted_at(&storage).unwrap();
workspace.disable_persistence();

// A fresh workspace with no conversations on disk is valid — load
// should succeed with default state.
let config = Arc::new(AppConfig::new_test());
workspace.load_conversations_from_disk(config).unwrap();

// The active conversation must have an events entry.
// Phase 1: load index — no conversations on disk, so the active
// conversation entry is registered but has no stream yet.
workspace.load_conversation_index().unwrap();
let active_id = workspace.active_conversation_id();
assert!(
workspace.get_events(&active_id).is_none(),
"fresh workspace should have no stream before ensure_active_conversation_stream"
);

// Phase 2: create the default stream with the final config.
let config = Arc::new(AppConfig::new_test());
workspace.ensure_active_conversation_stream(config).unwrap();
assert!(workspace.get_events(&active_id).is_some());
}

#[test]
fn test_load_index_existing_workspace_events_accessible() {
let tmp = tempdir().unwrap();
let root = tmp.path().join("root");
let storage_path = root.join("storage");

let config = Arc::new(AppConfig::new_test());
let id = ConversationId::try_from(datetime!(2024-03-15 12:00:00 Z)).unwrap();

// Write a conversation to disk.
{
let mut ws = Workspace::new(&root).persisted_at(&storage_path).unwrap();
ws.create_conversation_with_id(id, Conversation::default(), config.clone());
ws.set_active_conversation_id(id, DateTime::<Utc>::UNIX_EPOCH)
.unwrap();
ws.persist().unwrap();
}

// Reload from scratch — only load_conversation_index, no config needed.
let mut ws = Workspace::new(&root).persisted_at(&storage_path).unwrap();
ws.disable_persistence();
ws.load_conversation_index().unwrap();

// Events should be accessible via lazy loading.
assert_eq!(ws.active_conversation_id(), id);
let events = ws.get_events(&id);
assert!(
events.is_some(),
"events must be lazily loadable after load_conversation_index"
);

// The stream's config should be retrievable (this is what
// apply_conversation_config relies on).
let stream_config = events.unwrap().config();
assert!(stream_config.is_ok());

// ensure_active_conversation_stream should be a no-op.
ws.ensure_active_conversation_stream(config).unwrap();
assert!(ws.get_events(&id).is_some());
}

/// Regression test for the bug where continuing a conversation without the
/// original config passed in via `--cfg` caused a spurious `ConfigDelta` that
/// reset the model and disabled all tools.
///
/// The root cause was that `load_partial_config` ran before conversation events
/// were loaded from disk, so `apply_conversation_config` couldn't read the
/// stream config and fell back to an empty partial.
///
/// This test exercises the full round-trip:
/// 1. Create a conversation with a custom model name, persist to disk.
/// 2. Reload with `load_conversation_index` only (no config needed).
/// 3. Simulate `apply_conversation_config`: merge stream config into a bare
/// partial (as if no custom config was passed).
/// 4. Build the final config and assert the custom model name survived.
/// 5. Assert that `get_config_delta_from_cli` would produce no delta.
#[test]
fn test_conversation_config_preserved_across_reload() {
let tmp = tempdir().unwrap();
let root = tmp.path().join("root");
let storage_path = root.join("storage");

// Build a config with a distinctive model name.
let mut custom_config = AppConfig::new_test();
custom_config.assistant.model.id =
ModelIdOrAliasConfig::Id((ProviderId::Anthropic, "custom-model").try_into().unwrap());
let id = ConversationId::try_from(datetime!(2024-05-20 10:00:00 Z)).unwrap();

// Persist a conversation that was created with the custom config.
{
let mut ws = Workspace::new(&root).persisted_at(&storage_path).unwrap();
ws.create_conversation_with_id(id, Conversation::default(), Arc::new(custom_config));
ws.set_active_conversation_id(id, DateTime::<Utc>::UNIX_EPOCH)
.unwrap();
ws.persist().unwrap();
}

// Simulate a second invocation WITHOUT the custom config.
let mut ws = Workspace::new(&root).persisted_at(&storage_path).unwrap();
ws.disable_persistence();
ws.load_conversation_index().unwrap();

// Simulate `apply_conversation_config`: merge the stream's config into a
// bare (default) partial, exactly as the CLI does when no `--cfg` flag is
// provided. We use new_test().to_partial() to represent the default config
// (file-based + env, but no custom config overlay).
let bare_partial = AppConfig::new_test().to_partial();
let stream_config = ws
.get_events(&id)
.expect("events must be accessible")
.config()
.expect("valid config")
.to_partial();
let merged = load_partial(bare_partial, stream_config).expect("merge ok");
let final_config = build(merged).expect("valid config");

// The custom config's model name must survive the round-trip.
let resolved = final_config.assistant.model.id.resolved();
assert_eq!(
resolved.name.as_ref(),
"custom-model",
"conversation config must be preserved when continuing without --cfg flag"
);

// The delta must NOT contain a model change — this was the core symptom of
// the bug, where the model reverted to the default.
let stream_partial = ws.get_events(&id).unwrap().config().unwrap().to_partial();
let delta = stream_partial.delta(final_config.to_partial());
assert_eq!(
delta.assistant.model.id,
PartialModelIdOrAliasConfig::empty(),
"config delta must not contain a model change when continuing a conversation without \
overrides"
);
}

#[test]
fn test_workspace_persist_active_conversation() {
let tmp = tempdir().unwrap();
Expand Down
4 changes: 2 additions & 2 deletions crates/jp_workspace/src/sanitize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ impl Workspace {
/// those that fail validation, and ensures the active conversation ID
/// resolves to a valid conversation. Returns a report of what was repaired.
///
/// This should be called before [`load_conversations_from_disk`] to
/// This should be called before [`load_conversation_index`] to
/// guarantee the filesystem is in a consistent state.
///
/// [`load_conversations_from_disk`]: Self::load_conversations_from_disk
/// [`load_conversation_index`]: Self::load_conversation_index
pub fn sanitize(&mut self) -> Result<SanitizeReport> {
let storage = self.storage.as_ref().ok_or(Error::MissingStorage)?;

Expand Down
16 changes: 9 additions & 7 deletions crates/jp_workspace/src/sanitize_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ fn test_reassigns_active_when_trashed() {
assert!(report.active_reassigned);
assert!(!report.default_created);

ws.load_conversations_from_disk(test_config()).unwrap();
ws.load_conversation_index().unwrap();
assert_eq!(ws.active_conversation_id(), id2);
}

Expand All @@ -100,7 +100,7 @@ fn test_reassigns_to_most_recent_valid() {
let report = ws.sanitize().unwrap();

assert!(report.active_reassigned);
ws.load_conversations_from_disk(test_config()).unwrap();
ws.load_conversation_index().unwrap();
assert_eq!(ws.active_conversation_id(), id2);
}

Expand All @@ -123,7 +123,7 @@ fn test_reassigns_when_active_has_no_directory() {
assert!(report.active_reassigned);
assert!(!report.default_created);

ws.load_conversations_from_disk(test_config()).unwrap();
ws.load_conversation_index().unwrap();
assert_eq!(ws.active_conversation_id(), id2);
}

Expand All @@ -143,7 +143,8 @@ fn test_default_created_when_all_trashed() {
assert!(report.default_created);
assert_eq!(report.trashed.len(), 1);

ws.load_conversations_from_disk(test_config()).unwrap();
ws.load_conversation_index().unwrap();
ws.ensure_active_conversation_stream(test_config()).unwrap();
}

#[test]
Expand All @@ -162,7 +163,7 @@ fn test_corrupt_global_metadata_reassigns_to_valid() {
assert!(report.active_reassigned);
assert!(!report.default_created);

ws.load_conversations_from_disk(test_config()).unwrap();
ws.load_conversation_index().unwrap();
assert_eq!(ws.active_conversation_id(), id2);
}

Expand All @@ -178,7 +179,8 @@ fn test_corrupt_global_metadata_with_no_conversations() {
assert!(!report.has_repairs());
assert!(!storage.conversations_metadata_exists());

ws.load_conversations_from_disk(test_config()).unwrap();
ws.load_conversation_index().unwrap();
ws.ensure_active_conversation_stream(test_config()).unwrap();
}

#[test]
Expand Down Expand Up @@ -235,7 +237,7 @@ fn test_sanitize_then_load_with_mixed_valid_and_invalid() {
assert!(report.active_reassigned);
assert!(!report.default_created);

ws.load_conversations_from_disk(test_config()).unwrap();
ws.load_conversation_index().unwrap();
assert_eq!(ws.active_conversation_id(), id3);
assert!(ws.get_conversation(&id1).is_some());
}
Loading