From 6fb89d222b6111a01a3b5a837d14ca51f7cbd828 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Parent Date: Thu, 23 Oct 2025 20:42:02 -0400 Subject: [PATCH 1/7] ENG-298 isolate the repetition logic from the refactoring --- apps/roam/src/utils/syncDgNodesToSupabase.ts | 163 ++++++++++++++----- 1 file changed, 121 insertions(+), 42 deletions(-) diff --git a/apps/roam/src/utils/syncDgNodesToSupabase.ts b/apps/roam/src/utils/syncDgNodesToSupabase.ts index d8635bbc8..438c77273 100644 --- a/apps/roam/src/utils/syncDgNodesToSupabase.ts +++ b/apps/roam/src/utils/syncDgNodesToSupabase.ts @@ -20,21 +20,21 @@ import { fetchEmbeddingsForNodes } from "./upsertNodesAsContentWithEmbeddings"; import { convertRoamNodeToLocalContent } from "./upsertNodesAsContentWithEmbeddings"; import { getRoamUrl } from "roamjs-components/dom"; import { render as renderToast } from "roamjs-components/components/Toast"; -import type { DGSupabaseClient } from "@repo/database/lib/client"; -import type { Json, CompositeTypes } from "@repo/database/dbTypes"; +import { createClient, type DGSupabaseClient } from "@repo/database/lib/client"; +import type { Json, CompositeTypes, Enums } from "@repo/database/dbTypes"; type LocalContentDataInput = Partial>; type AccountLocalInput = CompositeTypes<"account_local_input">; -const { createClient } = require("@repo/database/lib/client"); const SYNC_FUNCTION = "embedding"; const SYNC_INTERVAL = "45s"; const SYNC_TIMEOUT = "20s"; const BATCH_SIZE = 200; -const DEFAULT_TIME = "1970-01-01"; +const DEFAULT_TIME = new Date("1970-01-01"); type SyncTaskInfo = { - lastUpdateTime: string | null; + lastUpdateTime?: Date; + nextUpdateTime?: Date; spaceId: number; worker: string; shouldProceed: boolean; @@ -42,7 +42,8 @@ type SyncTaskInfo = { export const endSyncTask = async ( worker: string, - status: "complete" | "failed", + status: Enums<"task_status">, + showToast: boolean = false, ): Promise => { try { const supabaseClient = await getLoggedInClient(); @@ -60,13 +61,15 @@ export const endSyncTask = async ( }); if (error) { console.error("endSyncTask: Error calling end_sync_task:", error); - renderToast({ - id: "discourse-embedding-error", - content: "Failed to complete discourse node embeddings sync", - intent: "danger", - timeout: 5000, - }); - } else { + if (showToast) + renderToast({ + id: "discourse-embedding-error", + content: "Failed to complete discourse node embeddings sync", + intent: "danger", + timeout: 5000, + }); + return; + } else if (showToast) { if (status === "complete") { renderToast({ id: "discourse-embedding-complete", @@ -85,12 +88,13 @@ export const endSyncTask = async ( } } catch (error) { console.error("endSyncTask: Error calling end_sync_task:", error); - renderToast({ - id: "discourse-embedding-error", - content: "Failed to complete discourse node embeddings sync", - intent: "danger", - timeout: 5000, - }); + if (showToast) + renderToast({ + id: "discourse-embedding-error", + content: "Failed to complete discourse node embeddings sync", + intent: "danger", + timeout: 5000, + }); } }; @@ -101,7 +105,6 @@ export const proposeSyncTask = async (): Promise => { if (!context || !supabaseClient) { console.error("proposeSyncTask: Unable to obtain Supabase context."); return { - lastUpdateTime: null, spaceId: 0, worker: "", shouldProceed: false, @@ -111,7 +114,6 @@ export const proposeSyncTask = async (): Promise => { if (!worker) { console.error("proposeSyncTask: Unable to obtain user UID."); return { - lastUpdateTime: null, spaceId: 0, worker: "", shouldProceed: false, @@ -132,7 +134,7 @@ export const proposeSyncTask = async (): Promise => { console.error( `proposeSyncTask: propose_sync_task failed – ${error.message}`, ); - return { lastUpdateTime: null, spaceId, worker, shouldProceed: false }; + return { spaceId, worker, shouldProceed: false }; } if (typeof data === "string") { @@ -140,20 +142,29 @@ export const proposeSyncTask = async (): Promise => { const now = new Date(); if (timestamp > now) { - return { lastUpdateTime: null, spaceId, worker, shouldProceed: false }; + return { + nextUpdateTime: timestamp, + spaceId, + worker, + shouldProceed: false, + }; } else { - return { lastUpdateTime: data, spaceId, worker, shouldProceed: true }; + return { + lastUpdateTime: timestamp, + spaceId, + worker, + shouldProceed: true, + }; } } - return { lastUpdateTime: null, spaceId, worker, shouldProceed: true }; + return { spaceId, worker, shouldProceed: true }; } catch (error) { console.error( `proposeSyncTask: Unexpected error while contacting sync-task API:`, error, ); return { - lastUpdateTime: null, spaceId: 0, worker: "", shouldProceed: false, @@ -188,7 +199,6 @@ const upsertNodeSchemaToContent = async ({ ] `; - //@ts-ignore - backend to be added to roamjs-components const result = (await window.roamAlphaAPI.data.async.q( query, nodeTypesUids, @@ -369,29 +379,73 @@ const upsertUsers = async ( } }; -export const createOrUpdateDiscourseEmbedding = async () => { - const { shouldProceed, lastUpdateTime, worker } = await proposeSyncTask(); - - if (!shouldProceed) { - return; +const BASE_SYNC_INTERVAL = 5 * 60 * 1000; // 5 minutes +let doSync = true; +let numFailures = 0; +const MAX_FAILURES = 5; +type TimeoutValue = ReturnType; +let activeTimeout: TimeoutValue | null = null; +// TODO: Maybe also pause sync while the window is not active? + +class FatalError extends Error {} + +export const setSyncActivity = (active: boolean) => { + doSync = active; + if (!active && activeTimeout !== null) { + clearTimeout(activeTimeout); + activeTimeout = null; + } else if (active && activeTimeout === null) { + activeTimeout = setTimeout( + // eslint-disable-next-line @typescript-eslint/no-misused-promises + createOrUpdateDiscourseEmbedding, + 100, + ); } +}; + +export const createOrUpdateDiscourseEmbedding = async (showToast = false) => { + if (!doSync) return; + console.debug("starting createOrUpdateDiscourseEmbedding"); + let success = true; + const { shouldProceed, lastUpdateTime, nextUpdateTime, worker } = + await proposeSyncTask(); try { + if (!shouldProceed) { + if (nextUpdateTime === undefined) { + throw new Error("Can't obtain sync task"); + } + console.debug("postponed to ", nextUpdateTime); + if (doSync) { + activeTimeout = setTimeout( + createOrUpdateDiscourseEmbedding, // eslint-disable-line @typescript-eslint/no-misused-promises + nextUpdateTime.valueOf() - Date.now() + 100, + ); + } + return; + } const allUsers = await getAllUsers(); - const time = lastUpdateTime === null ? DEFAULT_TIME : lastUpdateTime; + const time = (lastUpdateTime || DEFAULT_TIME).toISOString(); const { allDgNodeTypes, dgNodeTypesWithSettings } = getDgNodeTypes(); const allNodeInstances = await getAllDiscourseNodesSince( time, dgNodeTypesWithSettings, ); + if (!createClient()) { + // not worth retrying + // TODO: Differentiate setup vs connetion error + throw new FatalError("Could not access supabase."); + } const supabaseClient = await getLoggedInClient(); - if (!supabaseClient) return null; + if (!supabaseClient) { + // TODO: Distinguish connection vs credentials error + throw new Error("Could not log in to client."); + } const context = await getSupabaseContext(); if (!context) { - console.error("No Supabase context found."); - await endSyncTask(worker, "failed"); - return; + // not worth retrying: setup error + throw new FatalError("Error connecting to client."); } await upsertUsers(allUsers, supabaseClient, context); await upsertNodesToSupabaseAsContentWithEmbeddings( @@ -407,11 +461,34 @@ export const createOrUpdateDiscourseEmbedding = async () => { context, }); await cleanupOrphanedNodes(supabaseClient, context); - await endSyncTask(worker, "complete"); + await endSyncTask(worker, "complete", showToast); } catch (error) { console.error("createOrUpdateDiscourseEmbedding: Process failed:", error); - await endSyncTask(worker, "failed"); - throw error; + await endSyncTask(worker, "failed", showToast); + success = false; + if (error instanceof FatalError) { + doSync = false; + return; + } + } + let timeout = BASE_SYNC_INTERVAL; + if (success) { + numFailures = 0; + } else { + numFailures += 1; + if (numFailures >= MAX_FAILURES) { + doSync = false; + return; + } + timeout *= 2 ** numFailures; + } + if (activeTimeout != null) { + clearTimeout(activeTimeout); + activeTimeout = null; + } + if (doSync) { + // eslint-disable-next-line @typescript-eslint/no-misused-promises + activeTimeout = setTimeout(createOrUpdateDiscourseEmbedding, timeout); } }; @@ -424,8 +501,10 @@ export const initializeSupabaseSync = async () => { .eq("url", getRoamUrl()) .maybeSingle(); if (!result.data) { - return; + doSync = false; } else { - createOrUpdateDiscourseEmbedding(); + doSync = true; + // eslint-disable-next-line @typescript-eslint/no-misused-promises + activeTimeout = setTimeout(createOrUpdateDiscourseEmbedding, 100, true); } }; From 21aee0b50df6e34189aab340246f388b8a929a10 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Parent Date: Thu, 23 Oct 2025 20:47:49 -0400 Subject: [PATCH 2/7] stop sync un unload --- apps/roam/src/index.ts | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/apps/roam/src/index.ts b/apps/roam/src/index.ts index 5c852e57d..84a2050e9 100644 --- a/apps/roam/src/index.ts +++ b/apps/roam/src/index.ts @@ -27,7 +27,10 @@ import { installDiscourseFloatingMenu, removeDiscourseFloatingMenu, } from "./components/DiscourseFloatingMenu"; -import { initializeSupabaseSync } from "./utils/syncDgNodesToSupabase"; +import { + initializeSupabaseSync, + setSyncActivity, +} from "./utils/syncDgNodesToSupabase"; import { initPluginTimer } from "./utils/pluginTimer"; const initPostHog = () => { @@ -137,7 +140,7 @@ export default runExtension(async (onloadArgs) => { }, listActiveQueries: () => listActiveQueries(extensionAPI), isDiscourseNode: isDiscourseNode, - // @ts-ignore - we are still using roamjs-components global definition + // @ts-expect-error - we are still using roamjs-components global definition getDiscourseNodes: getDiscourseNodes, }; @@ -152,8 +155,9 @@ export default runExtension(async (onloadArgs) => { ], observers: observers, unload: () => { + setSyncActivity(false); window.roamjs.extension?.smartblocks?.unregisterCommand("QUERYBUILDER"); - // @ts-ignore - tldraw throws a warning on multiple loads + // @ts-expect-error - tldraw throws a warning on multiple loads delete window[Symbol.for("__signia__")]; document.removeEventListener( "roamjs:query-builder:action", From e5fb5d0b810e31b85ffc1ae6f446e03a4b0d9e98 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Parent Date: Fri, 24 Oct 2025 07:06:09 -0400 Subject: [PATCH 3/7] Not finding the Space is not a reason not to sync; it will get created with getContext --- apps/roam/src/utils/syncDgNodesToSupabase.ts | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/apps/roam/src/utils/syncDgNodesToSupabase.ts b/apps/roam/src/utils/syncDgNodesToSupabase.ts index 438c77273..14fc9878f 100644 --- a/apps/roam/src/utils/syncDgNodesToSupabase.ts +++ b/apps/roam/src/utils/syncDgNodesToSupabase.ts @@ -494,13 +494,7 @@ export const createOrUpdateDiscourseEmbedding = async (showToast = false) => { export const initializeSupabaseSync = async () => { const supabase = createClient(); - if (supabase === null) return; - const result = await supabase - .from("Space") - .select() - .eq("url", getRoamUrl()) - .maybeSingle(); - if (!result.data) { + if (supabase === null) { doSync = false; } else { doSync = true; From 9b6f0969988170a47777e646d5d690284020c37a Mon Sep 17 00:00:00 2001 From: Marc-Antoine Parent Date: Fri, 24 Oct 2025 07:45:39 -0400 Subject: [PATCH 4/7] reorder task preconditions, avoid ending a task that failed to start --- apps/roam/src/utils/syncDgNodesToSupabase.ts | 88 ++++++++------------ 1 file changed, 34 insertions(+), 54 deletions(-) diff --git a/apps/roam/src/utils/syncDgNodesToSupabase.ts b/apps/roam/src/utils/syncDgNodesToSupabase.ts index 14fc9878f..87413799c 100644 --- a/apps/roam/src/utils/syncDgNodesToSupabase.ts +++ b/apps/roam/src/utils/syncDgNodesToSupabase.ts @@ -32,11 +32,11 @@ const SYNC_TIMEOUT = "20s"; const BATCH_SIZE = 200; const DEFAULT_TIME = new Date("1970-01-01"); +class FatalError extends Error {} + type SyncTaskInfo = { lastUpdateTime?: Date; nextUpdateTime?: Date; - spaceId: number; - worker: string; shouldProceed: boolean; }; @@ -98,28 +98,13 @@ export const endSyncTask = async ( } }; -export const proposeSyncTask = async (): Promise => { +export const proposeSyncTask = async ( + worker: string, + supabaseClient: DGSupabaseClient, + context: SupabaseContext, +): Promise => { try { - const supabaseClient = await getLoggedInClient(); - const context = supabaseClient ? await getSupabaseContext() : null; - if (!context || !supabaseClient) { - console.error("proposeSyncTask: Unable to obtain Supabase context."); - return { - spaceId: 0, - worker: "", - shouldProceed: false, - }; - } - const worker = window.roamAlphaAPI.user.uid(); - if (!worker) { - console.error("proposeSyncTask: Unable to obtain user UID."); - return { - spaceId: 0, - worker: "", - shouldProceed: false, - }; - } - + const now = new Date(); const { data, error } = await supabaseClient.rpc("propose_sync_task", { s_target: context.spaceId, s_function: SYNC_FUNCTION, @@ -128,45 +113,36 @@ export const proposeSyncTask = async (): Promise => { timeout: SYNC_TIMEOUT, }); - const { spaceId } = context; - if (error) { console.error( `proposeSyncTask: propose_sync_task failed – ${error.message}`, ); - return { spaceId, worker, shouldProceed: false }; + return { shouldProceed: false }; } if (typeof data === "string") { const timestamp = new Date(data); - const now = new Date(); if (timestamp > now) { return { nextUpdateTime: timestamp, - spaceId, - worker, shouldProceed: false, }; } else { return { lastUpdateTime: timestamp, - spaceId, - worker, shouldProceed: true, }; } } - return { spaceId, worker, shouldProceed: true }; + return { shouldProceed: true }; } catch (error) { console.error( `proposeSyncTask: Unexpected error while contacting sync-task API:`, error, ); return { - spaceId: 0, - worker: "", shouldProceed: false, }; } @@ -387,8 +363,6 @@ type TimeoutValue = ReturnType; let activeTimeout: TimeoutValue | null = null; // TODO: Maybe also pause sync while the window is not active? -class FatalError extends Error {} - export const setSyncActivity = (active: boolean) => { doSync = active; if (!active && activeTimeout !== null) { @@ -407,14 +381,35 @@ export const createOrUpdateDiscourseEmbedding = async (showToast = false) => { if (!doSync) return; console.debug("starting createOrUpdateDiscourseEmbedding"); let success = true; - const { shouldProceed, lastUpdateTime, nextUpdateTime, worker } = - await proposeSyncTask(); + let claimed = false; + const worker = window.roamAlphaAPI.user.uid(); try { + if (!worker) { + throw new FatalError("Unable to obtain user UID."); + } + if (!createClient()) { + // not worth retrying + // TODO: Differentiate setup vs connetion error + throw new FatalError("Could not access supabase."); + } + const supabaseClient = await getLoggedInClient(); + if (!supabaseClient) { + // TODO: Distinguish connection vs credentials error + throw new Error("Could not log in to client."); + } + const context = await getSupabaseContext(); + if (!context) { + // not worth retrying: setup error + throw new FatalError("Error connecting to client."); + } + const { shouldProceed, lastUpdateTime, nextUpdateTime } = + await proposeSyncTask(worker, supabaseClient, context); if (!shouldProceed) { if (nextUpdateTime === undefined) { throw new Error("Can't obtain sync task"); } + claimed = true; console.debug("postponed to ", nextUpdateTime); if (doSync) { activeTimeout = setTimeout( @@ -432,21 +427,6 @@ export const createOrUpdateDiscourseEmbedding = async (showToast = false) => { time, dgNodeTypesWithSettings, ); - if (!createClient()) { - // not worth retrying - // TODO: Differentiate setup vs connetion error - throw new FatalError("Could not access supabase."); - } - const supabaseClient = await getLoggedInClient(); - if (!supabaseClient) { - // TODO: Distinguish connection vs credentials error - throw new Error("Could not log in to client."); - } - const context = await getSupabaseContext(); - if (!context) { - // not worth retrying: setup error - throw new FatalError("Error connecting to client."); - } await upsertUsers(allUsers, supabaseClient, context); await upsertNodesToSupabaseAsContentWithEmbeddings( allNodeInstances, @@ -464,8 +444,8 @@ export const createOrUpdateDiscourseEmbedding = async (showToast = false) => { await endSyncTask(worker, "complete", showToast); } catch (error) { console.error("createOrUpdateDiscourseEmbedding: Process failed:", error); - await endSyncTask(worker, "failed", showToast); success = false; + if (worker && claimed) await endSyncTask(worker, "failed", showToast); if (error instanceof FatalError) { doSync = false; return; From e6a44f0583eb44500c368633fc5d78faee184a28 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Parent Date: Fri, 24 Oct 2025 07:50:10 -0400 Subject: [PATCH 5/7] clarify different interval values --- apps/roam/src/utils/syncDgNodesToSupabase.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/apps/roam/src/utils/syncDgNodesToSupabase.ts b/apps/roam/src/utils/syncDgNodesToSupabase.ts index 87413799c..eeaa3cba8 100644 --- a/apps/roam/src/utils/syncDgNodesToSupabase.ts +++ b/apps/roam/src/utils/syncDgNodesToSupabase.ts @@ -27,7 +27,10 @@ type LocalContentDataInput = Partial>; type AccountLocalInput = CompositeTypes<"account_local_input">; const SYNC_FUNCTION = "embedding"; +// Minimal interval between syncs of all clients for this task. const SYNC_INTERVAL = "45s"; +// Interval between syncs for each client individually +const BASE_SYNC_INTERVAL = 5 * 60 * 1000; // 5 minutes const SYNC_TIMEOUT = "20s"; const BATCH_SIZE = 200; const DEFAULT_TIME = new Date("1970-01-01"); @@ -355,7 +358,6 @@ const upsertUsers = async ( } }; -const BASE_SYNC_INTERVAL = 5 * 60 * 1000; // 5 minutes let doSync = true; let numFailures = 0; const MAX_FAILURES = 5; From c3eae7718de793be4dd6739e6bcdc2a10c9cdc6e Mon Sep 17 00:00:00 2001 From: Marc-Antoine Parent Date: Fri, 24 Oct 2025 09:15:49 -0400 Subject: [PATCH 6/7] timing suggestions --- apps/roam/src/utils/syncDgNodesToSupabase.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/apps/roam/src/utils/syncDgNodesToSupabase.ts b/apps/roam/src/utils/syncDgNodesToSupabase.ts index eeaa3cba8..d430a0a51 100644 --- a/apps/roam/src/utils/syncDgNodesToSupabase.ts +++ b/apps/roam/src/utils/syncDgNodesToSupabase.ts @@ -416,7 +416,7 @@ export const createOrUpdateDiscourseEmbedding = async (showToast = false) => { if (doSync) { activeTimeout = setTimeout( createOrUpdateDiscourseEmbedding, // eslint-disable-line @typescript-eslint/no-misused-promises - nextUpdateTime.valueOf() - Date.now() + 100, + Math.max(0, nextUpdateTime.valueOf() - Date.now()) + 100, ); } return; @@ -462,7 +462,8 @@ export const createOrUpdateDiscourseEmbedding = async (showToast = false) => { doSync = false; return; } - timeout *= 2 ** numFailures; + const jitter = 0.9 + Math.random() * 0.2; // 0.9x–1.1x + timeout *= 2 ** numFailures * jitter; } if (activeTimeout != null) { clearTimeout(activeTimeout); From cee1023e7843ea52e40bc728038fbcd566484a18 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Parent Date: Fri, 24 Oct 2025 10:09:06 -0400 Subject: [PATCH 7/7] coderabbit correction --- apps/roam/src/utils/syncDgNodesToSupabase.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/roam/src/utils/syncDgNodesToSupabase.ts b/apps/roam/src/utils/syncDgNodesToSupabase.ts index d430a0a51..349130b90 100644 --- a/apps/roam/src/utils/syncDgNodesToSupabase.ts +++ b/apps/roam/src/utils/syncDgNodesToSupabase.ts @@ -411,7 +411,6 @@ export const createOrUpdateDiscourseEmbedding = async (showToast = false) => { if (nextUpdateTime === undefined) { throw new Error("Can't obtain sync task"); } - claimed = true; console.debug("postponed to ", nextUpdateTime); if (doSync) { activeTimeout = setTimeout( @@ -421,6 +420,7 @@ export const createOrUpdateDiscourseEmbedding = async (showToast = false) => { } return; } + claimed = true; const allUsers = await getAllUsers(); const time = (lastUpdateTime || DEFAULT_TIME).toISOString(); const { allDgNodeTypes, dgNodeTypesWithSettings } = getDgNodeTypes();