Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions e2e/cagent_api_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package e2e_test

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/docker/cagent/cmd/root"
"github.com/docker/cagent/pkg/server"
)

type Session struct {
Title string `json:"title"`
}

func TestCagentAPI_ListSessions(t *testing.T) {
type testcase struct {
db string
expectedCount int
}

for _, tc := range []testcase{
{"one-session.db", 1},
{"two-sessions.db", 2},
{"transfer-task.db", 1},
{"session.db", 1},
{"session-not-found.db", 17},
{"desktop.db", 2},
} {
t.Run(tc.db, func(t *testing.T) {
dbPath, err := filepath.Abs("testdata/db/" + tc.db)
require.NoError(t, err)

socketPath := startCagentAPI(t, dbPath)
client := &http.Client{
Transport: &http.Transport{
DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, "unix", socketPath)
},
},
}

resp, err := client.Get("http://localhost/api/sessions")
require.NoError(t, err)
defer resp.Body.Close()

var sessions []Session
err = json.NewDecoder(resp.Body).Decode(&sessions)
require.NoError(t, err)

assert.Len(t, sessions, tc.expectedCount)
})
}
}

func startCagentAPI(t *testing.T, db string) string {
t.Helper()

tmpDir := t.TempDir()
t.Chdir(tmpDir)

copyFile(t, "session.db", db)
if _, err := os.Stat(db + "-wal"); err == nil {
copyFile(t, "session.db-wal", db+"-wal")
}

ln, err := server.Listen(t.Context(), "unix://cagent.sock")
require.NoError(t, err)
t.Cleanup(func() {
_ = ln.Close()
})

file, err := ln.(*net.UnixListener).File()
require.NoError(t, err)

go func() {
var stdout, stderr bytes.Buffer
_ = root.Execute(t.Context(), nil, &stdout, &stderr, "api", "-s", "session.db", "--listen", fmt.Sprintf("fd://%d", file.Fd()), "default")
}()

return "cagent.sock"
}

func copyFile(t *testing.T, dst, src string) {
t.Helper()

srcFile, err := os.Open(src)
require.NoError(t, err)
defer srcFile.Close()

dstFile, err := os.Create(dst)
require.NoError(t, err)
defer dstFile.Close()

_, err = io.Copy(dstFile, srcFile)
require.NoError(t, err)
}
Binary file added e2e/testdata/db/desktop.db
Binary file not shown.
Binary file added e2e/testdata/db/desktop.db-wal
Binary file not shown.
Binary file added e2e/testdata/db/one-session.db
Binary file not shown.
Binary file added e2e/testdata/db/one-session.db-wal
Binary file not shown.
Binary file added e2e/testdata/db/session-not-found.db
Binary file not shown.
Binary file added e2e/testdata/db/session.db
Binary file not shown.
Binary file added e2e/testdata/db/session.db-wal
Binary file not shown.
Binary file added e2e/testdata/db/transfer-task.db
Binary file not shown.
Binary file added e2e/testdata/db/transfer-task.db-wal
Binary file not shown.
Binary file added e2e/testdata/db/two-sessions.db
Binary file not shown.
Binary file added e2e/testdata/db/two-sessions.db-wal
Binary file not shown.
9 changes: 9 additions & 0 deletions pkg/server/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net"
"os"
"path/filepath"
"strconv"
"strings"
)

Expand All @@ -17,6 +18,14 @@ func Listen(ctx context.Context, addr string) (net.Listener, error) {
return listenNamedPipe(path)
}

if fdStr, ok := strings.CutPrefix(addr, "fd://"); ok {
fd, err := strconv.Atoi(fdStr)
if err != nil {
return nil, err
}
return net.FileListener(os.NewFile(uintptr(fd), ""))
}

return listenTCP(ctx, addr)
}

Expand Down
18 changes: 18 additions & 0 deletions pkg/session/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,9 +659,20 @@ func (s *SQLiteSessionStore) loadSessionItemsWith(ctx context.Context, q querier
})

