diff --git a/aider/coders/__init__.py b/aider/coders/__init__.py index ebe4a47dd14..bbe3e1dd15f 100644 --- a/aider/coders/__init__.py +++ b/aider/coders/__init__.py @@ -3,6 +3,7 @@ from .ask_coder import AskCoder from .base_coder import Coder from .context_coder import ContextCoder +from .copypaste_coder import CopyPasteCoder from .editblock_coder import EditBlockCoder from .editblock_fenced_coder import EditBlockFencedCoder from .editor_diff_fenced_coder import EditorDiffFencedCoder @@ -33,4 +34,5 @@ EditorDiffFencedCoder, ContextCoder, AgentCoder, + CopyPasteCoder, ] diff --git a/aider/coders/base_coder.py b/aider/coders/base_coder.py index f4a311878be..0ce47c76afc 100755 --- a/aider/coders/base_coder.py +++ b/aider/coders/base_coder.py @@ -172,7 +172,7 @@ async def create( if from_coder: main_model = from_coder.main_model else: - main_model = models.Model(models.DEFAULT_MODEL_NAME) + main_model = models.Model(models.DEFAULT_MODEL_NAME, io=io) if edit_format == "code": edit_format = None @@ -229,6 +229,14 @@ async def create( kwargs = use_kwargs from_coder.ok_to_warm_cache = False + if getattr(main_model, "copy_paste_mode", False) and getattr( + main_model, "copy_paste_transport", "api" + ) == "clipboard": + res = coders.CopyPasteCoder(main_model, io, args=args, **kwargs) + await res.initialize_mcp_tools() + res.original_kwargs = dict(kwargs) + return res + for coder in coders.__all__: if hasattr(coder, "edit_format") and coder.edit_format == edit_format: res = coder(main_model, io, args=args, **kwargs) @@ -379,6 +387,9 @@ def __init__( self.io = io self.io.coder = weakref.ref(self) + self.manual_copy_paste = getattr(main_model, "copy_paste_transport", "api") == "clipboard" + self.copy_paste_mode = getattr(main_model, "copy_paste_mode", False) or auto_copy_context + self.shell_commands = [] self.partial_response_tool_calls = [] @@ -399,7 +410,7 @@ def __init__( self.main_model.reasoning_tag if self.main_model.reasoning_tag else REASONING_TAG ) - self.stream = stream and main_model.streaming + self.stream = stream and main_model.streaming and not self.manual_copy_paste if cache_prompts and self.main_model.cache_control: self.add_cache_headers = True @@ -581,6 +592,8 @@ def get_announcements(self): output += ", prompt cache" if main_model.info.get("supports_assistant_prefill"): output += ", infinite output" + if self.copy_paste_mode: + output += ", copy/paste mode" lines.append(output) diff --git a/aider/coders/copypaste_coder.py b/aider/coders/copypaste_coder.py new file mode 100644 index 00000000000..61b332ff81d --- /dev/null +++ b/aider/coders/copypaste_coder.py @@ -0,0 +1,221 @@ +import hashlib +import json +import math +import time +import uuid + +from aider.exceptions import LiteLLMExceptions +from aider.llm import litellm + +from .base_coder import Coder + + +class CopyPasteCoder(Coder): + """Coder implementation that performs clipboard-driven interactions. + + This coder swaps the transport mechanism (clipboard vs API) but must remain compatible with the + base ``Coder`` interface. In particular, many base methods assume ``self.gpt_prompts`` exists. + + We therefore mirror the prompt pack from the coder that matches the currently selected + ``edit_format``. + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + # Ensure CopyPasteCoder always has a prompt pack. + # We mirror prompts from the coder that matches the active edit format. + self._init_prompts_from_selected_edit_format() + + def _init_prompts_from_selected_edit_format(self): + """Initialize ``self.gpt_prompts`` based on the currently selected edit format. + + This prevents AttributeError crashes when base ``Coder`` code assumes ``self.gpt_prompts`` + exists (eg during message formatting, announcements, cancellation/cleanup paths, etc). + """ + # Determine the selected edit_format the same way Coder.create() does. + selected_edit_format = None + if getattr(self, "args", None) is not None and getattr(self.args, "edit_format", None): + selected_edit_format = self.args.edit_format + else: + selected_edit_format = getattr(self.main_model, "edit_format", None) + + # "code" is treated like None in Coder.create() + if selected_edit_format == "code": + selected_edit_format = None + + # If no edit format is selected, fall back to model default. + if selected_edit_format is None: + selected_edit_format = getattr(self.main_model, "edit_format", None) + + # Find the coder class that would have been selected for this edit_format. + try: + import aider.coders as coders + except Exception: + coders = None + + target_coder_class = None + if coders is not None: + for coder_cls in getattr(coders, "__all__", []): + if hasattr(coder_cls, "edit_format") and coder_cls.edit_format == selected_edit_format: + target_coder_class = coder_cls + break + + # Mirror prompt pack + edit_format where available. + if target_coder_class is not None and hasattr(target_coder_class, "gpt_prompts"): + self.gpt_prompts = target_coder_class.gpt_prompts + # Keep announcements/formatting consistent with the selected coder. + self.edit_format = getattr(target_coder_class, "edit_format", self.edit_format) + return + + # Last-resort fallback: avoid crashing if we can't determine the prompts. + # Prefer keeping any existing gpt_prompts (if one was set elsewhere). + if not hasattr(self, "gpt_prompts"): + self.gpt_prompts = None + + async def send(self, messages, model=None, functions=None, tools=None): + model = model or self.main_model + + if getattr(model, "copy_paste_transport", "api") == "api": + async for chunk in super().send(messages, model=model, functions=functions, tools=tools): + yield chunk + return + + if functions: + self.io.tool_warning("copy/paste mode ignores function call requests.") + if tools: + self.io.tool_warning("copy/paste mode ignores tool call requests.") + + self.io.reset_streaming_response() + + # Base Coder methods (eg show_send_output/preprocess_response) expect these streaming + # attributes to always exist, even when we bypass the normal API streaming path. + self.partial_response_content = "" + self.partial_response_function_call = None + # preprocess_response() does len(self.partial_response_tool_calls), so it must not be None. + self.partial_response_tool_calls = [] + + try: + hash_object, completion = self.copy_paste_completion(messages, model) + self.chat_completion_call_hashes.append(hash_object.hexdigest()) + self.show_send_output(completion) + self.calculate_and_show_tokens_and_cost(messages, completion) + finally: + self.preprocess_response() + + if self.partial_response_content: + self.io.ai_output(self.partial_response_content) + + def copy_paste_completion(self, messages, model): + try: + from aider.helpers import copypaste + except ImportError: # pragma: no cover - import error path + self.io.tool_error("copy/paste mode requires the pyperclip package.") + self.io.tool_output("Install it with: pip install pyperclip") + raise + + def content_to_text(content): + """Extract text from the various content formats Aider/LLMs can produce.""" + if not content: + return "" + if isinstance(content, str): + return content + if isinstance(content, list): + parts = [] + for part in content: + if isinstance(part, dict): + text = part.get("text") + if isinstance(text, str): + parts.append(text) + elif isinstance(part, str): + parts.append(part) + return "".join(parts) + if isinstance(content, dict): + text = content.get("text") + if isinstance(text, str): + return text + return "" + return str(content) + + lines = [] + for message in messages: + text_content = content_to_text(message.get("content")) + if not text_content: + continue + role = message.get("role") + if role: + lines.append(f"{role.upper()}:\n{text_content}") + else: + lines.append(text_content) + + prompt_text = "\n\n".join(lines).strip() + + try: + copypaste.copy_to_clipboard(prompt_text) + except copypaste.ClipboardError as err: # pragma: no cover - clipboard error path + self.io.tool_error(f"Unable to copy prompt to clipboard: {err}") + raise + + self.io.tool_output("Request copied to clipboard.") + self.io.tool_output("Paste it into your LLM interface, then copy the reply back.") + self.io.tool_output("Waiting for clipboard updates (Ctrl+C to cancel)...") + + try: + last_value = copypaste.read_clipboard() + except copypaste.ClipboardError as err: # pragma: no cover - clipboard error path + self.io.tool_error(f"Unable to read clipboard: {err}") + raise + + try: + response_text = copypaste.wait_for_clipboard_change(initial=last_value) + except copypaste.ClipboardError as err: # pragma: no cover - clipboard error path + self.io.tool_error(f"Unable to read clipboard: {err}") + raise + + # Estimate tokens locally using the model's tokenizer; fallback to heuristic. + def _safe_token_count(text): + """Return token count via the model tokenizer, falling back to a heuristic.""" + if not text: + return 0 + try: + count = model.token_count(text) + if isinstance(count, int) and count >= 0: + return count + except Exception as ex: + # Try to map known LiteLLM exceptions to user-friendly messages, then fall back. + try: + ex_info = LiteLLMExceptions().get_ex_info(ex) + if ex_info and ex_info.description: + self.io.tool_warning( + f"Token count failed: {ex_info.description} Falling back to heuristic." + ) + except Exception: + # Avoid masking the original issue during error mapping. + pass + return int(math.ceil(len(text) / 4)) + + prompt_tokens = _safe_token_count(prompt_text) + completion_tokens = _safe_token_count(response_text) + total_tokens = prompt_tokens + completion_tokens + + completion = litellm.ModelResponse( + id=f"chatcmpl-{uuid.uuid4()}", + choices=[ + litellm.Choices( + index=0, + finish_reason="stop", + message=litellm.Message(role="assistant", content=response_text), + ) + ], + created=int(time.time()), + model=model.name, + usage={ + "prompt_tokens": prompt_tokens, + "completion_tokens": completion_tokens, + "total_tokens": total_tokens, + }, + ) + + kwargs = dict(model=model.name, messages=messages, stream=False) + hash_object = hashlib.sha1(json.dumps(kwargs, sort_keys=True).encode()) # nosec B324 + return hash_object, completion diff --git a/aider/commands.py b/aider/commands.py index 843c0691574..2d7f104918c 100644 --- a/aider/commands.py +++ b/aider/commands.py @@ -100,6 +100,7 @@ async def cmd_model(self, args): model_name, editor_model=self.coder.main_model.editor_model.name, weak_model=self.coder.main_model.weak_model.name, + io=self.io, ) await models.sanity_check_models(self.io, model) @@ -172,6 +173,7 @@ async def cmd_weak_model(self, args): self.coder.main_model.name, editor_model=self.coder.main_model.editor_model.name, weak_model=model_name, + io=self.io, ) await models.sanity_check_models(self.io, model) raise SwitchCoder(main_model=model) diff --git a/aider/copypaste.py b/aider/copypaste.py deleted file mode 100644 index c8dfbe378d0..00000000000 --- a/aider/copypaste.py +++ /dev/null @@ -1,72 +0,0 @@ -import threading -import time - -import pyperclip - - -class ClipboardWatcher: - """Watches clipboard for changes and updates IO placeholder""" - - def __init__(self, io, verbose=False): - self.io = io - self.verbose = verbose - self.stop_event = None - self.watcher_thread = None - self.last_clipboard = None - self.io.clipboard_watcher = self - - def start(self): - """Start watching clipboard for changes""" - self.stop_event = threading.Event() - self.last_clipboard = pyperclip.paste() - - def watch_clipboard(): - while not self.stop_event.is_set(): - try: - current = pyperclip.paste() - if current != self.last_clipboard: - self.last_clipboard = current - self.io.interrupt_input() - self.io.placeholder = current - if len(current.splitlines()) > 1: - self.io.placeholder = "\n" + self.io.placeholder + "\n" - - time.sleep(0.5) - except Exception as e: - if self.verbose: - from aider.dump import dump - - dump(f"Clipboard watcher error: {e}") - continue - - self.watcher_thread = threading.Thread(target=watch_clipboard, daemon=True) - self.watcher_thread.start() - - def stop(self): - """Stop watching clipboard for changes""" - if self.stop_event: - self.stop_event.set() - if self.watcher_thread: - self.watcher_thread.join() - self.watcher_thread = None - self.stop_event = None - - -def main(): - """Example usage of the clipboard watcher""" - from aider.io import InputOutput - - io = InputOutput() - watcher = ClipboardWatcher(io, verbose=True) - - try: - watcher.start() - while True: - time.sleep(1) - except KeyboardInterrupt: - print("\nStopped watching clipboard") - watcher.stop() - - -if __name__ == "__main__": - main() diff --git a/aider/helpers/copypaste.py b/aider/helpers/copypaste.py new file mode 100644 index 00000000000..6f241f313ec --- /dev/null +++ b/aider/helpers/copypaste.py @@ -0,0 +1,123 @@ +import threading +import time + +import pyperclip + + +class ClipboardError(Exception): + """Raised when clipboard operations fail.""" + + +class ClipboardStopped(Exception): + """Raised when clipboard monitoring stops before a change occurs.""" + + +def copy_to_clipboard(text): + """Copy text to the system clipboard.""" + try: + pyperclip.copy(text) + except Exception as err: # pragma: no cover - system clipboard errors + raise ClipboardError(err) from err + + +def read_clipboard(): + """Read text from the system clipboard.""" + try: + return pyperclip.paste() + except Exception as err: # pragma: no cover - system clipboard errors + raise ClipboardError(err) from err + + +def wait_for_clipboard_change(initial=None, poll_interval=0.5, stop_event=None): + """Block until the clipboard value changes and return the new contents.""" + last_value = initial + if last_value is None: + last_value = read_clipboard() + + while True: + current = read_clipboard() + if current != last_value: + return current + + if stop_event: + if stop_event.wait(poll_interval): + raise ClipboardStopped() + else: + time.sleep(poll_interval) + + +class ClipboardWatcher: + """Watches clipboard for changes and updates IO placeholder.""" + + def __init__(self, io, verbose=False): + self.io = io + self.verbose = verbose + self.stop_event = None + self.watcher_thread = None + self.last_clipboard = None + self.io.clipboard_watcher = self + + def start(self): + """Start watching clipboard for changes.""" + self.stop_event = threading.Event() + self.last_clipboard = read_clipboard() + + def watch_clipboard(): + while not self.stop_event.is_set(): + try: + current = wait_for_clipboard_change( + initial=self.last_clipboard, + stop_event=self.stop_event, + ) + except ClipboardStopped: + break + except ClipboardError as err: + if self.verbose: + from aider.dump import dump + + dump(f"Clipboard watcher error: {err}") + continue + except Exception as err: # pragma: no cover - unexpected errors + if self.verbose: + from aider.dump import dump + + dump(f"Clipboard watcher unexpected error: {err}") + continue + + self.last_clipboard = current + self.io.interrupt_input() + self.io.placeholder = current + if len(current.splitlines()) > 1: + self.io.placeholder = "\n" + self.io.placeholder + "\n" + + self.watcher_thread = threading.Thread(target=watch_clipboard, daemon=True) + self.watcher_thread.start() + + def stop(self): + """Stop watching clipboard for changes.""" + if self.stop_event: + self.stop_event.set() + if self.watcher_thread: + self.watcher_thread.join() + self.watcher_thread = None + self.stop_event = None + + +def main(): + """Example usage of the clipboard watcher.""" + from aider.io import InputOutput + + io = InputOutput() + watcher = ClipboardWatcher(io, verbose=True) + + try: + watcher.start() + while True: + time.sleep(1) + except KeyboardInterrupt: + print("\nStopped watching clipboard") + watcher.stop() + + +if __name__ == "__main__": + main() diff --git a/aider/main.py b/aider/main.py index 3df3ec7bdd9..658def04474 100644 --- a/aider/main.py +++ b/aider/main.py @@ -41,7 +41,7 @@ from aider.coders import Coder from aider.coders.base_coder import UnknownEditFormat from aider.commands import Commands, SwitchCoder -from aider.copypaste import ClipboardWatcher +from aider.helpers.copypaste import ClipboardWatcher from aider.deprecated import handle_deprecated_model_args from aider.format_settings import format_settings, scrub_sensitive_info from aider.helpers.file_searcher import generate_search_path_list @@ -990,6 +990,26 @@ def apply_model_overrides(model_name): base_model, cfg = entry return base_model, cfg.copy() + prefix = "" + base_model = model_name + if model_name.startswith(models.COPY_PASTE_PREFIX): + prefix = models.COPY_PASTE_PREFIX + base_model = model_name[len(prefix) :] + + if ":" in base_model: + base_model, suffix = base_model.rsplit(":", 1) + else: + suffix = None + + override_kwargs = {} + if suffix and base_model in overrides and suffix in overrides[base_model]: + override_kwargs = overrides[base_model][suffix].copy() + + if prefix: + base_model = prefix + base_model + + return base_model, override_kwargs + # Apply overrides (if any) to the selected models main_model_name, main_model_overrides = apply_model_overrides(args.model) weak_model_name, weak_model_overrides = apply_model_overrides(args.weak_model) @@ -1002,6 +1022,7 @@ def apply_model_overrides(model_name): weak_model_name, weak_model=False, verbose=args.verbose, + io=io, override_kwargs=weak_model_overrides, ) @@ -1012,6 +1033,7 @@ def apply_model_overrides(model_name): editor_model_name, editor_model=False, verbose=args.verbose, + io=io, override_kwargs=editor_model_overrides, ) @@ -1052,9 +1074,13 @@ def apply_model_overrides(model_name): editor_model=editor_model_obj, editor_edit_format=args.editor_edit_format, verbose=args.verbose, + io=io, override_kwargs=main_model_overrides, ) + if args.copy_paste and main_model.copy_paste_transport == "api": + main_model.enable_copy_paste_mode() + # Check if deprecated remove_reasoning is set if main_model.remove_reasoning is not None: io.tool_warning( diff --git a/aider/models.py b/aider/models.py index a3bc3024539..3e4c1514447 100644 --- a/aider/models.py +++ b/aider/models.py @@ -24,6 +24,7 @@ from aider.utils import check_pip_install_extra RETRY_TIMEOUT = 60 +COPY_PASTE_PREFIX = "cp:" request_timeout = 600 @@ -316,15 +317,32 @@ def __init__( weak_model=None, editor_model=None, editor_edit_format=None, - verbose=False, + verbose=False, io=None, override_kwargs=None, ): - # Map any alias to its canonical name + # Determine copy/paste mode and map model aliases + provided_model = model or "" + if isinstance(provided_model, Model): + provided_model = provided_model.name + elif not isinstance(provided_model, str): + provided_model = str(provided_model) + + self.io = io + self.verbose = verbose + self.override_kwargs = override_kwargs or {} + + self.copy_paste_mode = False + self.copy_paste_transport = "api" + + if provided_model.startswith(COPY_PASTE_PREFIX): + model = provided_model.removeprefix(COPY_PASTE_PREFIX) + self.enable_copy_paste_mode(transport="clipboard") + else: + model = provided_model + model = MODEL_ALIASES.get(model, model) self.name = model - self.verbose = verbose - self.override_kwargs = override_kwargs or {} self.max_chat_history_tokens = 1024 self.weak_model = None @@ -355,6 +373,9 @@ def __init__( else: self.get_editor_model(editor_model, editor_edit_format) + if self.copy_paste_transport == "clipboard": + self.streaming = False + def get_model_info(self, model): return model_info_manager.get_model_info(model) @@ -590,6 +611,10 @@ def apply_generic_model_settings(self, model): def __str__(self): return self.name + def enable_copy_paste_mode(self, *, transport="api"): + self.copy_paste_mode = True + self.copy_paste_transport = transport + def get_weak_model(self, provided_weak_model): # If provided_weak_model is False, set weak_model to self if provided_weak_model is False: @@ -597,6 +622,11 @@ def get_weak_model(self, provided_weak_model): self.weak_model_name = None return + if self.copy_paste_transport == "clipboard": + self.weak_model = self + self.weak_model_name = None + return + # If provided_weak_model is already a Model object, use it directly if isinstance(provided_weak_model, Model): self.weak_model = provided_weak_model @@ -618,6 +648,7 @@ def get_weak_model(self, provided_weak_model): self.weak_model = Model( self.weak_model_name, weak_model=False, + io=self.io, ) return self.weak_model @@ -625,6 +656,11 @@ def commit_message_models(self): return [self.weak_model, self] def get_editor_model(self, provided_editor_model, editor_edit_format): + if self.copy_paste_transport == "clipboard": + provided_editor_model = False + self.editor_model_name = self.name + self.editor_model = self + # If provided_editor_model is already a Model object, use it directly if isinstance(provided_editor_model, Model): self.editor_model = provided_editor_model @@ -643,6 +679,7 @@ def get_editor_model(self, provided_editor_model, editor_edit_format): self.editor_model = Model( self.editor_model_name, editor_model=False, + io=self.io, ) if not self.editor_edit_format: @@ -1201,6 +1238,9 @@ async def sanity_check_models(io, main_model): async def sanity_check_model(io, model): + if getattr(model, "copy_paste_transport", "api") == "clipboard": + return False + show = False if model.missing_keys: diff --git a/aider/website/_data/blame.yml b/aider/website/_data/blame.yml index 2d302504e6e..78a1f5a9db2 100644 --- a/aider/website/_data/blame.yml +++ b/aider/website/_data/blame.yml @@ -3383,7 +3383,7 @@ aider/commands.py: Paul Gauthier: 28 Paul Gauthier (aider): 21 - aider/copypaste.py: + aider/helpers/copypaste.py: Paul Gauthier: 5 Paul Gauthier (aider): 60 aider/exceptions.py: diff --git a/tests/basic/test_main.py b/tests/basic/test_main.py index 20e30845f29..7ed6564e5c3 100644 --- a/tests/basic/test_main.py +++ b/tests/basic/test_main.py @@ -11,7 +11,7 @@ from prompt_toolkit.input import DummyInput from prompt_toolkit.output import DummyOutput -from aider.coders import Coder +from aider.coders import Coder, CopyPasteCoder from aider.dump import dump # noqa: F401 from aider.io import InputOutput from aider.main import check_gitignore, load_dotenv_files, main, setup_git @@ -89,6 +89,45 @@ async def test_main_with_subdir_repo_fnames(self, _): self.assertTrue((subdir / "foo.txt").exists()) self.assertTrue((subdir / "bar.txt").exists()) + async def test_main_copy_paste_model_overrides(self): + overrides = json.dumps({"gpt-4o": {"fast": {"temperature": 0.42}}}) + coder = await main( + [ + "--no-git", + "--exit", + "--yes", + "--model", + "cp:gpt-4o:fast", + "--model-overrides", + overrides, + ], + input=DummyInput(), + output=DummyOutput(), + return_coder=True, + ) + + self.assertIsInstance(coder, CopyPasteCoder) + self.assertTrue(coder.main_model.copy_paste_mode) + self.assertEqual(coder.main_model.copy_paste_transport, "clipboard") + self.assertEqual(coder.main_model.override_kwargs, {"temperature": 0.42}) + + @patch("aider.main.ClipboardWatcher") + async def test_main_copy_paste_flag_sets_mode(self, mock_watcher): + mock_watcher.return_value = MagicMock() + + coder = await main( + ["--no-git", "--exit", "--yes", "--copy-paste"], + input=DummyInput(), + output=DummyOutput(), + return_coder=True, + ) + + self.assertNotIsInstance(coder, CopyPasteCoder) + self.assertTrue(coder.main_model.copy_paste_mode) + self.assertEqual(coder.main_model.copy_paste_transport, "api") + self.assertTrue(coder.copy_paste_mode) + self.assertFalse(coder.manual_copy_paste) + async def test_main_with_git_config_yml(self): make_repo() diff --git a/tests/coders/test_copypaste_coder.py b/tests/coders/test_copypaste_coder.py new file mode 100644 index 00000000000..9804e0c1300 --- /dev/null +++ b/tests/coders/test_copypaste_coder.py @@ -0,0 +1,169 @@ +import hashlib +import json +from types import SimpleNamespace +from unittest.mock import MagicMock, call, patch + +import pytest + +from aider.coders.copypaste_coder import CopyPasteCoder +from aider.coders.editblock_coder import EditBlockCoder + + +def test_init_prompts_uses_selected_edit_format(): + coder = CopyPasteCoder.__new__(CopyPasteCoder) + coder.args = SimpleNamespace(edit_format="diff") + coder.main_model = SimpleNamespace(edit_format=None) + coder.edit_format = None + coder.gpt_prompts = None + + coder._init_prompts_from_selected_edit_format() + + assert coder.gpt_prompts is EditBlockCoder.gpt_prompts + assert coder.edit_format == EditBlockCoder.edit_format + + +def test_init_prompts_preserves_existing_when_no_match(monkeypatch): + coder = CopyPasteCoder.__new__(CopyPasteCoder) + coder.args = SimpleNamespace(edit_format="custom-format") + coder.main_model = SimpleNamespace(edit_format=None) + coder.edit_format = "original-format" + coder.gpt_prompts = "original-prompts" + + import aider.coders as coders + + monkeypatch.setattr(coders, "__all__", [], raising=False) + + coder._init_prompts_from_selected_edit_format() + + assert coder.gpt_prompts == "original-prompts" + assert coder.edit_format == "original-format" + + +@pytest.mark.asyncio +async def test_send_uses_copy_paste_flow(monkeypatch): + coder = CopyPasteCoder.__new__(CopyPasteCoder) + + io = MagicMock() + coder.io = io + coder.stream = False + coder.partial_response_content = "" + coder.partial_response_tool_calls = [] + coder.partial_response_function_call = None + coder.chat_completion_call_hashes = [] + coder.show_send_output = MagicMock() + coder.calculate_and_show_tokens_and_cost = MagicMock() + + def fake_preprocess_response(): + coder.partial_response_content = "final-response" + + coder.preprocess_response = fake_preprocess_response + + class ModelStub: + copy_paste_mode = True + copy_paste_transport = "clipboard" + name = "cp:gpt-4o" + + @staticmethod + def token_count(text): + return len(text) + + coder.main_model = ModelStub() + + hash_obj = MagicMock() + hash_obj.hexdigest.return_value = "hash" + completion = MagicMock() + + with patch.object( + CopyPasteCoder, "copy_paste_completion", return_value=(hash_obj, completion) + ) as mock_completion: + messages = [{"role": "user", "content": "Hello"}] + chunks = [chunk async for chunk in coder.send(messages)] + + assert chunks == [] + mock_completion.assert_called_once_with(messages, coder.main_model) + coder.show_send_output.assert_called_once_with(completion) + coder.calculate_and_show_tokens_and_cost.assert_called_once_with(messages, completion) + assert coder.chat_completion_call_hashes == ["hash"] + coder.io.ai_output.assert_called_once_with("final-response") + + +def test_copy_paste_completion_interacts_with_clipboard(monkeypatch): + coder = CopyPasteCoder.__new__(CopyPasteCoder) + + io = MagicMock() + coder.io = io + + import aider.helpers.copypaste as copypaste + + copy_mock = MagicMock() + read_mock = MagicMock(return_value="initial value") + wait_mock = MagicMock(return_value="assistant reply") + + monkeypatch.setattr(copypaste, "copy_to_clipboard", copy_mock) + monkeypatch.setattr(copypaste, "read_clipboard", read_mock) + monkeypatch.setattr(copypaste, "wait_for_clipboard_change", wait_mock) + + class DummyMessage: + def __init__(self, **kwargs): + self.data = kwargs + + class DummyChoices: + def __init__(self, **kwargs): + self.data = kwargs + + class DummyModelResponse: + def __init__(self, **kwargs): + self.kwargs = kwargs + + monkeypatch.setattr("aider.coders.copypaste_coder.litellm.Message", DummyMessage) + monkeypatch.setattr("aider.coders.copypaste_coder.litellm.Choices", DummyChoices) + monkeypatch.setattr( + "aider.coders.copypaste_coder.litellm.ModelResponse", DummyModelResponse + ) + + class ModelStub: + name = "cp:gpt-4o" + copy_paste_mode = True + copy_paste_transport = "clipboard" + + @staticmethod + def token_count(text): + return len(text) + + model = ModelStub() + + messages = [ + {"role": "system", "content": "keep calm"}, + {"role": "user", "content": [{"text": "Hello"}, {"text": "!"}]}, + {"role": "assistant", "content": [{"text": "Prior"}, {"text": " reply"}]}, + ] + + hash_obj, completion = coder.copy_paste_completion(messages, model) + + expected_prompt = "SYSTEM:\nkeep calm\n\nUSER:\nHello!\n\nASSISTANT:\nPrior reply" + copy_mock.assert_called_once_with(expected_prompt) + read_mock.assert_called_once() + wait_mock.assert_called_once_with(initial="initial value") + + io.tool_output.assert_has_calls( + [ + call("Request copied to clipboard."), + call("Paste it into your LLM interface, then copy the reply back."), + call("Waiting for clipboard updates (Ctrl+C to cancel)..."), + ] + ) + + expected_hash = hashlib.sha1( + json.dumps({"model": model.name, "messages": messages, "stream": False}, sort_keys=True).encode() + ).hexdigest() + assert hash_obj.hexdigest() == expected_hash + + usage = completion.kwargs["usage"] + assert usage["prompt_tokens"] == len(expected_prompt) + assert usage["completion_tokens"] == len("assistant reply") + assert usage["total_tokens"] == len(expected_prompt) + len("assistant reply") + + choices = completion.kwargs["choices"] + assert len(choices) == 1 + choice_payload = choices[0].data + assert choice_payload["message"].data["content"] == "assistant reply"