From d9cd535f6e1aa40955e1aedf9708ea72b4ea2035 Mon Sep 17 00:00:00 2001 From: William Pelrine Date: Mon, 13 Apr 2026 14:05:13 -0700 Subject: [PATCH 1/2] fix(app-server): preserve thread git info in live snapshots --- .../app-server/src/codex_message_processor.rs | 112 +++++++++++++++--- 1 file changed, 98 insertions(+), 14 deletions(-) diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 3decc83f4a2..d1baf5faa0d 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -2490,6 +2490,7 @@ impl CodexMessageProcessor { .collect() }; let core_dynamic_tool_count = core_dynamic_tools.len(); + let config_for_thread_snapshot = config.clone(); match listener_task_context .thread_manager @@ -2541,11 +2542,14 @@ impl CodexMessageProcessor { otel.name = "app_server.thread_start.config_snapshot", )) .await; - let mut thread = build_thread_from_snapshot( + let mut thread = build_thread_from_snapshot_with_git_info( + &config_for_thread_snapshot, thread_id, &config_snapshot, session_configured.rollout_path.clone(), - ); + session_configured.rollout_path.as_deref(), + ) + .await; // Auto-attach a thread listener when starting a thread. Self::log_listener_attach_result( @@ -3920,7 +3924,14 @@ impl CodexMessageProcessor { if include_turns { rollout_path = loaded_rollout_path.clone(); } - build_thread_from_snapshot(thread_uuid, &config_snapshot, loaded_rollout_path) + build_thread_from_snapshot_with_git_info( + &self.config, + thread_uuid, + &config_snapshot, + loaded_rollout_path.clone(), + loaded_rollout_path.as_deref(), + ) + .await }; if thread.forked_from_id.is_none() && let Some(rollout_path) = rollout_path.as_ref() @@ -4018,8 +4029,15 @@ impl CodexMessageProcessor { ) { if let Ok(thread) = self.thread_manager.get_thread(thread_id).await { let config_snapshot = thread.config_snapshot().await; - let loaded_thread = - build_thread_from_snapshot(thread_id, &config_snapshot, thread.rollout_path()); + let rollout_path = thread.rollout_path(); + let loaded_thread = build_thread_from_snapshot_with_git_info( + &self.config, + thread_id, + &config_snapshot, + rollout_path.clone(), + rollout_path.as_deref(), + ) + .await; self.thread_watch_manager.upsert_thread(loaded_thread).await; } @@ -4554,11 +4572,14 @@ impl CodexMessageProcessor { } InitialHistory::Forked(items) => { let config_snapshot = thread.config_snapshot().await; - let mut thread = build_thread_from_snapshot( + let mut thread = build_thread_from_snapshot_with_git_info( + &self.config, thread_id, &config_snapshot, Some(rollout_path.into()), - ); + Some(rollout_path), + ) + .await; thread.preview = preview_from_rollout_items(items); Ok(thread) } @@ -4779,6 +4800,15 @@ impl CodexMessageProcessor { let mut thread = summary_to_thread(summary); thread.forked_from_id = forked_from_id_from_rollout(fork_rollout_path.as_path()).await; + if thread.git_info.is_none() { + thread.git_info = best_available_thread_git_info( + &self.config, + thread_id, + Some(rollout_path.as_path()), + fallback_model_provider.as_str(), + ) + .await; + } thread } Err(err) => { @@ -4796,8 +4826,14 @@ impl CodexMessageProcessor { } else { let config_snapshot = forked_thread.config_snapshot().await; // forked thread names do not inherit the source thread name - let mut thread = - build_thread_from_snapshot(thread_id, &config_snapshot, /*path*/ None); + let mut thread = build_thread_from_snapshot_with_git_info( + &self.config, + thread_id, + &config_snapshot, + /*path*/ None, + Some(rollout_path.as_path()), + ) + .await; let history_items = match read_rollout_items_from_rollout(rollout_path.as_path()).await { Ok(items) => items, @@ -9476,6 +9512,58 @@ fn merge_mutable_thread_metadata(thread: &mut Thread, persisted_thread: Thread) thread.git_info = persisted_thread.git_info; } +fn map_conversation_git_info(git_info: ConversationGitInfo) -> ApiGitInfo { + ApiGitInfo { + sha: git_info.sha, + branch: git_info.branch, + origin_url: git_info.origin_url, + } +} + +fn map_optional_conversation_git_info(git_info: Option) -> Option { + git_info.map(map_conversation_git_info) +} + +async fn best_available_thread_git_info( + config: &Config, + thread_id: ThreadId, + rollout_path: Option<&Path>, + fallback_provider: &str, +) -> Option { + if let Some(summary) = read_summary_from_state_db_by_thread_id(config, thread_id).await { + let git_info = map_optional_conversation_git_info(summary.git_info); + if git_info.is_some() { + return git_info; + } + } + + if let Some(rollout_path) = rollout_path + && let Ok(summary) = read_summary_from_rollout(rollout_path, fallback_provider).await + { + return map_optional_conversation_git_info(summary.git_info); + } + + None +} + +async fn build_thread_from_snapshot_with_git_info( + config: &Config, + thread_id: ThreadId, + config_snapshot: &ThreadConfigSnapshot, + path: Option, + git_info_rollout_path: Option<&Path>, +) -> Thread { + let mut thread = build_thread_from_snapshot(thread_id, config_snapshot, path); + thread.git_info = best_available_thread_git_info( + config, + thread_id, + git_info_rollout_path, + config_snapshot.model_provider_id.as_str(), + ) + .await; + thread +} + fn preview_from_rollout_items(items: &[RolloutItem]) -> String { items .iter() @@ -9587,11 +9675,7 @@ pub(crate) fn summary_to_thread(summary: ConversationSummary) -> Thread { let created_at = parse_datetime(timestamp.as_deref()); let updated_at = parse_datetime(updated_at.as_deref()).or(created_at); - let git_info = git_info.map(|info| ApiGitInfo { - sha: info.sha, - branch: info.branch, - origin_url: info.origin_url, - }); + let git_info = map_optional_conversation_git_info(git_info); Thread { id: conversation_id.to_string(), From 8d96d6226c9c30da6e390c5bbe64cc7e53194cbe Mon Sep 17 00:00:00 2001 From: William Pelrine Date: Mon, 13 Apr 2026 14:05:18 -0700 Subject: [PATCH 2/2] test(app-server): cover git info hydration on read and fork --- .../app-server/tests/suite/v2/thread_fork.rs | 103 ++++++++++++++++++ .../app-server/tests/suite/v2/thread_read.rs | 67 ++++++++++++ 2 files changed, 170 insertions(+) diff --git a/codex-rs/app-server/tests/suite/v2/thread_fork.rs b/codex-rs/app-server/tests/suite/v2/thread_fork.rs index 9907fc4b1de..f0884e8bf25 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_fork.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_fork.rs @@ -5,6 +5,7 @@ use app_test_support::create_fake_rollout; use app_test_support::create_mock_responses_server_repeating_assistant; use app_test_support::to_response; use app_test_support::write_chatgpt_auth; +use codex_app_server_protocol::GitInfo; use codex_app_server_protocol::JSONRPCError; use codex_app_server_protocol::JSONRPCMessage; use codex_app_server_protocol::JSONRPCResponse; @@ -26,6 +27,7 @@ use codex_app_server_protocol::TurnStatus; use codex_app_server_protocol::UserInput; use codex_config::types::AuthCredentialsStoreMode; use codex_login::REFRESH_TOKEN_URL_OVERRIDE_ENV_VAR; +use codex_protocol::protocol::GitInfo as RolloutGitInfo; use pretty_assertions::assert_eq; use serde_json::Value; use serde_json::json; @@ -183,6 +185,107 @@ async fn thread_fork_creates_new_thread_and_emits_started() -> Result<()> { Ok(()) } +#[tokio::test] +async fn thread_fork_preserves_git_info_for_materialized_forks() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let git_info = RolloutGitInfo { + commit_hash: None, + branch: Some("feature/source-branch".to_string()), + repository_url: Some("https://github.com/example/codex.git".to_string()), + }; + let conversation_id = create_fake_rollout( + codex_home.path(), + "2025-01-05T12-00-00", + "2025-01-05T12:00:00Z", + "Saved user message", + Some("mock_provider"), + Some(git_info), + )?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let fork_id = mcp + .send_thread_fork_request(ThreadForkParams { + thread_id: conversation_id, + ..Default::default() + }) + .await?; + let fork_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(fork_id)), + ) + .await??; + let ThreadForkResponse { thread, .. } = to_response::(fork_resp)?; + + assert_eq!( + thread.git_info, + Some(GitInfo { + sha: None, + branch: Some("feature/source-branch".to_string()), + origin_url: Some("https://github.com/example/codex.git".to_string()), + }) + ); + + Ok(()) +} + +#[tokio::test] +async fn thread_fork_preserves_git_info_for_ephemeral_forks() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let git_info = RolloutGitInfo { + commit_hash: None, + branch: Some("feature/source-branch".to_string()), + repository_url: Some("https://github.com/example/codex.git".to_string()), + }; + let conversation_id = create_fake_rollout( + codex_home.path(), + "2025-01-05T12-00-00", + "2025-01-05T12:00:00Z", + "Saved user message", + Some("mock_provider"), + Some(git_info), + )?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let fork_id = mcp + .send_thread_fork_request(ThreadForkParams { + thread_id: conversation_id, + ephemeral: true, + ..Default::default() + }) + .await?; + let fork_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(fork_id)), + ) + .await??; + let ThreadForkResponse { thread, .. } = to_response::(fork_resp)?; + + assert!( + thread.path.is_none(), + "ephemeral forks should remain pathless" + ); + assert_eq!( + thread.git_info, + Some(GitInfo { + sha: None, + branch: Some("feature/source-branch".to_string()), + origin_url: Some("https://github.com/example/codex.git".to_string()), + }) + ); + + Ok(()) +} + #[tokio::test] async fn thread_fork_tracks_thread_initialized_analytics() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; diff --git a/codex-rs/app-server/tests/suite/v2/thread_read.rs b/codex-rs/app-server/tests/suite/v2/thread_read.rs index c5fd699855c..a8d83d2228a 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_read.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_read.rs @@ -3,6 +3,7 @@ use app_test_support::McpProcess; use app_test_support::create_fake_rollout_with_text_elements; use app_test_support::create_mock_responses_server_repeating_assistant; use app_test_support::to_response; +use codex_app_server_protocol::GitInfo; use codex_app_server_protocol::JSONRPCError; use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::RequestId; @@ -12,6 +13,8 @@ use codex_app_server_protocol::ThreadForkResponse; use codex_app_server_protocol::ThreadItem; use codex_app_server_protocol::ThreadListParams; use codex_app_server_protocol::ThreadListResponse; +use codex_app_server_protocol::ThreadMetadataGitInfoUpdateParams; +use codex_app_server_protocol::ThreadMetadataUpdateParams; use codex_app_server_protocol::ThreadNameUpdatedNotification; use codex_app_server_protocol::ThreadReadParams; use codex_app_server_protocol::ThreadReadResponse; @@ -253,6 +256,70 @@ async fn thread_read_loaded_thread_returns_precomputed_path_before_materializati Ok(()) } +#[tokio::test] +async fn thread_read_loaded_thread_preserves_persisted_git_info() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let start_id = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("mock-model".to_string()), + ..Default::default() + }) + .await?; + let start_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(start_id)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(start_resp)?; + + let update_id = mcp + .send_thread_metadata_update_request(ThreadMetadataUpdateParams { + thread_id: thread.id.clone(), + git_info: Some(ThreadMetadataGitInfoUpdateParams { + sha: Some(Some("abc123".to_string())), + branch: Some(Some("feature/desktop-pr-button".to_string())), + origin_url: Some(Some("https://github.com/example/codex.git".to_string())), + }), + }) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(update_id)), + ) + .await??; + + let read_id = mcp + .send_thread_read_request(ThreadReadParams { + thread_id: thread.id, + include_turns: false, + }) + .await?; + let read_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(read_id)), + ) + .await??; + let ThreadReadResponse { thread: read } = to_response::(read_resp)?; + + assert_eq!( + read.git_info, + Some(GitInfo { + sha: Some("abc123".to_string()), + branch: Some("feature/desktop-pr-button".to_string()), + origin_url: Some("https://github.com/example/codex.git".to_string()), + }) + ); + assert_eq!(read.status, ThreadStatus::Idle); + + Ok(()) +} + #[tokio::test] async fn thread_name_set_is_reflected_in_read_list_and_resume() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await;