From 1611948a3966f347b044f66a76af5b9a5911afa3 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Parent Date: Sun, 23 Nov 2025 18:18:35 -0500 Subject: [PATCH 1/2] eng-1075 use a row lock, that ends naturally with transaction, instead of an advisory lock. --- .../20251123231626_use_for_update.sql | 74 +++++++++++++++++++ packages/database/supabase/schemas/sync.sql | 14 +++- 2 files changed, 84 insertions(+), 4 deletions(-) create mode 100644 packages/database/supabase/migrations/20251123231626_use_for_update.sql diff --git a/packages/database/supabase/migrations/20251123231626_use_for_update.sql b/packages/database/supabase/migrations/20251123231626_use_for_update.sql new file mode 100644 index 000000000..3b2039b59 --- /dev/null +++ b/packages/database/supabase/migrations/20251123231626_use_for_update.sql @@ -0,0 +1,74 @@ +CREATE OR REPLACE FUNCTION public.propose_sync_task(s_target bigint, s_function character varying, s_worker character varying, timeout interval, task_interval interval) +RETURNS timestamp with time zone +LANGUAGE plpgsql +SET search_path TO '' +AS $function$ +DECLARE s_id INTEGER; +DECLARE start_time TIMESTAMP WITH TIME ZONE := now(); +DECLARE t_status public.task_status; +DECLARE t_failure_count SMALLINT; +DECLARE t_last_task_start TIMESTAMP WITH TIME ZONE; +DECLARE t_last_task_end TIMESTAMP WITH TIME ZONE; +DECLARE t_times_out_at TIMESTAMP WITH TIME ZONE; +DECLARE result TIMESTAMP WITH TIME ZONE; +BEGIN + ASSERT timeout * 2 < task_interval; + ASSERT timeout >= '1s'::interval; + ASSERT task_interval >= '5s'::interval; + INSERT INTO public.sync_info (sync_target, sync_function, status, worker, last_task_start, task_times_out_at) + VALUES (s_target, s_function, 'active', s_worker, start_time, start_time+timeout) + ON CONFLICT (sync_target, sync_function) DO NOTHING + RETURNING id INTO s_id; + IF s_id IS NOT NULL THEN + -- totally new_row, I'm on the task + -- return last time it was run successfully + SELECT max(last_task_start) INTO result FROM public.sync_info + WHERE sync_target = s_target + AND sync_function = s_function + AND status = 'complete'; + RETURN result; + END IF; + -- now we know it pre-existed. Maybe already active. + SELECT id INTO STRICT s_id + FROM public.sync_info + WHERE sync_target = s_target AND sync_function = s_function + FOR UPDATE; + SELECT status, failure_count, last_task_start, last_task_end, task_times_out_at + INTO t_status, t_failure_count, t_last_task_start, t_last_task_end, t_times_out_at + FROM public.sync_info + WHERE id = s_id; + + IF t_status = 'active' AND t_last_task_start >= coalesce(t_last_task_end, t_last_task_start) AND start_time > t_times_out_at THEN + t_status := 'timeout'; + t_failure_count := t_failure_count + 1; + END IF; + -- basic backoff + task_interval := task_interval * (1+t_failure_count); + IF coalesce(t_last_task_end, t_last_task_start) + task_interval < now() THEN + -- we are ready to take on the task + SELECT max(last_task_start) INTO result FROM public.sync_info + WHERE sync_target = s_target + AND sync_function = s_function + AND status = 'complete'; + UPDATE public.sync_info + SET worker=s_worker, + status='active', + task_times_out_at = now() + timeout, + last_task_start = start_time, + failure_count=t_failure_count, + last_task_end = NULL + WHERE id=s_id; + ELSE + -- the task has been tried recently enough + IF t_status = 'timeout' THEN + UPDATE public.sync_info + SET status=t_status, failure_count=t_failure_count + WHERE id=s_id; + END IF; + result := coalesce(t_last_task_end, t_last_task_start) + task_interval; + END IF; + + RETURN result; +END; +$function$ +; diff --git a/packages/database/supabase/schemas/sync.sql b/packages/database/supabase/schemas/sync.sql index eee2d3b89..8457b5562 100644 --- a/packages/database/supabase/schemas/sync.sql +++ b/packages/database/supabase/schemas/sync.sql @@ -128,8 +128,10 @@ BEGIN RETURN result; END IF; -- now we know it pre-existed. Maybe already active. - SELECT id INTO STRICT s_id FROM public.sync_info WHERE sync_target = s_target AND sync_function = s_function; - PERFORM pg_advisory_lock(s_id); + SELECT id INTO STRICT s_id + FROM public.sync_info + WHERE sync_target = s_target AND sync_function = s_function + FOR UPDATE; SELECT status, failure_count, last_task_start, last_task_end, task_times_out_at INTO t_status, t_failure_count, t_last_task_start, t_last_task_end, t_times_out_at FROM public.sync_info @@ -148,7 +150,12 @@ BEGIN AND sync_function = s_function AND status = 'complete'; UPDATE public.sync_info - SET worker=s_worker, status='active', task_times_out_at = now() + timeout, last_task_start = start_time, failure_count=t_failure_count + SET worker=s_worker, + status='active', + task_times_out_at = now() + timeout, + last_task_start = start_time, + failure_count=t_failure_count, + last_task_end = NULL WHERE id=s_id; ELSE -- the task has been tried recently enough @@ -160,7 +167,6 @@ BEGIN result := coalesce(t_last_task_end, t_last_task_start) + task_interval; END IF; - PERFORM pg_advisory_unlock(s_id); RETURN result; END; $$; From 7e267151e17e455266f6761be8e104ede507655e Mon Sep 17 00:00:00 2001 From: Marc-Antoine Parent Date: Sun, 23 Nov 2025 18:35:44 -0500 Subject: [PATCH 2/2] avoid stampede --- apps/roam/src/utils/syncDgNodesToSupabase.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/apps/roam/src/utils/syncDgNodesToSupabase.ts b/apps/roam/src/utils/syncDgNodesToSupabase.ts index e155f25bd..6f2013839 100644 --- a/apps/roam/src/utils/syncDgNodesToSupabase.ts +++ b/apps/roam/src/utils/syncDgNodesToSupabase.ts @@ -423,7 +423,9 @@ export const createOrUpdateDiscourseEmbedding = async (showToast = false) => { if (doSync) { activeTimeout = setTimeout( createOrUpdateDiscourseEmbedding, // eslint-disable-line @typescript-eslint/no-misused-promises - Math.max(0, nextUpdateTime.valueOf() - Date.now()) + 100, + Math.max(0, nextUpdateTime.valueOf() - Date.now()) + + 100 + + Math.floor(Math.random() * 200), // avoid stampede ); } return;