Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions apps/ade-cli/src/services/sync/syncHostService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2846,8 +2846,8 @@ export function createSyncHostService(args: SyncHostServiceArgs) {
try {
let appliedCount = 0;
if (changes.length > 0) {
args.db.sync.applyChanges(changes);
appliedCount = changes.length;
const applyResult = args.db.sync.applyChanges(changes);
appliedCount = applyResult.appliedCount;
peer.lastAppliedAt = nowIso();
lastBroadcastAt = nowIso();
args.onStateChanged?.();
Expand Down
6 changes: 4 additions & 2 deletions apps/ade-cli/src/services/sync/syncPeerService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -313,13 +313,15 @@ export function createSyncPeerService(args: SyncPeerServiceArgs) {
const payload = (envelope.payload ?? {}) as SyncChangesetBatchPayload;
const changes = Array.isArray(payload.changes) ? payload.changes : [];
try {
let appliedCount = 0;
if (changes.length) {
args.db.sync.applyChanges(changes);
const applyResult = args.db.sync.applyChanges(changes);
appliedCount = applyResult.appliedCount;
args.onRemoteChangesApplied?.();
}
latestRemoteDbVersion = Math.max(latestRemoteDbVersion, Math.floor(payload.toDbVersion ?? latestRemoteDbVersion));
if (connectionDraft) connectionDraft.lastRemoteDbVersion = latestRemoteDbVersion;
sendChangesetAck(payload, true, args.db.sync.getDbVersion(), changes.length);
sendChangesetAck(payload, true, args.db.sync.getDbVersion(), appliedCount);
emitStatus();
} catch (error) {
sendChangesetAck(payload, false, args.db.sync.getDbVersion(), 0, error);
Expand Down
107 changes: 107 additions & 0 deletions apps/ade-cli/src/services/sync/syncService.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import fs from "node:fs";
import net from "node:net";
import os from "node:os";
import path from "node:path";
import { afterEach, describe, expect, it, vi } from "vitest";
import { openKvDb, type AdeDb } from "../../../../desktop/src/main/services/state/kvDb";
import { createSyncService, type SyncService } from "./syncService";

function createLogger() {
return {
debug: vi.fn(),
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
};
}

function makeTempRoot(prefix: string): string {
return fs.mkdtempSync(path.join(os.tmpdir(), prefix));
}

async function getUnusedPort(): Promise<number> {
const server = net.createServer();
await new Promise<void>((resolve) => server.listen(0, "127.0.0.1", resolve));
const address = server.address();
const port = typeof address === "object" && address ? address.port : 0;
await new Promise<void>((resolve, reject) => {
server.close((error) => (error ? reject(error) : resolve()));
});
return port;
}

function createService(db: AdeDb, projectRoot: string): SyncService {
return createSyncService({
db,
logger: createLogger() as any,
projectRoot,
hostStartupEnabled: false,
localDeviceIdPath: path.join(projectRoot, ".ade", "secrets", "sync-device-id"),
phonePairingStateDir: path.join(projectRoot, ".ade", "secrets", "sync"),
fileService: {} as any,
laneService: {
list: vi.fn(async () => []),
} as any,
prService: {} as any,
sessionService: {
list: vi.fn(() => []),
get: vi.fn(() => null),
readTranscriptTail: vi.fn(async () => ""),
} as any,
ptyService: {
readTranscriptTail: vi.fn(async () => ""),
enrichSessions: vi.fn((rows: unknown[]) => rows),
} as any,
computerUseArtifactBrokerService: {
listArtifacts: vi.fn(() => []),
} as any,
agentChatService: {
listSessions: vi.fn(async () => []),
} as any,
processService: {
listRuntime: vi.fn(() => []),
} as any,
});
}

describe("createSyncService", () => {
const cleanupRoots: string[] = [];

afterEach(() => {
for (const root of cleanupRoots.splice(0)) {
fs.rmSync(root, { recursive: true, force: true });
}
});

it("keeps the local device registry when connectToBrain fails before handshake", async () => {
const projectRoot = makeTempRoot("ade-sync-service-connect-fail-");
cleanupRoots.push(projectRoot);
const db = await openKvDb(path.join(projectRoot, ".ade", "kv.sqlite"), createLogger() as any);
(db.sync as { isAvailable?: () => boolean }).isAvailable = () => true;
const service = createService(db, projectRoot);
const registry = service.getDeviceRegistryService();
registry.upsertPeerMetadata({
deviceId: "peer-old",
deviceName: "Previous host",
platform: "macOS",
deviceType: "desktop",
siteId: "peer-site",
dbVersion: 12,
});

expect(registry.getDevice("peer-old")?.name).toBe("Previous host");

try {
await expect(service.connectToBrain({
host: "127.0.0.1",
port: await getUnusedPort(),
token: "bad-token",
})).rejects.toThrow();

expect(registry.getDevice("peer-old")?.name).toBe("Previous host");
} finally {
await service.dispose();
db.close();
}
});
});
2 changes: 1 addition & 1 deletion apps/ade-cli/src/services/sync/syncService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1018,11 +1018,11 @@ export function createSyncService(args: SyncServiceArgs) {
throw new Error("Machine sync is unavailable because the CRDT database extension is not loaded.");
}
await stopHostIfRunning();
deviceRegistryService.clearClusterRegistryForViewerJoin();
writeSavedDraft(draft);
syncPeerService.setSavedDraft(draft);
try {
await syncPeerService.connect(draft);
deviceRegistryService.clearClusterRegistryForViewerJoin();
deviceRegistryService.touchLocalDevice({ lastSeenAt: nowIso() });
syncPeerService.flushLocalChanges();
await sleep(150);
Expand Down
20 changes: 20 additions & 0 deletions apps/desktop/src/main/services/state/kvDb.migrations.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { createRequire } from "node:module";
import { describe, expect, it } from "vitest";
import { createLaneWorktreeLockService } from "../lanes/laneWorktreeLockService";
import { openKvDb } from "./kvDb";
import { isCrsqliteAvailable } from "./crsqliteExtension";

const require = createRequire(import.meta.url);

Expand Down Expand Up @@ -156,6 +157,25 @@ describe("kvDb migrations - legacy upgrade paths", () => {
}
});

it.skipIf(!isCrsqliteAvailable())("skips primary-key retrofit for tables that already have __crsql_clock companions", async () => {
const dbPath = makeDbPath("ade-kvdb-pk-retrofit-skip-crr-");
const first = await openKvDb(dbPath, createLogger());
first.run("create unique index if not exists temp_ade_pk_retrofit_probe on lanes(project_id, name)");
first.close();

const reopened = await openKvDb(dbPath, createLogger());
try {
expect(
reopened.get<{ name: string }>(
"select name from sqlite_master where type = 'table' and name = 'lanes__crsql_clock' limit 1",
)?.name,
).toBe("lanes__crsql_clock");
reopened.run("drop index if exists temp_ade_pk_retrofit_probe");
} finally {
reopened.close();
}
});

it("coalesces duplicate lane_linear_issue_links rows during migrate", async () => {
Comment on lines 157 to 179
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Test may not exercise the actual PK-retrofit path for lanes

The probe index (temp_ade_pk_retrofit_probe) is dropped during the table-rebuild in the old (unfixed) code path, so its presence after reopen could serve as a witness. However, retrofitLegacyPrimaryKeyNotNullSchema only attempts a rebuild when a PK column in the stored CREATE TABLE SQL is missing the NOT NULL keyword. A fresh openKvDb produces a fully-constrained lanes schema, so the retrofit branch is never entered for lanes at all — the assertion that lanes__crsql_clock still exists would pass trivially even without the if (rawHasTable(db, ...)) continue; guard. To actually cover the regression, the test should inject a legacy schema for lanes (strip NOT NULL from the stored DDL, or use a fixture DB created with the old migration) so the migration genuinely reaches the guarded skip.

Prompt To Fix With AI
This is a comment left during a code review.
Path: apps/desktop/src/main/services/state/kvDb.migrations.test.ts
Line: 157-179

Comment:
**Test may not exercise the actual PK-retrofit path for `lanes`**

The probe index (`temp_ade_pk_retrofit_probe`) is dropped during the table-rebuild in the old (unfixed) code path, so its presence after reopen could serve as a witness. However, `retrofitLegacyPrimaryKeyNotNullSchema` only attempts a rebuild when a PK column in the stored `CREATE TABLE` SQL is missing the `NOT NULL` keyword. A fresh `openKvDb` produces a fully-constrained `lanes` schema, so the retrofit branch is never entered for `lanes` at all — the assertion that `lanes__crsql_clock` still exists would pass trivially even without the `if (rawHasTable(db, ...)) continue;` guard. To actually cover the regression, the test should inject a legacy schema for `lanes` (strip `NOT NULL` from the stored DDL, or use a fixture DB created with the old migration) so the migration genuinely reaches the guarded skip.

How can I resolve this? If you propose a fix, please make it concise.

Fix in Cursor Fix in Codex Fix in Claude Code

const dbPath = makeDbPath("ade-kvdb-linear-links-dedupe-");
fs.mkdirSync(path.dirname(dbPath), { recursive: true });
Expand Down
20 changes: 9 additions & 11 deletions apps/desktop/src/main/services/state/kvDb.sync.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -278,28 +278,28 @@ describe.skipIf(!isCrsqliteAvailable())("kvDb sync foundation", () => {

it("ignores CRDT changes for legacy unified_memories tables removed in #329", async () => {
const db2 = await openKvDb(makeDbPath("ade-kvdb-sync-mem-skip-"), createLogger() as any);
const legacyChange = {
table: "unified_memories",
pk: Buffer.from([0x01, 0x06, 0, 0, 0, 0, 0, 1]).toString("base64"),
const legacyChanges = ["unified_memories", "unified_memories_fts_content"].map((table, index) => ({
table,
pk: Buffer.from([0x01, 0x06, 0, 0, 0, 0, 0, index + 1]).toString("base64"),
cid: "id",
val: null,
col_version: 1,
db_version: 1,
db_version: index + 1,
site_id: "a".repeat(32),
cl: 1,
seq: 1,
};
seq: index,
}));

const beforeVersion = db2.sync.getDbVersion();
const result = db2.sync.applyChanges([legacyChange as any]);
const result = db2.sync.applyChanges(legacyChanges as any);
expect(result.appliedCount).toBe(0);
expect(result.touchedTables).toEqual([]);
expect(db2.sync.getDbVersion()).toBe(beforeVersion);

db2.close();
});

it("silently skips CRDT changes for unknown future tables", async () => {
it("rejects CRDT changes for unknown future tables", async () => {
const db2 = await openKvDb(makeDbPath("ade-kvdb-sync-future-table-"), createLogger() as any);
const futureChange = {
table: "missing_future_table",
Expand All @@ -314,9 +314,7 @@ describe.skipIf(!isCrsqliteAvailable())("kvDb sync foundation", () => {
};

const beforeVersion = db2.sync.getDbVersion();
const result = db2.sync.applyChanges([futureChange as any]);
expect(result.appliedCount).toBe(0);
expect(result.touchedTables).toEqual([]);
expect(() => db2.sync.applyChanges([futureChange as any])).toThrow(/unknown_sync_table:missing_future_table/);
expect(db2.sync.getDbVersion()).toBe(beforeVersion);

db2.close();
Expand Down
18 changes: 15 additions & 3 deletions apps/desktop/src/main/services/state/kvDb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ import type { ApplyRemoteChangesResult, CrsqlChangeRow, SyncScalar } from "../..

type DatabaseSyncConstructor = new (dbPath: string, options?: { allowExtension?: boolean }) => DatabaseSyncType;

/** CRDT tables removed from the schema; inbound tombstones are ignored. */
const SYNC_RETIRED_TABLES = new Set(["unified_memories", "unified_memories_fts"]);

function isRetiredIncomingSyncTable(tableName: string): boolean {
return SYNC_RETIRED_TABLES.has(tableName) || tableName.startsWith("unified_memories_");
}

// Anchor createRequire to a synthetic CJS file so builtin resolution follows the active runtime.
const require = createRequire(path.join(process.cwd(), "ade-runtime.cjs"));
const { DatabaseSync } = require("node:sqlite") as { DatabaseSync: DatabaseSyncConstructor };
Expand Down Expand Up @@ -208,6 +215,10 @@ function retrofitLegacyPrimaryKeyNotNullSchema(db: DatabaseSyncType): boolean {
runStatement(db, "pragma foreign_keys = off");
try {
for (const table of tables) {
// CRR tables must be altered via crsql_begin_alter/commit_alter or rebuilt
// with rebuildCrrTableWithBackfill — never DROP/rename wholesale.
if (rawHasTable(db, `${table.name}__crsql_clock`)) continue;

const tableInfo = allRows<{
name: string;
type: string;
Expand Down Expand Up @@ -3038,9 +3049,10 @@ export async function openKvDb(dbPath: string, logger: Logger): Promise<AdeDb> {
try {
for (const rawChange of changes) {
if (isLocalOnlyQueueWipeMarkerChange(rawChange)) continue;
// Skip changes for tables that no longer exist in the schema
// (e.g. unified_memories removed in #329).
if (!rawHasTable(db, rawChange.table)) continue;
if (!rawHasTable(db, rawChange.table)) {
if (isRetiredIncomingSyncTable(rawChange.table)) continue;
throw new Error(`unknown_sync_table:${rawChange.table}`);
}
const change = normalizeIncomingCrsqlChange(db, rawChange);
const result = runStatement(
db,
Expand Down
Loading