Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion lib/claude_code/adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,17 @@ defmodule ClaudeCode.Adapter do
"""
@callback interrupt(adapter :: pid()) :: :ok | {:error, term()}

@optional_callbacks [send_control_request: 3, get_server_info: 1, interrupt: 1]
@doc """
Executes an arbitrary `{Module, function, args}` on the adapter's node.

Local adapters call `apply(m, f, a)` directly. Distributed adapters
dispatch via `:rpc.call/4`. This lets Session run filesystem operations
(History, Plugin, etc.) on the correct node without per-function wiring.
"""
@callback execute(adapter :: pid(), module(), atom(), [term()]) ::
term() | {:error, {:rpc_failed, term()}}

@optional_callbacks [send_control_request: 3, get_server_info: 1, interrupt: 1, execute: 4]

# ============================================================================
# Notification Helpers
Expand Down
3 changes: 3 additions & 0 deletions lib/claude_code/adapter/node.ex
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ defmodule ClaudeCode.Adapter.Node do
@impl ClaudeCode.Adapter
defdelegate get_server_info(adapter), to: ClaudeCode.Adapter.Port

@impl ClaudeCode.Adapter
defdelegate execute(adapter, m, f, a), to: ClaudeCode.Adapter.Port

# ---------------------------------------------------------------------------
# Private Helpers
# ---------------------------------------------------------------------------
Expand Down
9 changes: 9 additions & 0 deletions lib/claude_code/adapter/port.ex
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ defmodule ClaudeCode.Adapter.Port do
GenServer.call(adapter, :interrupt)
end

@impl ClaudeCode.Adapter
def execute(adapter, m, f, a) do
GenServer.call(adapter, {:execute, m, f, a})
end

# ============================================================================
# Server Callbacks
# ============================================================================
Expand Down Expand Up @@ -238,6 +243,10 @@ defmodule ClaudeCode.Adapter.Port do
{:reply, :ok, %{state | control_counter: new_counter}}
end

def handle_call({:execute, m, f, a}, _from, state) do
{:reply, apply(m, f, a), state}
end

@impl GenServer
def handle_info({:cli_resolved, {:ok, {executable, args, streaming_opts}}}, state) do
case open_cli_port(executable, args, state, streaming_opts) do
Expand Down
3 changes: 3 additions & 0 deletions lib/claude_code/adapter/test.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ defmodule ClaudeCode.Adapter.Test do
GenServer.stop(adapter, :normal)
end

@impl ClaudeCode.Adapter
def execute(_adapter, m, f, a), do: apply(m, f, a)

# ============================================================================
# Server Callbacks
# ============================================================================
Expand Down
79 changes: 62 additions & 17 deletions lib/claude_code/session.ex
Original file line number Diff line number Diff line change
Expand Up @@ -321,11 +321,14 @@ defmodule ClaudeCode.Session do
# ============================================================================

@doc """
Reads conversation messages using `parentUuid` chain building.
Reads conversation messages for the current session.

Accepts either a session ID string or a running session reference.
Returns visible user/assistant messages in chronological order,
properly handling branched and compacted conversations.
Routes through the session server so History reads execute on the
correct node (local for Port, remote for Node adapter). Returns
`{:ok, []}` if no session ID has been captured yet (no queries made).

For local-only access by session ID string, use
`ClaudeCode.History.get_messages/2` directly.

## Options

Expand All @@ -336,10 +339,6 @@ defmodule ClaudeCode.Session do

## Examples

# Get messages by session ID
{:ok, messages} = ClaudeCode.Session.get_messages("abc123-def456")

# From a running session
{:ok, session} = ClaudeCode.start_link()
ClaudeCode.Session.stream(session, "Hello!") |> Stream.run()
{:ok, messages} = ClaudeCode.Session.get_messages(session)
Expand All @@ -349,19 +348,65 @@ defmodule ClaudeCode.Session do

See `ClaudeCode.History.get_messages/2` for more details.
"""
@spec get_messages(session() | String.t(), keyword()) ::
@spec get_messages(session(), keyword()) ::
{:ok, [ClaudeCode.History.SessionMessage.t()]} | {:error, term()}
def get_messages(session_or_id, opts \\ [])
def get_messages(session, opts \\ []) do
GenServer.call(session, {:history_call, :get_messages, opts})
end

@doc """
Lists sessions with rich metadata from the adapter's node.

Automatically injects `:project_path` from the session's `:cwd` option
if not provided. When `:project_path` is set, returns sessions for that
project directory. When omitted, returns sessions across all projects.