case "subsession":
// Skip if subsession_id is NULL (can happen if the sub-session was deleted
// and the foreign key set the reference to NULL)
if !row.subsessionID.Valid || row.subsessionID.String == "" {
slog.Warn("Skipping subsession item with NULL reference", "session_id", sessionID, "position", row.position)
continue
}
// Recursively load sub-session
subSession, err := s.loadSessionWith(ctx, q, row.subsessionID.String)
if err != nil {
if errors.Is(err, ErrNotFound) {
// Sub-session was deleted but item reference remains (orphaned reference)
slog.Warn("Skipping orphaned subsession reference", "session_id", sessionID, "subsession_id", row.subsessionID.String)
continue
}
return nil, fmt.Errorf("getting sub-session %s: %w", row.subsessionID.String, err)
}
items = append(items, Item{SubSession: subSession})
Expand Down Expand Up @@ -707,6 +718,13 @@ func (s *SQLiteSessionStore) loadMessagesFromLegacyColumn(ctx context.Context, s
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
}
// Handle case where messages column doesn't exist (very old or corrupted database)
// This can happen if the database was created before the messages column was added
// or if migrations failed partially
if sqliteutil.IsNoSuchColumnError(err) {
slog.Warn("messages column not found in sessions table, returning empty messages", "session_id", sessionID)
return nil, nil
}
return nil, err
}

Expand Down
59 changes: 59 additions & 0 deletions pkg/session/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -901,6 +901,65 @@ func TestForwardCompatibility_SummaryPopulated(t *testing.T) {
assert.Equal(t, "This is a summary of the conversation.", items[1].Summary)
}

// TestOrphanedSubsessionReference verifies that loading sessions gracefully
// handles orphaned subsession references (where the subsession was deleted
// but the reference in session_items remains).
func TestOrphanedSubsessionReference(t *testing.T) {
tempDB := filepath.Join(t.TempDir(), "test_orphaned.db")

store, err := NewSQLiteSessionStore(tempDB)
require.NoError(t, err)
defer store.(*SQLiteSessionStore).Close()

sqliteStore := store.(*SQLiteSessionStore)

// Create parent session
parentSession := &Session{
ID: "parent-session",
CreatedAt: time.Now(),
}
err = store.AddSession(t.Context(), parentSession)
require.NoError(t, err)

// Add a message to parent
_, err = store.AddMessage(t.Context(), "parent-session", UserMessage("Start task"))
require.NoError(t, err)

// Create and add a sub-session
subSession := &Session{
ID: "sub-session-to-delete",
CreatedAt: time.Now(),
Messages: []Item{
NewMessageItem(UserMessage("Sub task")),
},
}
err = store.AddSubSession(t.Context(), "parent-session", subSession)
require.NoError(t, err)

// Verify session loads correctly before deletion
retrieved, err := store.GetSession(t.Context(), "parent-session")
require.NoError(t, err)
assert.Len(t, retrieved.Messages, 2) // user message + subsession

// Now delete the sub-session directly from the database
// This simulates a scenario where the sub-session is deleted but the
// reference in session_items remains (the FK sets it to NULL)
_, err = sqliteStore.db.ExecContext(t.Context(), "DELETE FROM sessions WHERE id = ?", "sub-session-to-delete")
require.NoError(t, err)

// Loading the parent session should gracefully skip the orphaned reference
retrieved, err = store.GetSession(t.Context(), "parent-session")
require.NoError(t, err)
assert.Len(t, retrieved.Messages, 1) // Only the user message remains
assert.Equal(t, "Start task", retrieved.Messages[0].Message.Message.Content)

// GetSessions should also work without error
sessions, err := store.GetSessions(t.Context())
require.NoError(t, err)
assert.Len(t, sessions, 1)
assert.Equal(t, "parent-session", sessions[0].ID)
}

// TestMigration_ExistingMessagesToSessionItems verifies that the migration
// properly converts legacy messages JSON to session_items table.
func TestMigration_ExistingMessagesToSessionItems(t *testing.T) {
Expand Down
11 changes: 11 additions & 0 deletions pkg/sqliteutil/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,17 @@ func IsCantOpenError(err error) bool {
return false
}

// IsNoSuchColumnError checks if the error is due to a missing column in SQLite.
// This typically happens when querying a column that doesn't exist in the schema.
func IsNoSuchColumnError(err error) bool {
var sqliteErr *sqlite.Error
if errors.As(err, &sqliteErr) {
// SQLITE_ERROR (1) is the generic SQL error code used for "no such column"
return sqliteErr.Code() == sqlite3.SQLITE_ERROR
}
return false
}

// DiagnoseDBOpenError provides a more helpful error message when SQLite
// fails to open/create a database file.
func DiagnoseDBOpenError(path string, originalErr error) error {
Expand Down