Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 98 additions & 14 deletions codex-rs/app-server/src/codex_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2490,6 +2490,7 @@ impl CodexMessageProcessor {
.collect()
};
let core_dynamic_tool_count = core_dynamic_tools.len();
let config_for_thread_snapshot = config.clone();

match listener_task_context
.thread_manager
Expand Down Expand Up @@ -2541,11 +2542,14 @@ impl CodexMessageProcessor {
otel.name = "app_server.thread_start.config_snapshot",
))
.await;
let mut thread = build_thread_from_snapshot(
let mut thread = build_thread_from_snapshot_with_git_info(
&config_for_thread_snapshot,
thread_id,
&config_snapshot,
session_configured.rollout_path.clone(),
);
session_configured.rollout_path.as_deref(),
)
.await;

// Auto-attach a thread listener when starting a thread.
Self::log_listener_attach_result(
Expand Down Expand Up @@ -3920,7 +3924,14 @@ impl CodexMessageProcessor {
if include_turns {
rollout_path = loaded_rollout_path.clone();
}
build_thread_from_snapshot(thread_uuid, &config_snapshot, loaded_rollout_path)
build_thread_from_snapshot_with_git_info(
&self.config,
thread_uuid,
&config_snapshot,
loaded_rollout_path.clone(),
loaded_rollout_path.as_deref(),
)
.await
};
if thread.forked_from_id.is_none()
&& let Some(rollout_path) = rollout_path.as_ref()
Expand Down Expand Up @@ -4018,8 +4029,15 @@ impl CodexMessageProcessor {
) {
if let Ok(thread) = self.thread_manager.get_thread(thread_id).await {
let config_snapshot = thread.config_snapshot().await;
let loaded_thread =
build_thread_from_snapshot(thread_id, &config_snapshot, thread.rollout_path());
let rollout_path = thread.rollout_path();
let loaded_thread = build_thread_from_snapshot_with_git_info(
&self.config,
thread_id,
&config_snapshot,
rollout_path.clone(),
rollout_path.as_deref(),
)
.await;
self.thread_watch_manager.upsert_thread(loaded_thread).await;
}

Expand Down Expand Up @@ -4554,11 +4572,14 @@ impl CodexMessageProcessor {
}
InitialHistory::Forked(items) => {
let config_snapshot = thread.config_snapshot().await;
let mut thread = build_thread_from_snapshot(
let mut thread = build_thread_from_snapshot_with_git_info(
&self.config,
thread_id,
&config_snapshot,
Some(rollout_path.into()),
);
Some(rollout_path),
)
.await;
thread.preview = preview_from_rollout_items(items);
Ok(thread)
}
Expand Down Expand Up @@ -4779,6 +4800,15 @@ impl CodexMessageProcessor {
let mut thread = summary_to_thread(summary);
thread.forked_from_id =
forked_from_id_from_rollout(fork_rollout_path.as_path()).await;
if thread.git_info.is_none() {
thread.git_info = best_available_thread_git_info(
&self.config,
thread_id,
Some(rollout_path.as_path()),
fallback_model_provider.as_str(),
)
.await;
}
thread
}
Err(err) => {
Expand All @@ -4796,8 +4826,14 @@ impl CodexMessageProcessor {
} else {
let config_snapshot = forked_thread.config_snapshot().await;
// forked thread names do not inherit the source thread name
let mut thread =
build_thread_from_snapshot(thread_id, &config_snapshot, /*path*/ None);
let mut thread = build_thread_from_snapshot_with_git_info(
&self.config,
thread_id,
&config_snapshot,
/*path*/ None,
Some(rollout_path.as_path()),
)
.await;
let history_items = match read_rollout_items_from_rollout(rollout_path.as_path()).await
{
Ok(items) => items,
Expand Down Expand Up @@ -9476,6 +9512,58 @@ fn merge_mutable_thread_metadata(thread: &mut Thread, persisted_thread: Thread)
thread.git_info = persisted_thread.git_info;
}

fn map_conversation_git_info(git_info: ConversationGitInfo) -> ApiGitInfo {
ApiGitInfo {
sha: git_info.sha,
branch: git_info.branch,
origin_url: git_info.origin_url,
}
}

fn map_optional_conversation_git_info(git_info: Option<ConversationGitInfo>) -> Option<ApiGitInfo> {
git_info.map(map_conversation_git_info)
}

async fn best_available_thread_git_info(
config: &Config,
thread_id: ThreadId,
rollout_path: Option<&Path>,
fallback_provider: &str,
) -> Option<ApiGitInfo> {
if let Some(summary) = read_summary_from_state_db_by_thread_id(config, thread_id).await {
let git_info = map_optional_conversation_git_info(summary.git_info);
if git_info.is_some() {
return git_info;
}
}

if let Some(rollout_path) = rollout_path
&& let Ok(summary) = read_summary_from_rollout(rollout_path, fallback_provider).await
{
return map_optional_conversation_git_info(summary.git_info);
}

None
}

async fn build_thread_from_snapshot_with_git_info(
config: &Config,
thread_id: ThreadId,
config_snapshot: &ThreadConfigSnapshot,
path: Option<PathBuf>,
git_info_rollout_path: Option<&Path>,
) -> Thread {
let mut thread = build_thread_from_snapshot(thread_id, config_snapshot, path);
thread.git_info = best_available_thread_git_info(
config,
thread_id,
git_info_rollout_path,
config_snapshot.model_provider_id.as_str(),
)
.await;
thread
}

fn preview_from_rollout_items(items: &[RolloutItem]) -> String {
items
.iter()
Expand Down Expand Up @@ -9587,11 +9675,7 @@ pub(crate) fn summary_to_thread(summary: ConversationSummary) -> Thread {

let created_at = parse_datetime(timestamp.as_deref());
let updated_at = parse_datetime(updated_at.as_deref()).or(created_at);
let git_info = git_info.map(|info| ApiGitInfo {
sha: info.sha,
branch: info.branch,
origin_url: info.origin_url,
});
let git_info = map_optional_conversation_git_info(git_info);

Thread {
id: conversation_id.to_string(),
Expand Down
103 changes: 103 additions & 0 deletions codex-rs/app-server/tests/suite/v2/thread_fork.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use app_test_support::create_fake_rollout;
use app_test_support::create_mock_responses_server_repeating_assistant;
use app_test_support::to_response;
use app_test_support::write_chatgpt_auth;
use codex_app_server_protocol::GitInfo;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCResponse;
Expand All @@ -26,6 +27,7 @@ use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::UserInput;
use codex_config::types::AuthCredentialsStoreMode;
use codex_login::REFRESH_TOKEN_URL_OVERRIDE_ENV_VAR;
use codex_protocol::protocol::GitInfo as RolloutGitInfo;
use pretty_assertions::assert_eq;
use serde_json::Value;
use serde_json::json;
Expand Down Expand Up @@ -183,6 +185,107 @@ async fn thread_fork_creates_new_thread_and_emits_started() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn thread_fork_preserves_git_info_for_materialized_forks() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;

let git_info = RolloutGitInfo {
commit_hash: None,
branch: Some("feature/source-branch".to_string()),
repository_url: Some("https://github.com/example/codex.git".to_string()),
};
let conversation_id = create_fake_rollout(
codex_home.path(),
"2025-01-05T12-00-00",
"2025-01-05T12:00:00Z",
"Saved user message",
Some("mock_provider"),
Some(git_info),
)?;

let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

let fork_id = mcp
.send_thread_fork_request(ThreadForkParams {
thread_id: conversation_id,
..Default::default()
})
.await?;
let fork_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(fork_id)),
)
.await??;
let ThreadForkResponse { thread, .. } = to_response::<ThreadForkResponse>(fork_resp)?;

assert_eq!(
thread.git_info,
Some(GitInfo {
sha: None,
branch: Some("feature/source-branch".to_string()),
origin_url: Some("https://github.com/example/codex.git".to_string()),
})
);

Ok(())
}

#[tokio::test]
async fn thread_fork_preserves_git_info_for_ephemeral_forks() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;

let git_info = RolloutGitInfo {
commit_hash: None,
branch: Some("feature/source-branch".to_string()),
repository_url: Some("https://github.com/example/codex.git".to_string()),
};
let conversation_id = create_fake_rollout(
codex_home.path(),
"2025-01-05T12-00-00",
"2025-01-05T12:00:00Z",
"Saved user message",
Some("mock_provider"),
Some(git_info),
)?;

let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

let fork_id = mcp
.send_thread_fork_request(ThreadForkParams {
thread_id: conversation_id,
ephemeral: true,
..Default::default()
})
.await?;
let fork_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(fork_id)),
)
.await??;
let ThreadForkResponse { thread, .. } = to_response::<ThreadForkResponse>(fork_resp)?;

assert!(
thread.path.is_none(),
"ephemeral forks should remain pathless"
);
assert_eq!(
thread.git_info,
Some(GitInfo {
sha: None,
branch: Some("feature/source-branch".to_string()),
origin_url: Some("https://github.com/example/codex.git".to_string()),
})
);

Ok(())
}

#[tokio::test]
async fn thread_fork_tracks_thread_initialized_analytics() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
Expand Down
67 changes: 67 additions & 0 deletions codex-rs/app-server/tests/suite/v2/thread_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use app_test_support::McpProcess;
use app_test_support::create_fake_rollout_with_text_elements;
use app_test_support::create_mock_responses_server_repeating_assistant;
use app_test_support::to_response;
use codex_app_server_protocol::GitInfo;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
Expand All @@ -12,6 +13,8 @@ use codex_app_server_protocol::ThreadForkResponse;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadListParams;
use codex_app_server_protocol::ThreadListResponse;
use codex_app_server_protocol::ThreadMetadataGitInfoUpdateParams;
use codex_app_server_protocol::ThreadMetadataUpdateParams;
use codex_app_server_protocol::ThreadNameUpdatedNotification;
use codex_app_server_protocol::ThreadReadParams;
use codex_app_server_protocol::ThreadReadResponse;
Expand Down Expand Up @@ -253,6 +256,70 @@ async fn thread_read_loaded_thread_returns_precomputed_path_before_materializati
Ok(())
}

#[tokio::test]
async fn thread_read_loaded_thread_preserves_persisted_git_info() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;

let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

let start_id = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;

let update_id = mcp
.send_thread_metadata_update_request(ThreadMetadataUpdateParams {
thread_id: thread.id.clone(),
git_info: Some(ThreadMetadataGitInfoUpdateParams {
sha: Some(Some("abc123".to_string())),
branch: Some(Some("feature/desktop-pr-button".to_string())),
origin_url: Some(Some("https://github.com/example/codex.git".to_string())),
}),
})
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(update_id)),
)
.await??;

let read_id = mcp
.send_thread_read_request(ThreadReadParams {
thread_id: thread.id,
include_turns: false,
})
.await?;
let read_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
)
.await??;
let ThreadReadResponse { thread: read } = to_response::<ThreadReadResponse>(read_resp)?;

assert_eq!(
read.git_info,
Some(GitInfo {
sha: Some("abc123".to_string()),
branch: Some("feature/desktop-pr-button".to_string()),
origin_url: Some("https://github.com/example/codex.git".to_string()),
})
);
assert_eq!(read.status, ThreadStatus::Idle);

Ok(())
}

#[tokio::test]
async fn thread_name_set_is_reflected_in_read_list_and_resume() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
Expand Down