For local-only access, use `ClaudeCode.History.list_sessions/1` directly.

## Options

- `:project_path` - Project directory to list sessions for (default: session cwd)
- `:limit` - Maximum number of sessions to return
- `:include_worktrees` - Scan git worktrees (default: `true`)
- `:claude_dir` - Override `~/.claude` (for testing)

## Examples

{:ok, sessions} = ClaudeCode.Session.list_sessions(session)
{:ok, recent} = ClaudeCode.Session.list_sessions(session, limit: 10)

def get_messages(session_id, opts) when is_binary(session_id) do
ClaudeCode.History.get_messages(session_id, opts)
See `ClaudeCode.History.list_sessions/1` for more details.
"""
@spec list_sessions(session(), keyword()) :: {:ok, [ClaudeCode.History.SessionInfo.t()]}
def list_sessions(session, opts \\ []) do
GenServer.call(session, {:history_list, opts})
end

def get_messages(session, opts) do
case session_id(session) do
nil -> {:error, :no_session_id}
sid -> ClaudeCode.History.get_messages(sid, opts)
end
# ============================================================================
# Remote Execution
# ============================================================================

@doc """
Executes an arbitrary function call on the adapter's node.

Runs `apply(module, function, args)` on whatever node the adapter lives on.
For local adapters (Port), this is equivalent to a direct `apply`. For
distributed adapters (Node), this dispatches via `:rpc.call`.

## Examples

# Read a file on the adapter's node
{:ok, contents} = ClaudeCode.Session.execute(session, File, :read, ["/workspace/config.json"])

# List directory on the adapter's node
{:ok, files} = ClaudeCode.Session.execute(session, File, :ls, ["/workspace"])

