From 65827b0deffa20ccc90cdbe392e3238ad567deaa Mon Sep 17 00:00:00 2001 From: Peter vogel Date: Thu, 15 Jan 2026 04:59:22 +0100 Subject: [PATCH 1/2] feat(telegram): add optional Telegram bot integration --- src-tauri/Cargo.toml | 5 +- src-tauri/src/lib.rs | 34 +- src-tauri/src/settings.rs | 35 +- src-tauri/src/state.rs | 15 +- src-tauri/src/telegram.rs | 1672 +++++++++++++++++ src-tauri/src/telegram_stub.rs | 34 + src-tauri/src/types.rs | 28 + src/App.tsx | 3 + src/features/app/hooks/useAppServerEvents.ts | 12 +- .../settings/components/SettingsView.tsx | 246 ++- src/features/settings/hooks/useAppSettings.ts | 7 + src/features/threads/hooks/useThreads.ts | 4 +- src/services/tauri.ts | 5 + src/types.ts | 14 + 14 files changed, 2094 insertions(+), 20 deletions(-) create mode 100644 src-tauri/src/telegram.rs create mode 100644 src-tauri/src/telegram_stub.rs diff --git a/src-tauri/Cargo.toml b/src-tauri/Cargo.toml index cea35972c..5ccffcda8 100644 --- a/src-tauri/Cargo.toml +++ b/src-tauri/Cargo.toml @@ -23,13 +23,16 @@ tauri-plugin-opener = "2" tauri-plugin-process = "2" serde = { version = "1", features = ["derive"] } serde_json = "1" -tokio = { version = "1", features = ["io-util", "process", "rt", "sync", "time"] } +tokio = { version = "1", features = ["io-util", "process", "rt", "sync", "time", "macros"] } uuid = { version = "1", features = ["v4"] } tauri-plugin-dialog = "2" git2 = "0.20.3" fix-path-env = { git = "https://github.com/tauri-apps/fix-path-env-rs" } ignore = "0.4.25" portable-pty = "0.8" +reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } +sha2 = "0.10" +hex = "0.4" [target."cfg(not(any(target_os = \"android\", target_os = \"ios\")))".dependencies] tauri-plugin-updater = "2" diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs index af108ad8b..19551826c 100644 --- a/src-tauri/src/lib.rs +++ b/src-tauri/src/lib.rs @@ -4,6 +4,12 @@ use tauri::{Manager, WebviewUrl, WebviewWindowBuilder}; mod backend; mod codex; mod event_sink; +#[cfg(desktop)] +mod telegram; +#[cfg(mobile)] +mod telegram_stub; +#[cfg(mobile)] +use telegram_stub as telegram; mod git; mod prompts; mod settings; @@ -24,7 +30,7 @@ pub fn run() { } } - tauri::Builder::default() + let app = tauri::Builder::default() .enable_macos_default_menu(false) .menu(|handle| { let app_name = handle.package_info().name.clone(); @@ -127,6 +133,7 @@ pub fn run() { .setup(|app| { let state = state::AppState::load(&app.handle()); app.manage(state); + telegram::start_telegram_poller(app.handle().clone()); #[cfg(desktop)] app.handle() .plugin(tauri_plugin_updater::Builder::new().build())?; @@ -171,8 +178,27 @@ pub fn run() { terminal::terminal_open, terminal::terminal_write, terminal::terminal_resize, - terminal::terminal_close + terminal::terminal_close, + telegram::telegram_bot_status ]) - .run(tauri::generate_context!()) - .expect("error while running tauri application"); + .build(tauri::generate_context!()) + .expect("error while building tauri application"); + + #[cfg(desktop)] + let exit_notified = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); + #[cfg(desktop)] + let exit_notified_for_run = exit_notified.clone(); + + app.run(move |app_handle, event| { + #[cfg(desktop)] + if matches!(event, tauri::RunEvent::ExitRequested { .. }) { + use std::sync::atomic::Ordering; + if !exit_notified_for_run.swap(true, Ordering::SeqCst) { + let handle = app_handle.clone(); + tauri::async_runtime::spawn(async move { + telegram::notify_app_exit(handle).await; + }); + } + } + }); } diff --git a/src-tauri/src/settings.rs b/src-tauri/src/settings.rs index f7dd45427..3c48abf7b 100644 --- a/src-tauri/src/settings.rs +++ b/src-tauri/src/settings.rs @@ -10,13 +10,42 @@ pub(crate) async fn get_app_settings(state: State<'_, AppState>) -> Result AppSettings { + let mut merged = incoming.clone(); + + // Preserve generated secrets if a stale client sends empty values. + if merged.telegram_pairing_secret.trim().is_empty() { + merged.telegram_pairing_secret = current.telegram_pairing_secret.clone(); + } + + // Linking happens out-of-band (via Telegram). Make this additive so a stale UI + // snapshot can't wipe the allowlist. + if !current.telegram_allowed_user_ids.is_empty() { + let mut ids = current.telegram_allowed_user_ids.clone(); + for id in &incoming.telegram_allowed_user_ids { + if !ids.contains(id) { + ids.push(*id); + } + } + merged.telegram_allowed_user_ids = ids; + } + + // Default chat id is learned during linking; don't lose it if the UI sends null. + if merged.telegram_default_chat_id.is_none() { + merged.telegram_default_chat_id = current.telegram_default_chat_id; + } + + merged +} + #[tauri::command] pub(crate) async fn update_app_settings( settings: AppSettings, state: State<'_, AppState>, ) -> Result { - write_settings(&state.settings_path, &settings)?; let mut current = state.app_settings.lock().await; - *current = settings.clone(); - Ok(settings) + let merged = merge_settings(¤t, &settings); + write_settings(&state.settings_path, &merged)?; + *current = merged.clone(); + Ok(merged) } diff --git a/src-tauri/src/state.rs b/src-tauri/src/state.rs index b0d6f426a..57130e848 100644 --- a/src-tauri/src/state.rs +++ b/src-tauri/src/state.rs @@ -4,8 +4,9 @@ use std::sync::Arc; use tauri::{AppHandle, Manager}; use tokio::sync::Mutex; +use uuid::Uuid; -use crate::storage::{read_settings, read_workspaces}; +use crate::storage::{read_settings, read_workspaces, write_settings}; use crate::types::{AppSettings, WorkspaceEntry}; pub(crate) struct AppState { @@ -16,6 +17,9 @@ pub(crate) struct AppState { pub(crate) storage_path: PathBuf, pub(crate) settings_path: PathBuf, pub(crate) app_settings: Mutex, + #[cfg(desktop)] + pub(crate) telegram_tx: + Mutex>>, } impl AppState { @@ -27,7 +31,12 @@ impl AppState { let storage_path = data_dir.join("workspaces.json"); let settings_path = data_dir.join("settings.json"); let workspaces = read_workspaces(&storage_path).unwrap_or_default(); - let app_settings = read_settings(&settings_path).unwrap_or_default(); + let mut app_settings = read_settings(&settings_path).unwrap_or_default(); + + if app_settings.telegram_pairing_secret.trim().is_empty() { + app_settings.telegram_pairing_secret = Uuid::new_v4().to_string(); + let _ = write_settings(&settings_path, &app_settings); + } Self { workspaces: Mutex::new(workspaces), sessions: Mutex::new(HashMap::new()), @@ -35,6 +44,8 @@ impl AppState { storage_path, settings_path, app_settings: Mutex::new(app_settings), + #[cfg(desktop)] + telegram_tx: Mutex::new(None), } } } diff --git a/src-tauri/src/telegram.rs b/src-tauri/src/telegram.rs new file mode 100644 index 000000000..c4c5e5d0c --- /dev/null +++ b/src-tauri/src/telegram.rs @@ -0,0 +1,1672 @@ +use std::collections::{HashMap, HashSet}; +use std::time::{Duration, Instant}; + +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; +use sha2::{Digest, Sha256}; +use tauri::{AppHandle, Manager, State}; +use tokio::sync::mpsc; +use tokio::sync::oneshot; +use tokio::time::sleep; + +use crate::codex::spawn_workspace_session; +use crate::state::AppState; +use crate::types::{WorkspaceInfo, WorkspaceKind}; + +#[derive(Debug, Clone)] +pub(crate) enum TelegramEvent { + AppServerEvent { workspace_id: String, message: Value }, +} + +#[derive(Debug, Serialize, Clone)] +pub(crate) struct TelegramBotStatus { + pub(crate) ok: bool, + pub(crate) username: Option, + pub(crate) id: Option, + pub(crate) error: Option, +} + +#[derive(Debug, Clone)] +struct TelegramConfig { + enabled: bool, + token: Option, + allowed_user_ids: Vec, + default_chat_id: Option, + send_app_status: bool, + send_completed: bool, + pairing_secret: String, + default_access_mode: String, +} + +fn compute_pairing_code(secret: &str) -> String { + let digest = Sha256::digest(secret.as_bytes()); + // 16 bytes -> 32 hex chars (easy to type, still collision-resistant for our use). + hex::encode(&digest[..16]) +} + +fn normalize_text_preview(value: &str) -> String { + let trimmed = value.trim().replace('\n', " "); + if trimmed.chars().count() > 80 { + trimmed.chars().take(77).collect::() + "…" + } else { + trimmed + } +} + +fn thread_key(workspace_id: &str, thread_id: &str) -> String { + format!("{workspace_id}::{thread_id}") +} + +fn normalize_status_label(value: &str) -> String { + let trimmed = value.trim().replace('\n', " "); + if trimmed.is_empty() { + return "(untitled)".to_string(); + } + // Keep status output compact like the sidebar list. + if trimmed.chars().count() > 46 { + trimmed.chars().take(43).collect::() + "…" + } else { + trimmed + } +} + +fn build_status_keyboard(items: &[TelegramStatusButton]) -> Value { + let mut rows: Vec> = Vec::new(); + for item in items { + rows.push(vec![json!({ + "text": item.label, + "callback_data": item.callback_data, + })]); + } + rows.push(vec![json!({"text":"πŸ”„ Refresh","callback_data":"status:refresh"}), json!({"text":"πŸ”Œ Disconnect","callback_data":"disconnect"})]); + json!({ "inline_keyboard": rows }) +} + +fn build_main_reply_keyboard() -> Value { + json!({ + "keyboard": [ + [{ "text": "πŸ“Š Status" }, { "text": "πŸ”Œ Disconnect" }], + ], + "resize_keyboard": true, + }) +} + +const TELEGRAM_MAX_TEXT_CHARS: usize = 4096; +// Keep a little headroom for any formatting/escaping and future additions. +const TELEGRAM_SAFE_TEXT_CHARS: usize = 3800; + +fn count_chars(value: &str) -> usize { + value.chars().count() +} + +fn split_telegram_text(value: &str, limit: usize) -> Vec { + if value.is_empty() { + return vec![String::new()]; + } + + let mut chunks: Vec = Vec::new(); + let mut current = String::new(); + + for part in value.split_inclusive('\n') { + if !current.is_empty() && count_chars(¤t) + count_chars(part) > limit { + chunks.push(current.trim_end_matches('\n').to_string()); + current.clear(); + } + + if count_chars(part) > limit { + let mut buf = String::new(); + for ch in part.chars() { + if count_chars(&buf) + 1 > limit { + chunks.push(buf.trim_end_matches('\n').to_string()); + buf.clear(); + } + buf.push(ch); + } + if !buf.is_empty() { + current.push_str(&buf); + } + } else { + current.push_str(part); + } + } + + if !current.is_empty() { + chunks.push(current.trim_end_matches('\n').to_string()); + } + + // Telegram hard limit safety (shouldn't be hit with our limits, but keep robust). + chunks + .into_iter() + .flat_map(|chunk| { + if count_chars(&chunk) <= TELEGRAM_MAX_TEXT_CHARS { + vec![chunk] + } else { + let mut split: Vec = Vec::new(); + let mut buf = String::new(); + for ch in chunk.chars() { + if count_chars(&buf) + 1 > TELEGRAM_MAX_TEXT_CHARS { + split.push(buf); + buf = String::new(); + } + buf.push(ch); + } + if !buf.is_empty() { + split.push(buf); + } + split + } + }) + .collect() +} + +fn build_connected_inline_keyboard() -> Value { + json!({ + "inline_keyboard": [ + [ + { "text": "πŸ“Š Status", "callback_data": "status:refresh" }, + { "text": "πŸ”Œ Disconnect", "callback_data": "disconnect" } + ] + ] + }) +} + +#[derive(Debug, Clone)] +struct TelegramStatusButton { + label: String, + callback_data: String, +} + +#[derive(Debug, Clone)] +struct ThreadSelection { + workspace_id: String, + thread_id: String, + label: String, +} + +#[derive(Debug, Clone)] +struct PendingReply { + chat_id: i64, + message_id: i64, + workspace_id: String, + thread_id: String, + turn_id: String, + thread_label: String, + created_at: Instant, +} + +#[derive(Debug, Deserialize)] +struct TelegramResponse { + ok: bool, + result: Option, + description: Option, +} + +#[derive(Debug, Deserialize)] +struct TelegramUpdate { + update_id: i64, + message: Option, + edited_message: Option, + callback_query: Option, +} + +#[derive(Debug, Deserialize)] +struct TelegramCallbackQuery { + id: Option, + from: Option, + message: Option, + data: Option, +} + +#[derive(Debug, Deserialize)] +struct TelegramUser { + id: i64, +} + +#[derive(Debug, Deserialize)] +struct TelegramMessage { + message_id: i64, + chat: TelegramChat, + from: Option, + text: Option, + photo: Option>, + caption: Option, +} + +#[derive(Debug, Deserialize)] +struct TelegramChat { + id: i64, +} + +#[derive(Debug, Deserialize)] +struct TelegramPhotoSize { + file_id: String, + width: Option, + height: Option, + file_size: Option, +} + +#[derive(Debug, Deserialize)] +struct TelegramBotInfo { + id: i64, + username: Option, +} + +#[derive(Clone)] +struct TelegramApi { + client: reqwest::Client, +} + +impl TelegramApi { + fn new() -> Self { + Self { + client: reqwest::Client::new(), + } + } + + async fn call Deserialize<'de>>( + &self, + token: &str, + method: &str, + params: Vec<(&str, String)>, + ) -> Result { + let url = format!("https://api.telegram.org/bot{token}/{method}"); + let res = self + .client + .post(url) + .form(¶ms) + .send() + .await + .map_err(|e| e.to_string())?; + let status = res.status(); + let text = res.text().await.map_err(|e| e.to_string())?; + if !status.is_success() { + return Err(format!("Telegram API error: HTTP {status} {text}")); + } + serde_json::from_str::(&text).map_err(|e| format!("{e}: {text}")) + } + + async fn get_me(&self, token: &str) -> Result { + let response: TelegramResponse = self.call(token, "getMe", vec![]).await?; + if response.ok { + response + .result + .ok_or_else(|| "Telegram getMe: missing result".to_string()) + } else { + Err(response + .description + .unwrap_or_else(|| "Telegram getMe failed".to_string())) + } + } + + async fn get_updates( + &self, + token: &str, + offset: Option, + ) -> Result, String> { + let mut params = vec![("timeout", "50".to_string())]; + if let Some(offset) = offset { + params.push(("offset", offset.to_string())); + } + // Only the update types we actually handle. + params.push(( + "allowed_updates", + r#"["message","edited_message","callback_query"]"#.to_string(), + )); + let response: TelegramResponse> = + self.call(token, "getUpdates", params).await?; + if response.ok { + Ok(response.result.unwrap_or_default()) + } else { + Err(response + .description + .unwrap_or_else(|| "Telegram getUpdates failed".to_string())) + } + } + + async fn send_message( + &self, + token: &str, + chat_id: i64, + text: &str, + reply_markup: Option, + ) -> Result { + let mut params = vec![("chat_id", chat_id.to_string()), ("text", text.to_string())]; + if let Some(markup) = reply_markup { + params.push(( + "reply_markup", + serde_json::to_string(&markup).map_err(|e| e.to_string())?, + )); + } + let response: TelegramResponse = + self.call(token, "sendMessage", params).await?; + if response.ok { + response + .result + .ok_or_else(|| "Telegram sendMessage: missing result".to_string()) + } else { + Err(response + .description + .unwrap_or_else(|| "Telegram sendMessage failed".to_string())) + } + } + + async fn edit_message_text( + &self, + token: &str, + chat_id: i64, + message_id: i64, + text: &str, + reply_markup: Option, + ) -> Result<(), String> { + let mut params = vec![ + ("chat_id", chat_id.to_string()), + ("message_id", message_id.to_string()), + ("text", text.to_string()), + ]; + if let Some(markup) = reply_markup { + params.push(( + "reply_markup", + serde_json::to_string(&markup).map_err(|e| e.to_string())?, + )); + } + let response: TelegramResponse = self.call(token, "editMessageText", params).await?; + if response.ok { + Ok(()) + } else { + Err(response + .description + .unwrap_or_else(|| "Telegram editMessageText failed".to_string())) + } + } + + async fn delete_message( + &self, + token: &str, + chat_id: i64, + message_id: i64, + ) -> Result<(), String> { + let params = vec![ + ("chat_id", chat_id.to_string()), + ("message_id", message_id.to_string()), + ]; + let response: TelegramResponse = self.call(token, "deleteMessage", params).await?; + if response.ok { + Ok(()) + } else { + Err(response + .description + .unwrap_or_else(|| "Telegram deleteMessage failed".to_string())) + } + } + + async fn answer_callback_query( + &self, + token: &str, + callback_query_id: &str, + text: Option<&str>, + ) -> Result<(), String> { + let mut params = vec![("callback_query_id", callback_query_id.to_string())]; + if let Some(text) = text { + params.push(("text", text.to_string())); + } + let response: TelegramResponse = + self.call(token, "answerCallbackQuery", params).await?; + if response.ok { + Ok(()) + } else { + Err(response + .description + .unwrap_or_else(|| "Telegram answerCallbackQuery failed".to_string())) + } + } + + async fn send_chat_action(&self, token: &str, chat_id: i64, action: &str) -> Result<(), String> { + let params = vec![ + ("chat_id", chat_id.to_string()), + ("action", action.to_string()), + ]; + let response: TelegramResponse = + self.call(token, "sendChatAction", params).await?; + if response.ok { + Ok(()) + } else { + Err(response + .description + .unwrap_or_else(|| "Telegram sendChatAction failed".to_string())) + } + } + + async fn get_file_path(&self, token: &str, file_id: &str) -> Result, String> { + #[derive(Debug, Deserialize)] + struct FileResult { + file_path: Option, + } + #[derive(Debug, Deserialize)] + struct FileResponse { + ok: bool, + result: Option, + description: Option, + } + let params = vec![("file_id", file_id.to_string())]; + let response: FileResponse = self.call(token, "getFile", params).await?; + if response.ok { + Ok(response.result.and_then(|r| r.file_path)) + } else { + Err(response + .description + .unwrap_or_else(|| "Telegram getFile failed".to_string())) + } + } +} + +async fn send_message_long( + api: &TelegramApi, + token: &str, + chat_id: i64, + text: &str, + reply_markup: Option, +) -> Result, String> { + let chunks = split_telegram_text(text, TELEGRAM_SAFE_TEXT_CHARS); + let mut sent: Vec = Vec::new(); + for (idx, chunk) in chunks.iter().enumerate() { + let markup = if idx == 0 { reply_markup.clone() } else { None }; + let msg = api.send_message(token, chat_id, chunk, markup).await?; + sent.push(msg); + } + Ok(sent) +} + +async fn edit_message_text_long( + api: &TelegramApi, + token: &str, + chat_id: i64, + message_id: i64, + text: &str, + reply_markup: Option, +) -> Result<(), String> { + let chunks = split_telegram_text(text, TELEGRAM_SAFE_TEXT_CHARS); + let first = chunks.first().cloned().unwrap_or_default(); + + match api + .edit_message_text(token, chat_id, message_id, &first, reply_markup.clone()) + .await + { + Ok(()) => {} + Err(_) => { + // Edits can fail if Telegram rejects the payload (e.g. length). Keep the inline + // keyboard and send the full content in follow-up messages. + let _ = api + .edit_message_text( + token, + chat_id, + message_id, + "βœ… Done. (see reply below)", + reply_markup, + ) + .await; + send_message_long(api, token, chat_id, text, Some(build_main_reply_keyboard())).await?; + return Ok(()); + } + } + + for chunk in chunks.iter().skip(1) { + let _ = api + .send_message(token, chat_id, chunk, Some(build_main_reply_keyboard())) + .await?; + } + + Ok(()) +} + +async fn animate_working_message( + api: TelegramApi, + token: String, + chat_id: i64, + message_id: i64, + label: String, + mut cancel: oneshot::Receiver<()>, +) { + // Keep it lightweight to avoid Telegram rate limits. + let frames = [ + "⏳ Working", + "⏳ Working.", + "⏳ Working..", + "⏳ Working...", + ]; + let mut idx: usize = 0; + loop { + let next = format!("{frame}\n\n➑️ Sending to:\n{label}", frame = frames[idx], label = label); + idx = (idx + 1) % frames.len(); + + tokio::select! { + _ = &mut cancel => { + break; + } + _ = sleep(Duration::from_millis(1200)) => { + // Best-effort. If edits fail (message deleted / too old / etc.), stop animating. + if api.edit_message_text(&token, chat_id, message_id, &next, None).await.is_err() { + break; + } + } + } + } +} + +fn read_config(settings: &crate::types::AppSettings) -> TelegramConfig { + TelegramConfig { + enabled: settings.telegram_enabled, + token: settings + .telegram_bot_token + .clone() + .filter(|value| !value.trim().is_empty()), + allowed_user_ids: settings.telegram_allowed_user_ids.clone(), + default_chat_id: settings.telegram_default_chat_id, + send_app_status: settings.telegram_send_app_status, + send_completed: settings.telegram_send_completed_messages, + pairing_secret: settings.telegram_pairing_secret.clone(), + default_access_mode: settings.default_access_mode.clone(), + } +} + +pub(crate) async fn notify_app_exit(app: AppHandle) { + let api = TelegramApi::new(); + let config = { + let state = app.state::(); + let settings = state.app_settings.lock().await; + read_config(&settings) + }; + + if !config.enabled || !config.send_app_status { + return; + } + let (Some(token), Some(chat_id)) = (config.token, config.default_chat_id) else { + return; + }; + let _ = api + .send_message( + &token, + chat_id, + "πŸ›‘ CodexMonitor stopped.", + Some(build_main_reply_keyboard()), + ) + .await; +} + +pub(crate) fn start_telegram_poller(app: AppHandle) { + tauri::async_runtime::spawn(async move { + let (tx, rx) = mpsc::unbounded_channel::(); + let state = app.state::(); + *state.telegram_tx.lock().await = Some(tx); + telegram_loop(app, rx).await; + }); +} + +pub(crate) fn emit_app_server_event(app: AppHandle, workspace_id: String, message: Value) { + tauri::async_runtime::spawn(async move { + let state = app.state::(); + let maybe_tx = state.telegram_tx.lock().await.clone(); + if let Some(tx) = maybe_tx { + let _ = tx.send(TelegramEvent::AppServerEvent { workspace_id, message }); + } + }); +} + +#[tauri::command] +pub(crate) async fn telegram_bot_status(state: State<'_, AppState>) -> Result { + let settings = state.app_settings.lock().await; + let config = read_config(&settings); + let token = config + .token + .ok_or_else(|| "Telegram token is not configured.".to_string())?; + let api = TelegramApi::new(); + match api.get_me(&token).await { + Ok(info) => Ok(TelegramBotStatus { + ok: true, + username: info.username, + id: Some(info.id), + error: None, + }), + Err(err) => Ok(TelegramBotStatus { + ok: false, + username: None, + id: None, + error: Some(err), + }), + } +} + +async fn ensure_workspace_connected( + app: &AppHandle, + workspace_id: &str, +) -> Result<(), String> { + let state = app.state::(); + { + let sessions = state.sessions.lock().await; + if let Some(session) = sessions.get(workspace_id) { + let mut child = session.child.lock().await; + match child.try_wait() { + Ok(Some(_)) => { + // Dead session: fall through and respawn below. + } + Ok(None) => return Ok(()), + Err(_) => return Ok(()), + } + } + } + let entry = { + let workspaces = state.workspaces.lock().await; + workspaces + .get(workspace_id) + .cloned() + .ok_or("workspace not found")? + }; + let default_bin = { + let settings = state.app_settings.lock().await; + settings.codex_bin.clone() + }; + let session = spawn_workspace_session(entry.clone(), default_bin, app.clone()).await?; + state.sessions.lock().await.insert(entry.id, session); + Ok(()) +} + +async fn telegram_loop(app: AppHandle, mut rx: mpsc::UnboundedReceiver) { + let api = TelegramApi::new(); + let mut last_config: Option = None; + let mut offset: Option = None; + + let mut selections: HashMap = HashMap::new(); + let mut pending: HashMap = HashMap::new(); + // Thread key -> pending map key (workspace::thread::turn) + let mut pending_by_thread: HashMap = HashMap::new(); + let mut pending_animation_cancel: HashMap> = HashMap::new(); + let mut running_threads: HashSet = HashSet::new(); + let mut sent_agent_item_ids: HashSet = HashSet::new(); + let mut known_thread_labels: HashMap = HashMap::new(); + + let mut status_tokens: HashMap = HashMap::new(); + let mut status_tokens_expires_at: HashMap = HashMap::new(); + + loop { + let config = { + let state = app.state::(); + let settings = state.app_settings.lock().await; + read_config(&settings) + }; + + if let Some(prev) = last_config.as_ref() { + if prev.enabled && !config.enabled && prev.send_app_status { + if let (Some(token), Some(chat_id)) = (prev.token.clone(), prev.default_chat_id) { + let _ = api + .send_message( + &token, + chat_id, + "πŸ›‘ CodexMonitor stopped.", + Some(build_main_reply_keyboard()), + ) + .await; + } + } + } + + if !config.enabled { + last_config = Some(config); + sleep(Duration::from_millis(750)).await; + continue; + } + + let Some(token) = config.token.clone() else { + last_config = Some(config); + sleep(Duration::from_millis(750)).await; + continue; + }; + + if last_config + .as_ref() + .map(|prev| !prev.enabled && config.enabled) + .unwrap_or(true) + { + if config.send_app_status { + if let Some(chat_id) = config.default_chat_id { + let _ = api + .send_message( + &token, + chat_id, + "βœ… CodexMonitor started.", + Some(build_main_reply_keyboard()), + ) + .await; + } + } + } + last_config = Some(config.clone()); + + // Cleanup expired tokens/pending. + let now = Instant::now(); + status_tokens_expires_at.retain(|token, exp| { + if *exp <= now { + status_tokens.remove(token); + false + } else { + true + } + }); + pending.retain(|_, pending| pending.created_at.elapsed() < Duration::from_secs(15 * 60)); + let valid_pending_keys: HashSet = pending.keys().cloned().collect(); + pending_by_thread.retain(|_, key| valid_pending_keys.contains(key)); + pending_animation_cancel.retain(|key, _| valid_pending_keys.contains(key)); + + tokio::select! { + Some(event) = rx.recv() => { + match event { + TelegramEvent::AppServerEvent { workspace_id, message } => { + let method = message.get("method").and_then(|v| v.as_str()).unwrap_or(""); + let params = message.get("params").cloned().unwrap_or(Value::Null); + if method == "turn/started" { + if let Some(turn) = params.get("turn") { + let thread_id = turn.get("threadId").or_else(|| turn.get("thread_id")).and_then(|v| v.as_str()).unwrap_or(""); + if !thread_id.is_empty() { + running_threads.insert(thread_key(&workspace_id, thread_id)); + } + } + } + if method == "turn/completed" || method == "error" { + if let Some(turn) = params.get("turn") { + let thread_id = turn.get("threadId").or_else(|| turn.get("thread_id")).and_then(|v| v.as_str()).unwrap_or(""); + if !thread_id.is_empty() { + running_threads.remove(&thread_key(&workspace_id, thread_id)); + } + } else { + let thread_id = params.get("threadId").or_else(|| params.get("thread_id")).and_then(|v| v.as_str()).unwrap_or(""); + if !thread_id.is_empty() { + running_threads.remove(&thread_key(&workspace_id, thread_id)); + } + } + } + + if method == "item/completed" { + let thread_id = params.get("threadId").or_else(|| params.get("thread_id")).and_then(|v| v.as_str()).unwrap_or("").to_string(); + let item = params.get("item").cloned().unwrap_or(Value::Null); + let item_type = item.get("type").and_then(|v| v.as_str()).unwrap_or(""); + if item_type == "agentMessage" { + let item_id = item.get("id").and_then(|v| v.as_str()).unwrap_or("").to_string(); + let text = item.get("text").and_then(|v| v.as_str()).unwrap_or("").to_string(); + let turn_id = item.get("turnId").or_else(|| item.get("turn_id")).and_then(|v| v.as_str()).unwrap_or("").to_string(); + + if !item_id.is_empty() && sent_agent_item_ids.contains(&item_id) { + continue; + } + if !item_id.is_empty() { + sent_agent_item_ids.insert(item_id.clone()); + } + + // 1) Exact match: pending turn id equals this agentMessage turn id. + if !turn_id.is_empty() { + let key = format!("{workspace_id}::{thread_id}::{turn_id}"); + if let Some(pending_entry) = pending.remove(&key) { + pending_by_thread.remove(&thread_key(&workspace_id, &thread_id)); + let _token_key = ensure_thread_token( + &pending_entry.workspace_id, + &pending_entry.thread_id, + &pending_entry.thread_label, + &mut status_tokens, + &mut status_tokens_expires_at, + ); + let reply_text = if text.trim().is_empty() { + format!("βœ… {}\n\nDone.", pending_entry.thread_label) + } else { + format!("βœ… {}\n\n{}", pending_entry.thread_label, text) + }; + let reply_text = format!( + "{reply_text}\n\n➑️ Next messages will go to:\n{}", + pending_entry.thread_label + ); + if let Some(cancel) = pending_animation_cancel.remove(&key) { + let _ = cancel.send(()); + } + let _ = api + .delete_message(&token, pending_entry.chat_id, pending_entry.message_id) + .await; + let _ = send_message_long( + &api, + &token, + pending_entry.chat_id, + &reply_text, + Some(build_main_reply_keyboard()), + ) + .await; + continue; + } + } + + // 2) Fallback: some app-server events don't include turn ids reliably. + // If we have a pending "working" message for this thread, consume it. + let tkey = thread_key(&workspace_id, &thread_id); + if let Some(pending_key) = pending_by_thread.get(&tkey).cloned() { + if let Some(pending_entry) = pending.remove(&pending_key) { + pending_by_thread.remove(&tkey); + let _token_key = ensure_thread_token( + &pending_entry.workspace_id, + &pending_entry.thread_id, + &pending_entry.thread_label, + &mut status_tokens, + &mut status_tokens_expires_at, + ); + let reply_text = if text.trim().is_empty() { + format!("βœ… {}\n\nDone.", pending_entry.thread_label) + } else { + format!("βœ… {}\n\n{}", pending_entry.thread_label, text) + }; + let reply_text = format!( + "{reply_text}\n\n➑️ Next messages will go to:\n{}", + pending_entry.thread_label + ); + if let Some(cancel) = pending_animation_cancel.remove(&pending_key) { + let _ = cancel.send(()); + } + let _ = api + .delete_message(&token, pending_entry.chat_id, pending_entry.message_id) + .await; + let _ = send_message_long( + &api, + &token, + pending_entry.chat_id, + &reply_text, + Some(build_main_reply_keyboard()), + ) + .await; + continue; + } + } + + if config.send_completed { + if let Some(chat_id) = config.default_chat_id { + let label = known_thread_labels + .get(&thread_key(&workspace_id, &thread_id)) + .cloned() + .unwrap_or_else(|| format!("Agent {thread_id}")); + let token_key = ensure_thread_token( + &workspace_id, + &thread_id, + &label, + &mut status_tokens, + &mut status_tokens_expires_at, + ); + let preview = if text.trim().is_empty() { + "βœ… Agent completed.".to_string() + } else { + // For Telegram-only mode we prefer full answers rather than + // abbreviated previews. + format!("βœ… {}\n\n{}", label, text) + }; + let preview = format!( + "{preview}\n\n➑️ Next messages will go to:\n{label}" + ); + let _ = send_message_long( + &api, + &token, + chat_id, + &preview, + Some(build_reply_after_completion(&token_key)), + ) + .await; + } + } + } + } + } + } + } + updates = api.get_updates(&token, offset) => { + let updates = match updates { + Ok(value) => value, + Err(_) => { + sleep(Duration::from_millis(750)).await; + continue; + } + }; + for upd in updates { + offset = Some(upd.update_id + 1); + + if let Some(callback) = upd.callback_query { + let from_id = callback.from.as_ref().map(|u| u.id); + let chat_id = callback.message.as_ref().map(|m| m.chat.id); + let msg_id = callback.message.as_ref().map(|m| m.message_id); + let data = callback.data.unwrap_or_default(); + + if let (Some(user_id), Some(chat_id), Some(msg_id)) = (from_id, chat_id, msg_id) { + if !config.allowed_user_ids.contains(&user_id) { + if let Some(cb_id) = callback.id.as_deref() { + let _ = api.answer_callback_query(&token, cb_id, Some("Not authorized.")).await; + } + continue; + } + + if data == "disconnect" { + selections.remove(&chat_id); + let _ = api + .edit_message_text( + &token, + chat_id, + msg_id, + "πŸ”Œ Disconnected. Use /status to pick an agent.", + None, + ) + .await; + let _ = api + .send_message( + &token, + chat_id, + "Use the buttons below to continue.", + Some(build_main_reply_keyboard()), + ) + .await; + if let Some(cb_id) = callback.id.as_deref() { + let _ = api.answer_callback_query(&token, cb_id, Some("Disconnected.")).await; + } + continue; + } + + if data == "status:refresh" { + let _ = api.answer_callback_query(&token, callback.id.as_deref().unwrap_or(""), Some("Refreshing…")).await; + // Fallthrough: treat as /status from this chat. + let _ = send_status(&app, &api, &token, &config, chat_id, &running_threads, &mut status_tokens, &mut status_tokens_expires_at, &mut known_thread_labels).await; + continue; + } + + if let Some(rest) = data.strip_prefix("select:") { + if let Some(sel) = status_tokens.get(rest).cloned() { + known_thread_labels.insert( + thread_key(&sel.workspace_id, &sel.thread_id), + sel.label.clone(), + ); + selections.insert(chat_id, sel.clone()); + let text = format!("βœ… Connected. Send messages now.\n\nAgent: {}", sel.label); + let _ = api + .edit_message_text( + &token, + chat_id, + msg_id, + &text, + Some(build_connected_inline_keyboard()), + ) + .await; + if let Ok(Some(preview)) = fetch_thread_preview(&app, &sel.workspace_id, &sel.thread_id).await { + let last = normalize_text_preview(&preview); + let _ = api + .send_message( + &token, + chat_id, + &format!("🧠 Last reply:\n{last}"), + Some(build_main_reply_keyboard()), + ) + .await; + } + if let Some(cb_id) = callback.id.as_deref() { + let _ = api.answer_callback_query(&token, cb_id, Some("Connected.")).await; + } + } else if let Some(cb_id) = callback.id.as_deref() { + let _ = api.answer_callback_query(&token, cb_id, Some("Selection expired. Use /status again.")).await; + } + continue; + } + + if let Some(rest) = data.strip_prefix("new:") { + if let Some(sel) = status_tokens.get(rest).cloned() { + let _ = ensure_workspace_connected(&app, &sel.workspace_id).await; + let thread_id = match start_new_thread(&app, &sel.workspace_id).await { + Ok(id) => id, + Err(err) => { + let _ = api.edit_message_text(&token, chat_id, msg_id, &format!("Failed to start thread: {err}"), None).await; + continue; + } + }; + let next_sel = ThreadSelection { + workspace_id: sel.workspace_id, + thread_id: thread_id.clone(), + label: "New agent".to_string(), + }; + known_thread_labels.insert( + thread_key(&next_sel.workspace_id, &next_sel.thread_id), + next_sel.label.clone(), + ); + selections.insert(chat_id, next_sel.clone()); + let text = format!("πŸ†• New agent started.\n\nAgent: {}", next_sel.label); + let _ = api + .edit_message_text( + &token, + chat_id, + msg_id, + &text, + Some(build_connected_inline_keyboard()), + ) + .await; + let _ = api + .send_message( + &token, + chat_id, + "Send a message to start.", + Some(build_main_reply_keyboard()), + ) + .await; + } else if let Some(cb_id) = callback.id.as_deref() { + let _ = api.answer_callback_query(&token, cb_id, Some("Selection expired. Use /status again.")).await; + } + continue; + } + } + + if let Some(cb_id) = callback.id.as_deref() { + let _ = api.answer_callback_query(&token, cb_id, None).await; + } + continue; + } + + let message = upd.message.or(upd.edited_message); + if let Some(message) = message { + let chat_id = message.chat.id; + let user_id = message.from.as_ref().map(|u| u.id); + let text = message.text.clone().or(message.caption.clone()).unwrap_or_default(); + let trimmed = text.trim().to_string(); + + // Always allow pairing if the code matches. + if let Some(rest) = trimmed.strip_prefix("/link ") { + let expected = compute_pairing_code(&config.pairing_secret); + if rest.trim() == expected { + if let Some(uid) = user_id { + let _ = link_user(&app, uid, chat_id).await; + let _ = api.send_message(&token, chat_id, "βœ… Linked. Use /status to pick an agent.", Some(build_main_reply_keyboard())).await; + } else { + let _ = api.send_message(&token, chat_id, "Failed to link: missing user id.", None).await; + } + } else { + let _ = api.send_message(&token, chat_id, "Invalid link code.", None).await; + } + continue; + } + + let Some(uid) = user_id else { + continue; + }; + if !config.allowed_user_ids.contains(&uid) { + let hint = "Not linked yet. Open CodexMonitor β†’ Settings β†’ Cloud β†’ Telegram and send the /link code to this bot."; + let _ = api.send_message(&token, chat_id, hint, None).await; + continue; + } + + if trimmed == "/start" || trimmed == "/help" { + let pairing = compute_pairing_code(&config.pairing_secret); + let msg = format!( + "πŸ€– CodexMonitor Telegram control\n\nCommands:\n/status - pick an agent\n/disconnect - detach\n\nIf you haven't linked yet, send:\n/link {pairing}" + ); + let _ = api.send_message(&token, chat_id, &msg, Some(build_main_reply_keyboard())).await; + continue; + } + + if trimmed == "/status" || trimmed.eq_ignore_ascii_case("status") || trimmed == "πŸ“Š Status" { + let _ = send_status(&app, &api, &token, &config, chat_id, &running_threads, &mut status_tokens, &mut status_tokens_expires_at, &mut known_thread_labels).await; + continue; + } + + if trimmed == "/disconnect" || trimmed == "πŸ”Œ Disconnect" { + selections.remove(&chat_id); + let _ = api.send_message(&token, chat_id, "πŸ”Œ Disconnected. Use /status to pick an agent.", Some(build_main_reply_keyboard())).await; + continue; + } + + let Some(selection) = selections.get(&chat_id).cloned() else { + let _ = api.send_message(&token, chat_id, "Pick an agent first: /status", Some(build_main_reply_keyboard())).await; + continue; + }; + + if ensure_workspace_connected(&app, &selection.workspace_id).await.is_err() { + let _ = api.send_message(&token, chat_id, "Workspace is not connected.", None).await; + continue; + } + + let _ = api.send_chat_action(&token, chat_id, "typing").await; + let working = api + .send_message( + &token, + chat_id, + &format!("⏳ Working…\n\n➑️ Sending to:\n{}", selection.label), + Some(build_main_reply_keyboard()), + ) + .await; + let working_msg_id = working.map(|m| m.message_id).unwrap_or(0); + + let mut images: Vec = Vec::new(); + if let Some(photo) = message.photo.as_ref() { + if let Some(best) = photo.iter().max_by_key(|p| p.file_size.unwrap_or(0)) { + if let Ok(Some(path)) = api.get_file_path(&token, &best.file_id).await { + images.push(format!("https://api.telegram.org/file/bot{token}/{path}")); + } + } + } + + match send_to_codex( + &app, + &selection.workspace_id, + &selection.thread_id, + &trimmed, + &config.default_access_mode, + if images.is_empty() { None } else { Some(images) }, + ).await { + Ok(turn_id) => { + if working_msg_id != 0 && !turn_id.trim().is_empty() { + let key = format!("{}::{}::{}", selection.workspace_id, selection.thread_id, turn_id); + let tkey = thread_key(&selection.workspace_id, &selection.thread_id); + pending_by_thread.insert(tkey, key.clone()); + + let (cancel_tx, cancel_rx) = oneshot::channel::<()>(); + pending_animation_cancel.insert(key.clone(), cancel_tx); + tauri::async_runtime::spawn(animate_working_message( + api.clone(), + token.clone(), + chat_id, + working_msg_id, + selection.label.clone(), + cancel_rx, + )); + + pending.insert(key, PendingReply { + chat_id, + message_id: working_msg_id, + workspace_id: selection.workspace_id, + thread_id: selection.thread_id, + turn_id: turn_id.clone(), + thread_label: selection.label, + created_at: Instant::now(), + }); + } else if working_msg_id != 0 { + let _ = api.edit_message_text(&token, chat_id, working_msg_id, "βœ… Sent.", None).await; + } + } + Err(err) => { + if working_msg_id != 0 { + let _ = api.edit_message_text(&token, chat_id, working_msg_id, &format!("❌ {err}"), None).await; + } else { + let _ = api.send_message(&token, chat_id, &format!("❌ {err}"), None).await; + } + } + } + } + } + } + } + } +} + +fn short_token_digest(value: &[u8]) -> String { + // 8 bytes => 16 hex chars; fits safely into Telegram callback_data limits. + hex::encode(&value[..8]) +} + +fn thread_token_for(workspace_id: &str, thread_id: &str) -> String { + let digest = Sha256::digest(format!("{workspace_id}:{thread_id}").as_bytes()); + format!("t{}", short_token_digest(&digest)) +} + +fn workspace_token_for(workspace_id: &str) -> String { + let digest = Sha256::digest(workspace_id.as_bytes()); + format!("w{}", short_token_digest(&digest)) +} + +fn ensure_thread_token( + workspace_id: &str, + thread_id: &str, + label: &str, + status_tokens: &mut HashMap, + status_tokens_expires_at: &mut HashMap, +) -> String { + let token_key = thread_token_for(workspace_id, thread_id); + status_tokens.insert( + token_key.clone(), + ThreadSelection { + workspace_id: workspace_id.to_string(), + thread_id: thread_id.to_string(), + label: label.to_string(), + }, + ); + status_tokens_expires_at.insert( + token_key.clone(), + Instant::now() + Duration::from_secs(10 * 60), + ); + token_key +} + +fn build_reply_after_completion(_token_key: &str) -> Value { + json!({ + "inline_keyboard": [ + [{ "text": "πŸ“Š Status", "callback_data": "status:refresh" }, { "text": "πŸ”Œ Disconnect", "callback_data": "disconnect" }] + ] + }) +} + +async fn start_new_thread(app: &AppHandle, workspace_id: &str) -> Result { + let state = app.state::(); + let sessions = state.sessions.lock().await; + let session = sessions.get(workspace_id).ok_or("workspace not connected")?; + let params = json!({ + "cwd": session.entry.path, + "approvalPolicy": "on-request" + }); + let response = session.send_request("thread/start", params).await?; + let thread_id = response + .get("result") + .and_then(|result| result.get("thread")) + .and_then(|thread| thread.get("id")) + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + if thread_id.is_empty() { + Err("thread/start did not return a thread id".to_string()) + } else { + Ok(thread_id) + } +} + +async fn send_to_codex( + app: &AppHandle, + workspace_id: &str, + thread_id: &str, + text: &str, + access_mode: &str, + images: Option>, +) -> Result { + let state = app.state::(); + let sessions = state.sessions.lock().await; + let session = sessions.get(workspace_id).ok_or("workspace not connected")?; + + let sandbox_policy = match access_mode { + "full-access" => json!({ "type": "dangerFullAccess" }), + "read-only" => json!({ "type": "readOnly" }), + _ => json!({ + "type": "workspaceWrite", + "writableRoots": [session.entry.path], + "networkAccess": true + }), + }; + + let approval_policy = if access_mode == "full-access" { + "never" + } else { + "on-request" + }; + + let trimmed_text = text.trim(); + let mut input: Vec = Vec::new(); + if !trimmed_text.is_empty() { + input.push(json!({ "type": "text", "text": trimmed_text })); + } + if let Some(paths) = images { + for path in paths { + let trimmed = path.trim().to_string(); + if trimmed.is_empty() { + continue; + } + input.push(json!({ "type": "image", "url": trimmed })); + } + } + if input.is_empty() { + return Err("empty user message".to_string()); + } + + let params = json!({ + "threadId": thread_id, + "input": input, + "cwd": session.entry.path, + "approvalPolicy": approval_policy, + "sandboxPolicy": sandbox_policy, + "model": Value::Null, + "effort": Value::Null, + }); + let response = session.send_request("turn/start", params).await?; + let turn_id = response + .get("result") + .and_then(|result| result.get("turn")) + .and_then(|turn| turn.get("id")) + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + Ok(turn_id) +} + +async fn link_user(app: &AppHandle, user_id: i64, chat_id: i64) -> Result<(), String> { + let state = app.state::(); + let mut settings = state.app_settings.lock().await; + if !settings.telegram_allowed_user_ids.contains(&user_id) { + settings.telegram_allowed_user_ids.push(user_id); + } + if settings.telegram_default_chat_id.is_none() { + settings.telegram_default_chat_id = Some(chat_id); + } + crate::storage::write_settings(&state.settings_path, &settings)?; + Ok(()) +} + +async fn send_status( + app: &AppHandle, + api: &TelegramApi, + token: &str, + config: &TelegramConfig, + chat_id: i64, + running_threads: &HashSet, + status_tokens: &mut HashMap, + status_tokens_expires_at: &mut HashMap, + known_thread_labels: &mut HashMap, +) -> Result<(), String> { + let workspaces = list_workspaces_info(app).await; + if workspaces.is_empty() { + api.send_message( + token, + chat_id, + "No workspaces yet.", + Some(build_main_reply_keyboard()), + ) + .await?; + return Ok(()); + } + + let mut text_lines: Vec = Vec::new(); + let mut buttons: Vec = Vec::new(); + + for workspace in workspaces { + text_lines.push(format!("#{}", workspace.name)); + let ws_id = workspace.id.clone(); + if let Err(err) = ensure_workspace_connected(app, &ws_id).await { + text_lines.push(format!(" (failed to connect: {err})")); + continue; + } + + let mut threads: Vec = Vec::new(); + let mut last_err: Option = None; + for attempt in 0..5 { + match list_threads_preview(app, &ws_id).await { + Ok(list) => { + if !list.is_empty() { + threads = list; + last_err = None; + break; + } + threads = list; + } + Err(err) => { + last_err = Some(err); + } + } + if attempt < 4 { + sleep(Duration::from_millis(950)).await; + } + } + if threads.is_empty() { + if let Some(err) = last_err { + text_lines.push(format!(" (failed to list threads: {err})")); + } else { + text_lines.push(" (no threads yet β€” try /status again in a moment)".to_string()); + } + } else { + let max_threads = 7usize; + for thread in threads.iter().take(max_threads) { + let key = thread_key(&ws_id, &thread.thread_id); + let icon = if running_threads.contains(&key) { "πŸ”΅" } else { "🟒" }; + text_lines.push(format!(" {icon} {}", thread.label)); + } + if threads.len() > max_threads { + text_lines.push(format!(" … {} more", threads.len() - max_threads)); + } + } + + let tok = workspace_token_for(&ws_id); + status_tokens.insert( + tok.clone(), + ThreadSelection { + workspace_id: ws_id.clone(), + thread_id: String::new(), + label: workspace.name.clone(), + }, + ); + status_tokens_expires_at.insert(tok.clone(), Instant::now() + Duration::from_secs(10 * 60)); + buttons.push(TelegramStatusButton { + label: format!("βž• New agent Β· {}", workspace.name), + callback_data: format!("new:{tok}"), + }); + + for thread in threads.iter().take(7) { + known_thread_labels.insert(thread_key(&ws_id, &thread.thread_id), thread.label.clone()); + let token_key = thread_token_for(&ws_id, &thread.thread_id); + status_tokens.insert( + token_key.clone(), + ThreadSelection { + workspace_id: ws_id.clone(), + thread_id: thread.thread_id.clone(), + label: thread.label.clone(), + }, + ); + status_tokens_expires_at.insert( + token_key.clone(), + Instant::now() + Duration::from_secs(10 * 60), + ); + buttons.push(TelegramStatusButton { + label: thread.label.clone(), + callback_data: format!("select:{token_key}"), + }); + } + } + + let header = if config.default_chat_id == Some(chat_id) { + "πŸ“Š CodexMonitor status (notifications target)".to_string() + } else { + "πŸ“Š CodexMonitor status".to_string() + }; + let body = if text_lines.is_empty() { + "No workspaces.".to_string() + } else { + text_lines.join("\n") + }; + let text = format!("{header}\n\n{body}"); + let _ = send_message_long( + api, + token, + chat_id, + &text, + Some(build_status_keyboard(&buttons)), + ) + .await?; + Ok(()) +} + +#[derive(Debug, Clone)] +struct ThreadPreview { + thread_id: String, + label: String, +} + +async fn list_threads_preview(app: &AppHandle, workspace_id: &str) -> Result, String> { + let state = app.state::(); + let sessions = state.sessions.lock().await; + let session = sessions.get(workspace_id).ok_or("workspace not connected")?; + let workspace_path = session.entry.path.clone(); + let canonical_workspace = std::fs::canonicalize(&workspace_path) + .ok() + .and_then(|p| p.to_str().map(|s| s.to_string())); + + // App-server returns a paginated list in `result.data` + `result.nextCursor`. + // Match the UI behavior and filter by cwd. + let mut cursor: Option = None; + let mut collected: Vec = Vec::new(); + for _ in 0..3 { + let response = session + .send_request( + "thread/list", + json!({ + "cursor": cursor, + "limit": 40, + }), + ) + .await?; + let result = response.get("result").unwrap_or(&response); + let data = result + .get("data") + .and_then(|v| v.as_array()) + .cloned() + .unwrap_or_default(); + collected.extend(data); + + let next_cursor = result + .get("nextCursor") + .or_else(|| result.get("next_cursor")) + .and_then(|v| v.as_str()) + .map(|v| v.to_string()); + + cursor = next_cursor; + if cursor.is_none() || collected.len() >= 80 { + break; + } + } + + let mut threads: Vec = Vec::new(); + for thread in collected { + let Some(cwd) = thread.get("cwd").and_then(|v| v.as_str()) else { + continue; + }; + if cwd == workspace_path { + threads.push(thread); + continue; + } + // Fallback: handle symlink/canonical path differences. + if let (Some(cws), Ok(ccwd)) = (canonical_workspace.as_deref(), std::fs::canonicalize(cwd)) + { + if let Some(ccwd_str) = ccwd.to_str() { + if ccwd_str == cws { + threads.push(thread); + } + } + } + } + let mut previews = Vec::new(); + for thread in threads { + let id = thread.get("id").and_then(|v| v.as_str()).unwrap_or("").to_string(); + if id.is_empty() { + continue; + } + let preview_text = thread + .get("preview") + .and_then(|v| v.as_str()) + .unwrap_or("") + .trim() + .to_string(); + let title = thread + .get("title") + .or_else(|| thread.get("name")) + .and_then(|v| v.as_str()) + .unwrap_or(""); + let label_source = if !preview_text.is_empty() { + preview_text + } else if !title.trim().is_empty() { + title.to_string() + } else { + format!("Agent {id}") + }; + let label = normalize_status_label(&label_source); + previews.push(ThreadPreview { + thread_id: id, + label, + }); + } + Ok(previews) +} + +async fn fetch_thread_preview( + app: &AppHandle, + workspace_id: &str, + thread_id: &str, +) -> Result, String> { + if thread_id.trim().is_empty() { + return Ok(None); + } + let state = app.state::(); + let sessions = state.sessions.lock().await; + let session = sessions.get(workspace_id).ok_or("workspace not connected")?; + let response = session + .send_request( + "thread/resume", + json!({ + "threadId": thread_id, + }), + ) + .await?; + + // Best-effort: find the last agentMessage text from the resume payload. + fn collect_agent_messages(value: &Value, out: &mut Vec) { + match value { + Value::Array(items) => { + for item in items { + collect_agent_messages(item, out); + } + } + Value::Object(map) => { + if map + .get("type") + .and_then(|v| v.as_str()) + .is_some_and(|v| v == "agentMessage") + { + if let Some(text) = map.get("text").and_then(|v| v.as_str()) { + if !text.trim().is_empty() { + out.push(text.to_string()); + } + } + } + for value in map.values() { + collect_agent_messages(value, out); + } + } + _ => {} + } + } + + let mut agent_texts: Vec = Vec::new(); + collect_agent_messages(&response, &mut agent_texts); + if let Some(last) = agent_texts.into_iter().last() { + return Ok(Some(last)); + } + + let thread = response + .get("result") + .and_then(|v| v.get("thread")) + .or_else(|| response.get("thread")); + let preview = thread + .and_then(|t| t.get("preview")) + .and_then(|v| v.as_str()) + .map(|v| v.to_string()); + Ok(preview) +} + +async fn list_workspaces_info(app: &AppHandle) -> Vec { + let state = app.state::(); + let workspaces = state.workspaces.lock().await; + let sessions = state.sessions.lock().await; + let mut result = Vec::new(); + for entry in workspaces.values() { + result.push(WorkspaceInfo { + id: entry.id.clone(), + name: entry.name.clone(), + path: entry.path.clone(), + connected: sessions.contains_key(&entry.id), + codex_bin: entry.codex_bin.clone(), + kind: entry.kind.clone(), + parent_id: entry.parent_id.clone(), + worktree: entry.worktree.clone(), + settings: entry.settings.clone(), + }); + } + result.sort_by(|a, b| { + let a_order = a.settings.sort_order.unwrap_or(u32::MAX); + let b_order = b.settings.sort_order.unwrap_or(u32::MAX); + a_order.cmp(&b_order).then_with(|| a.name.cmp(&b.name)) + }); + // Only main workspaces; worktrees are nested and noisy for Telegram. + result + .into_iter() + .filter(|workspace| matches!(workspace.kind, WorkspaceKind::Main)) + .collect() +} diff --git a/src-tauri/src/telegram_stub.rs b/src-tauri/src/telegram_stub.rs new file mode 100644 index 000000000..c034f4f97 --- /dev/null +++ b/src-tauri/src/telegram_stub.rs @@ -0,0 +1,34 @@ +use serde::Serialize; +use serde_json::Value; +use tauri::{AppHandle, State}; + +use crate::state::AppState; + +#[derive(Debug, Clone)] +pub(crate) enum TelegramEvent { + AppServerEvent { workspace_id: String, message: Value }, +} + +#[derive(Debug, Serialize, Clone)] +pub(crate) struct TelegramBotStatus { + pub(crate) ok: bool, + pub(crate) username: Option, + pub(crate) id: Option, + pub(crate) error: Option, +} + +pub(crate) fn start_telegram_poller(_app: AppHandle) {} + +pub(crate) fn emit_app_server_event(_app: AppHandle, _workspace_id: String, _message: Value) {} + +pub(crate) async fn notify_app_exit(_app: AppHandle) {} + +#[tauri::command] +pub(crate) async fn telegram_bot_status(_state: State<'_, AppState>) -> Result { + Ok(TelegramBotStatus { + ok: false, + username: None, + id: None, + error: Some("Telegram integration is only available on desktop builds.".to_string()), + }) +} diff --git a/src-tauri/src/types.rs b/src-tauri/src/types.rs index f517317e5..17c75a231 100644 --- a/src-tauri/src/types.rs +++ b/src-tauri/src/types.rs @@ -128,6 +128,20 @@ pub(crate) struct WorkspaceSettings { pub(crate) struct AppSettings { #[serde(default, rename = "codexBin")] pub(crate) codex_bin: Option, + #[serde(default, rename = "telegramEnabled")] + pub(crate) telegram_enabled: bool, + #[serde(default, rename = "telegramBotToken")] + pub(crate) telegram_bot_token: Option, + #[serde(default, rename = "telegramAllowedUserIds")] + pub(crate) telegram_allowed_user_ids: Vec, + #[serde(default, rename = "telegramDefaultChatId")] + pub(crate) telegram_default_chat_id: Option, + #[serde(default, rename = "telegramSendAppStatus")] + pub(crate) telegram_send_app_status: bool, + #[serde(default, rename = "telegramSendCompletedMessages")] + pub(crate) telegram_send_completed_messages: bool, + #[serde(default, rename = "telegramPairingSecret")] + pub(crate) telegram_pairing_secret: String, #[serde(default = "default_access_mode", rename = "defaultAccessMode")] pub(crate) default_access_mode: String, #[serde(default = "default_ui_scale", rename = "uiScale")] @@ -155,6 +169,13 @@ impl Default for AppSettings { fn default() -> Self { Self { codex_bin: None, + telegram_enabled: false, + telegram_bot_token: None, + telegram_allowed_user_ids: Vec::new(), + telegram_default_chat_id: None, + telegram_send_app_status: false, + telegram_send_completed_messages: false, + telegram_pairing_secret: String::new(), default_access_mode: "current".to_string(), ui_scale: 1.0, notification_sounds_enabled: true, @@ -170,6 +191,13 @@ mod tests { fn app_settings_defaults_from_empty_json() { let settings: AppSettings = serde_json::from_str("{}").expect("settings deserialize"); assert!(settings.codex_bin.is_none()); + assert!(!settings.telegram_enabled); + assert!(settings.telegram_bot_token.is_none()); + assert!(settings.telegram_allowed_user_ids.is_empty()); + assert!(settings.telegram_default_chat_id.is_none()); + assert!(!settings.telegram_send_app_status); + assert!(!settings.telegram_send_completed_messages); + assert!(settings.telegram_pairing_secret.is_empty()); assert_eq!(settings.default_access_mode, "current"); assert!((settings.ui_scale - 1.0).abs() < f64::EPSILON); assert!(settings.notification_sounds_enabled); diff --git a/src/App.tsx b/src/App.tsx index 6d42ddc55..ac36e3b10 100644 --- a/src/App.tsx +++ b/src/App.tsx @@ -62,6 +62,7 @@ import { useCopyThread } from "./features/threads/hooks/useCopyThread"; import { usePanelVisibility } from "./features/layout/hooks/usePanelVisibility"; import { useTerminalController } from "./features/terminal/hooks/useTerminalController"; import { playNotificationSound } from "./utils/notificationSounds"; +import { telegramBotStatus } from "./services/tauri"; import type { AccessMode, DiffLineReference, QueuedMessage, WorkspaceInfo } from "./types"; function useWindowLabel() { @@ -565,6 +566,7 @@ function MainApp() { } const handleOpenSettings = () => setSettingsOpen(true); + const handleTelegramBotStatus = useCallback(() => telegramBotStatus(), []); const orderValue = (entry: WorkspaceInfo) => typeof entry.settings.sortOrder === "number" @@ -959,6 +961,7 @@ function MainApp() { await queueSaveSettings(next); }} onRunDoctor={doctor} + onTelegramBotStatus={handleTelegramBotStatus} onUpdateWorkspaceCodexBin={async (id, codexBin) => { await updateWorkspaceCodexBin(id, codexBin); }} diff --git a/src/features/app/hooks/useAppServerEvents.ts b/src/features/app/hooks/useAppServerEvents.ts index d44d3e01c..04a9aa06f 100644 --- a/src/features/app/hooks/useAppServerEvents.ts +++ b/src/features/app/hooks/useAppServerEvents.ts @@ -98,8 +98,10 @@ export function useAppServerEvents(handlers: AppServerEventHandlers) { if (method === "turn/started") { const params = message.params as Record; const turn = params.turn as Record | undefined; - const threadId = String(turn?.threadId ?? turn?.thread_id ?? ""); - const turnId = String(turn?.id ?? ""); + const threadId = String( + turn?.threadId ?? turn?.thread_id ?? params.threadId ?? params.thread_id ?? "", + ); + const turnId = String(turn?.id ?? params.turnId ?? params.turn_id ?? ""); if (threadId) { handlers.onTurnStarted?.(workspace_id, threadId, turnId); } @@ -125,8 +127,10 @@ export function useAppServerEvents(handlers: AppServerEventHandlers) { if (method === "turn/completed") { const params = message.params as Record; const turn = params.turn as Record | undefined; - const threadId = String(turn?.threadId ?? turn?.thread_id ?? ""); - const turnId = String(turn?.id ?? ""); + const threadId = String( + turn?.threadId ?? turn?.thread_id ?? params.threadId ?? params.thread_id ?? "", + ); + const turnId = String(turn?.id ?? params.turnId ?? params.turn_id ?? ""); if (threadId) { handlers.onTurnCompleted?.(workspace_id, threadId, turnId); } diff --git a/src/features/settings/components/SettingsView.tsx b/src/features/settings/components/SettingsView.tsx index 74e8c90d1..b7da5bb2f 100644 --- a/src/features/settings/components/SettingsView.tsx +++ b/src/features/settings/components/SettingsView.tsx @@ -5,15 +5,19 @@ import { ChevronUp, LayoutGrid, SlidersHorizontal, + MessageCircle, Stethoscope, TerminalSquare, Trash2, X, } from "lucide-react"; -import type { AppSettings, CodexDoctorResult, WorkspaceInfo } from "../../../types"; -import { - clampUiScale, -} from "../../../utils/uiScale"; +import type { + AppSettings, + CodexDoctorResult, + TelegramBotStatus, + WorkspaceInfo, +} from "../../../types"; +import { clampUiScale } from "../../../utils/uiScale"; type SettingsViewProps = { workspaces: WorkspaceInfo[]; @@ -25,13 +29,14 @@ type SettingsViewProps = { appSettings: AppSettings; onUpdateAppSettings: (next: AppSettings) => Promise; onRunDoctor: (codexBin: string | null) => Promise; + onTelegramBotStatus?: () => Promise; onUpdateWorkspaceCodexBin: (id: string, codexBin: string | null) => Promise; scaleShortcutTitle: string; scaleShortcutText: string; onTestNotificationSound: () => void; }; -type SettingsSection = "projects" | "display"; +type SettingsSection = "projects" | "display" | "integrations"; type CodexSection = SettingsSection | "codex"; function orderValue(workspace: WorkspaceInfo) { @@ -49,6 +54,7 @@ export function SettingsView({ appSettings, onUpdateAppSettings, onRunDoctor, + onTelegramBotStatus, onUpdateWorkspaceCodexBin, scaleShortcutTitle, scaleShortcutText, @@ -64,6 +70,14 @@ export function SettingsView({ status: "idle" | "running" | "done"; result: CodexDoctorResult | null; }>({ status: "idle", result: null }); + const [telegramTokenDraft, setTelegramTokenDraft] = useState( + appSettings.telegramBotToken ?? "", + ); + const [telegramStatusState, setTelegramStatusState] = useState<{ + status: "idle" | "running" | "done"; + result: TelegramBotStatus | null; + }>({ status: "idle", result: null }); + const [telegramPairingCode, setTelegramPairingCode] = useState(null); const [isSavingSettings, setIsSavingSettings] = useState(false); const projects = useMemo(() => { @@ -87,6 +101,36 @@ export function SettingsView({ setScaleDraft(`${Math.round(clampUiScale(appSettings.uiScale) * 100)}%`); }, [appSettings.uiScale]); + useEffect(() => { + setTelegramTokenDraft(appSettings.telegramBotToken ?? ""); + }, [appSettings.telegramBotToken]); + + useEffect(() => { + let active = true; + if (!appSettings.telegramPairingSecret?.trim?.()) { + setTelegramPairingCode(null); + return; + } + void (async () => { + try { + const enc = new TextEncoder().encode(appSettings.telegramPairingSecret); + const digest = await crypto.subtle.digest("SHA-256", enc); + const bytes = Array.from(new Uint8Array(digest)).slice(0, 16); + const hex = bytes.map((b) => b.toString(16).padStart(2, "0")).join(""); + if (active) { + setTelegramPairingCode(hex); + } + } catch { + if (active) { + setTelegramPairingCode(null); + } + } + })(); + return () => { + active = false; + }; + }, [appSettings.telegramPairingSecret]); + useEffect(() => { setOverrideDrafts((prev) => { const next: Record = {}; @@ -180,6 +224,56 @@ export function SettingsView({ } }; + const updateTelegramSettings = async (patch: Partial) => { + setIsSavingSettings(true); + try { + await onUpdateAppSettings({ + ...appSettings, + ...patch, + }); + } finally { + setIsSavingSettings(false); + } + }; + + const handleCheckTelegramToken = async () => { + if (!onTelegramBotStatus) { + setTelegramStatusState({ + status: "done", + result: { + ok: false, + username: null, + id: null, + error: "Telegram bot status is not available on this platform.", + }, + }); + return; + } + + setTelegramStatusState({ status: "running", result: null }); + try { + const draftToken = telegramTokenDraft.trim(); + const currentToken = (appSettings.telegramBotToken ?? "").trim(); + if (draftToken !== currentToken) { + await updateTelegramSettings({ + telegramBotToken: draftToken.length ? draftToken : null, + }); + } + const res = await onTelegramBotStatus(); + setTelegramStatusState({ status: "done", result: res }); + } catch (err) { + setTelegramStatusState({ + status: "done", + result: { + ok: false, + username: null, + id: null, + error: String(err), + }, + }); + } + }; + return (
@@ -221,6 +315,14 @@ export function SettingsView({ Codex +
{activeSection === "projects" && ( @@ -538,6 +640,140 @@ export function SettingsView({ )} + {activeSection === "integrations" && ( +
+
Integrations
+
+ Optional notifications and remote control. +
+ +
Telegram
+
+ Use a Telegram bot to receive status updates and send prompts. +
+ +
+
+
Enable Telegram
+
+ Requires a bot token and linking via the /link code. +
+
+ +
+ +
+ +
+ setTelegramTokenDraft(event.target.value)} + /> + +
+
+ + {telegramStatusState.result && ( +
+
+ {telegramStatusState.result.ok + ? `Bot OK${telegramStatusState.result.username ? ` (@${telegramStatusState.result.username})` : ""}` + : "Bot check failed"} +
+ {telegramStatusState.result.error && ( +
+ {telegramStatusState.result.error} +
+ )} +
+ )} + + {appSettings.telegramEnabled && + telegramPairingCode && + telegramStatusState.result?.ok && ( +
+ +
Send this message to your bot:
+
+ {`/link ${telegramPairingCode}`} +
+
Then use /status in Telegram.
+
+ )} + +
+
+
Send app status
+
+ Posts β€œstarted/stopped” messages to the notifications chat. +
+
+ +
+ +
+
+
Send completed messages
+
+ Sends a notification when an agent finishes a reply. +
+
+ +
+
+ )}
diff --git a/src/features/settings/hooks/useAppSettings.ts b/src/features/settings/hooks/useAppSettings.ts index 029bdbede..14fe796df 100644 --- a/src/features/settings/hooks/useAppSettings.ts +++ b/src/features/settings/hooks/useAppSettings.ts @@ -5,6 +5,13 @@ import { clampUiScale, UI_SCALE_DEFAULT } from "../../../utils/uiScale"; const defaultSettings: AppSettings = { codexBin: null, + telegramEnabled: false, + telegramBotToken: null, + telegramAllowedUserIds: [], + telegramDefaultChatId: null, + telegramSendAppStatus: false, + telegramSendCompletedMessages: false, + telegramPairingSecret: "", defaultAccessMode: "current", uiScale: UI_SCALE_DEFAULT, notificationSoundsEnabled: true, diff --git a/src/features/threads/hooks/useThreads.ts b/src/features/threads/hooks/useThreads.ts index efee04a58..819c39149 100644 --- a/src/features/threads/hooks/useThreads.ts +++ b/src/features/threads/hooks/useThreads.ts @@ -27,6 +27,7 @@ import { import { useAppServerEvents } from "../../app/hooks/useAppServerEvents"; import { buildConversationItem, + buildConversationItemFromThreadItem, buildItemsFromThread, getThreadTimestamp, isReviewingFromThread, @@ -464,7 +465,8 @@ export function useThreads({ dispatch({ type: "markReviewing", threadId, isReviewing: false }); markProcessing(threadId, false); } - const converted = buildConversationItem(item); + const converted = + buildConversationItemFromThreadItem(item) ?? buildConversationItem(item); if (converted) { dispatch({ type: "upsertItem", threadId, item: converted }); } diff --git a/src/services/tauri.ts b/src/services/tauri.ts index 48eb8377c..c23b3a706 100644 --- a/src/services/tauri.ts +++ b/src/services/tauri.ts @@ -3,6 +3,7 @@ import { open } from "@tauri-apps/plugin-dialog"; import type { AppSettings, CodexDoctorResult, + TelegramBotStatus, WorkspaceInfo, WorkspaceSettings, } from "../types"; @@ -203,6 +204,10 @@ export async function runCodexDoctor( return invoke("codex_doctor", { codexBin }); } +export async function telegramBotStatus(): Promise { + return invoke("telegram_bot_status"); +} + export async function getWorkspaceFiles(workspaceId: string) { return invoke("list_workspace_files", { workspaceId }); } diff --git a/src/types.ts b/src/types.ts index a4195b8f0..77956974b 100644 --- a/src/types.ts +++ b/src/types.ts @@ -63,6 +63,13 @@ export type AccessMode = "read-only" | "current" | "full-access"; export type AppSettings = { codexBin: string | null; + telegramEnabled: boolean; + telegramBotToken: string | null; + telegramAllowedUserIds: number[]; + telegramDefaultChatId: number | null; + telegramSendAppStatus: boolean; + telegramSendCompletedMessages: boolean; + telegramPairingSecret: string; defaultAccessMode: AccessMode; uiScale: number; notificationSoundsEnabled: boolean; @@ -80,6 +87,13 @@ export type CodexDoctorResult = { nodeDetails: string | null; }; +export type TelegramBotStatus = { + ok: boolean; + username: string | null; + id: number | null; + error: string | null; +}; + export type ApprovalRequest = { workspace_id: string; request_id: number; From 464eabfcbdc2a8097e4ee152f0c88a75571a9905 Mon Sep 17 00:00:00 2001 From: Peter vogel Date: Fri, 16 Jan 2026 02:08:26 +0100 Subject: [PATCH 2/2] chore(telegram): update Cargo.lock --- src-tauri/Cargo.lock | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src-tauri/Cargo.lock b/src-tauri/Cargo.lock index 7e43f59ad..88603fe3b 100644 --- a/src-tauri/Cargo.lock +++ b/src-tauri/Cargo.lock @@ -474,10 +474,13 @@ version = "0.1.0" dependencies = [ "fix-path-env", "git2", + "hex", "ignore", "portable-pty", + "reqwest", "serde", "serde_json", + "sha2", "tauri", "tauri-build", "tauri-plugin-dialog", @@ -4545,9 +4548,21 @@ dependencies = [ "pin-project-lite", "signal-hook-registry", "socket2", + "tokio-macros", "windows-sys 0.61.2", ] +[[package]] +name = "tokio-macros" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.114", +] + [[package]] name = "tokio-rustls" version = "0.26.4"