# Run a custom module function
result = ClaudeCode.Session.execute(session, MyApp.Sandbox, :cleanup, [workspace_id])
"""
@spec execute(session(), module(), atom(), [term()]) :: term()
def execute(session, module, function, args) do
GenServer.call(session, {:adapter_call, module, function, args})
end

# ============================================================================
Expand Down
42 changes: 42 additions & 0 deletions lib/claude_code/session/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,31 @@ defmodule ClaudeCode.Session.Server do
end
end

def handle_call({:adapter_call, m, f, a}, _from, state) do
result = adapter_execute(m, f, a, state)
{:reply, result, state}
end

def handle_call({:history_call, function, opts}, _from, state) do
result =
case state.session_id do
nil ->
{:ok, []}

sid ->
opts = inject_history_defaults(opts, state)
adapter_execute(ClaudeCode.History, function, [sid, opts], state)
end

{:reply, result, state}
end

def handle_call({:history_list, opts}, _from, state) do
opts = inject_history_defaults(opts, state)
result = adapter_execute(ClaudeCode.History, :list_sessions, [opts], state)
{:reply, result, state}
end

@impl true
def handle_cast({:stream_cleanup, request_ref}, state) do
new_requests = Map.delete(state.requests, request_ref)
Expand Down Expand Up @@ -470,4 +495,21 @@ defmodule ClaudeCode.Session.Server do
defp supports_control?(adapter_module) do
function_exported?(adapter_module, :send_control_request, 3)
end

# ============================================================================
# Private Functions - Adapter Execute
# ============================================================================

defp adapter_execute(m, f, a, state) do
if function_exported?(state.adapter_module, :execute, 4) do
state.adapter_module.execute(state.adapter_pid, m, f, a)
else
apply(m, f, a)
end
end

defp inject_history_defaults(opts, state) do
cwd = Keyword.get(state.session_options, :cwd)
if cwd, do: Keyword.put_new(opts, :project_path, cwd), else: opts
end
end
54 changes: 54 additions & 0 deletions test/claude_code/adapter/node_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -206,4 +206,58 @@ defmodule ClaudeCode.Adapter.NodeTest do
NodeAdapter.stop(adapter_pid)
end
end

describe "execute/4" do
@describetag :distributed

setup do
args = Enum.flat_map(:code.get_path(), fn path -> [~c"-pa", to_charlist(path)] end)

{:ok, peer, node} =
:peer.start(%{
name: :"execute_test_peer_#{System.unique_integer([:positive])}",
args: args
})

on_exit(fn ->
try do
:peer.stop(peer)
catch
:exit, _ -> :ok
end
end)

{:ok, node: node, peer: peer}
end

test "runs MFA on remote node via GenServer.call", %{node: node} do
workspace = Path.join(System.tmp_dir!(), "execute_test_#{System.unique_integer([:positive])}")
on_exit(fn -> File.rm_rf!(workspace) end)

{:ok, adapter_pid} = ClaudeCode.Adapter.Node.start_link(self(), node: node, cwd: workspace)
assert_receive {:adapter_status, :provisioning}, 1000

# :erlang.node/0 returns the node of the calling process — via RPC this is the remote node
result = ClaudeCode.Adapter.Node.execute(adapter_pid, :erlang, :node, [])
assert result == node

ClaudeCode.Adapter.Node.stop(adapter_pid)
end

test "exits when remote node disconnects", %{node: node, peer: peer} do
workspace =
Path.join(System.tmp_dir!(), "execute_rpc_fail_#{System.unique_integer([:positive])}")

on_exit(fn -> File.rm_rf!(workspace) end)

{:ok, adapter_pid} = ClaudeCode.Adapter.Node.start_link(self(), node: node, cwd: workspace)
assert_receive {:adapter_status, :provisioning}, 1000

# Stop the peer to simulate node disconnect
:peer.stop(peer)
Process.sleep(100)

assert catch_exit(ClaudeCode.Adapter.Node.execute(adapter_pid, :erlang, :node, []))
end
end
end
40 changes: 34 additions & 6 deletions test/claude_code/adapter/port_test.exs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
defmodule ClaudeCode.Adapter.PortTest do
use ExUnit.Case, async: true

alias ClaudeCode.Adapter.Port

# ============================================================================
# shell_escape/1 Tests
# ============================================================================

alias ClaudeCode.Adapter.Port
alias ClaudeCode.Hook.PermissionDecision.Allow

describe "shell_escape/1" do
Expand Down Expand Up @@ -216,19 +215,19 @@ defmodule ClaudeCode.Adapter.PortTest do
callbacks = ClaudeCode.Adapter.behaviour_info(:callbacks)

Enum.each(callbacks, fn {fun, arity} ->
assert function_exported?(ClaudeCode.Adapter.Port, fun, arity),
assert function_exported?(Port, fun, arity),
"Missing callback: #{fun}/#{arity}"
end)
end
end

describe "control adapter callbacks" do
test "Adapter.Port exports send_control_request/3" do
assert function_exported?(ClaudeCode.Adapter.Port, :send_control_request, 3)
assert function_exported?(Port, :send_control_request, 3)
end

test "Adapter.Port exports get_server_info/1" do
assert function_exported?(ClaudeCode.Adapter.Port, :get_server_info, 1)
assert function_exported?(Port, :get_server_info, 1)
end
end

Expand Down Expand Up @@ -700,7 +699,7 @@ defmodule ClaudeCode.Adapter.PortTest do

describe "interrupt" do
test "Adapter.Port exports interrupt/1" do
assert function_exported?(ClaudeCode.Adapter.Port, :interrupt, 1)
assert function_exported?(Port, :interrupt, 1)
end

test "interrupt sends control message and returns :ok" do
Expand Down Expand Up @@ -1704,4 +1703,33 @@ defmodule ClaudeCode.Adapter.PortTest do
GenServer.stop(adapter)
end
end

describe "execute/4" do
test "runs MFA via GenServer call to adapter" do
{:ok, context} =
MockCLI.setup_with_script("""
#!/bin/bash
while IFS= read -r line; do
if echo "$line" | grep -q '"type":"control_request"'; then
REQ_ID=$(echo "$line" | grep -o '"request_id":"[^"]*"' | cut -d'"' -f4)
echo "{\\\"type\\\":\\\"control_response\\\",\\\"response\\\":{\\\"subtype\\\":\\\"success\\\",\\\"request_id\\\":\\\"$REQ_ID\\\",\\\"response\\\":{}}}"
fi
done
exit 0
""")

session = self()

{:ok, adapter} =
Port.start_link(session, api_key: "test-key", cli_path: context[:mock_script])

assert_receive {:adapter_status, :provisioning}, 1000
assert_receive {:adapter_status, :ready}, 5000

assert Port.execute(adapter, String, :upcase, ["hello"]) == "HELLO"
assert {:error, :enoent} = Port.execute(adapter, File, :read, ["/nonexistent/path"])

GenServer.stop(adapter)
end
end
end
7 changes: 7 additions & 0 deletions test/claude_code/adapter/test_adapter_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,11 @@ defmodule ClaudeCode.Adapter.TestAdapterTest do
"Missing callback: #{fun}/#{arity}"
end)
end

describe "execute/4" do
test "runs MFA locally and returns the result" do
result = Test.execute(self(), String, :upcase, ["hello"])
assert result == "HELLO"
end
end
end
Loading
Loading