diff --git a/apps/roam/src/utils/syncDgNodesToSupabase.ts b/apps/roam/src/utils/syncDgNodesToSupabase.ts
index a10e187a1..742f48498 100644
--- a/apps/roam/src/utils/syncDgNodesToSupabase.ts
+++ b/apps/roam/src/utils/syncDgNodesToSupabase.ts
@@ -258,13 +258,115 @@ export const convertDgToSupabaseConcepts = async ({
   }
 };
 
+const chunk = <T>(array: T[], size: number): T[][] => {
+  if (array.length === 0) return [];
+  if (size < 1) throw new Error(`chunk size must be >= 1 (got ${size})`);
+  const chunks: T[][] = [];
+  for (let i = 0; i < array.length; i += size) {
+    chunks.push(array.slice(i, i + size));
+  }
+  return chunks;
+};
+
+const uploadNodesInBatches = async ({
+  supabase,
+  context,
+  nodes,
+  content_as_document,
+}: {
+  supabase: DGSupabaseClient;
+  context: SupabaseContext;
+  nodes: LocalContentDataInput[];
+  content_as_document: boolean;
+}): Promise<number> => {
+  const v_space_id = context.spaceId;
+  const v_creator_id = context.userId;
+  const batches = chunk(nodes, BATCH_SIZE);
+  let successes = 0;
+
+  for (let idx = 0; idx < batches.length; idx++) {
+    const batch = batches[idx];
+
+    const { error } = await supabase.rpc("upsert_content", {
+      data: batch as Json,
+      v_space_id,
+      v_creator_id,
+      content_as_document,
+    });
+
+    if (error) {
+      console.error(`upsert_content failed for batch ${idx + 1}:`, error);
+      break;
+    }
+    successes += batch.length;
+  }
+  return successes;
+};
+
+export const addMissingEmbeddings = async (
+  supabase: DGSupabaseClient,
+  context: SupabaseContext,
+) => {
+  const response = await supabase
+    .from("my_contents")
+    .select(
+      "id, text, emb:ContentEmbedding_openai_text_embedding_3_small_1536(target_id)",
+    )
+    .eq("space_id", context.spaceId)
+    .is("emb", null)
+    .not("text", "is", null);
+  if (response.error) {
+    console.error(response.error);
+    return 0;
+  }
+  // Tell TS about the non-null values
+  const data = response.data as (Omit<
+    (typeof response.data)[number],
+    "text" | "id"
+  > & {
+    text: string;
+    id: number;
+  })[];
+  let successes = 0;
+  const batches = chunk(data, BATCH_SIZE);
+  for (let idx = 0; idx < batches.length; idx++) {
+    const batch = batches[idx];
+    try {
+      const nodesWithEmbeddings = await fetchEmbeddingsForNodes(batch);
+      const embeddings = nodesWithEmbeddings.map(
+        ({ id, embedding_inline: { model, vector } }) => ({
+          target_id: id,
+          model,
+          vector: JSON.stringify(vector),
+        }),
+      );
+      const result = await supabase
+        .from("ContentEmbedding_openai_text_embedding_3_small_1536")
+        .upsert(embeddings, { onConflict: "target_id" })
+        .select();
+      if (result.error) {
+        console.error(result.error);
+        break;
+      }
+      successes += batch.length;
+    } catch (e) {
+      console.error(e);
+      break;
+    }
+  }
+  if (successes < data.length)
+    console.warn(
+      `Tried sending content embeddings, ${successes}/${data.length} sent`,
+    );
+  else console.log(`Done sending content embeddings`);
+  return successes;
+};
+
 export const upsertNodesToSupabaseAsContentWithEmbeddings = async (
   roamNodes: RoamDiscourseNodeData[],
   supabaseClient: DGSupabaseClient,
   context: SupabaseContext,
 ): Promise<void> => {
-  const { userId } = context;
-
   if (roamNodes.length === 0) {
     return;
   }
@@ -272,53 +374,21 @@ export const upsertNodesToSupabaseAsContentWithEmbeddings = async (
     nodes: roamNodes,
   });
 
-  let nodesWithEmbeddings: LocalContentDataInput[];
-  try {
-    nodesWithEmbeddings = await fetchEmbeddingsForNodes(
-      allNodeInstancesAsLocalContent,
+  const successes = await uploadNodesInBatches({
+    supabase: supabaseClient,
+    context,
+    nodes: allNodeInstancesAsLocalContent,
+    content_as_document: true,
+  });
+  if (successes < allNodeInstancesAsLocalContent.length)
+    console.warn(
+      `Tried sending content, ${successes}/${allNodeInstancesAsLocalContent.length} sent`,
     );
-  } catch (error) {
-    const message = error instanceof Error ? error.message : String(error);
-    console.error(
-      `upsertNodesToSupabaseAsContentWithEmbeddings: Embedding service failed – ${message}`,
+  else
+    console.log(
+      `Done sending ${allNodeInstancesAsLocalContent.length} contents`,
     );
-    return;
-  }
-
-  if (nodesWithEmbeddings.length !== allNodeInstancesAsLocalContent.length) {
-    console.error(
-      "upsertNodesToSupabaseAsContentWithEmbeddings: Mismatch between node and embedding counts.",
-    );
-    return;
-  }
-
-  const chunk = <T>(array: T[], size: number): T[][] => {
-    const chunks: T[][] = [];
-    for (let i = 0; i < array.length; i += size) {
-      chunks.push(array.slice(i, i + size));
-    }
-    return chunks;
-  };
-
-  const uploadBatches = async (batches: LocalContentDataInput[][]) => {
-    for (let idx = 0; idx < batches.length; idx++) {
-      const batch = batches[idx];
-
-      const { error } = await supabaseClient.rpc("upsert_content", {
-        data: batch as Json,
-        v_space_id: context.spaceId,
-        v_creator_id: userId,
-        content_as_document: true,
-      });
-
-      if (error) {
-        console.error(`upsert_content failed for batch ${idx + 1}:`, error);
-        throw error;
-      }
-    }
-  };
-
-  await uploadBatches(chunk(nodesWithEmbeddings, BATCH_SIZE));
+  await addMissingEmbeddings(supabaseClient, context);
 };
 
 const getDgNodeTypes = () => {
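Note on the batching contract above (editor's sketch, not part of the patch): uploadNodesInBatches stops at the first failing upsert_content call and returns how many rows were sent, rather than throwing as the old uploadBatches did. Callers compare the return value against the input length to decide whether a retry pass is needed. A minimal hypothetical caller, assuming uploadNodesInBatches were exported:

// Hypothetical wrapper; only the imported types and uploadNodesInBatches
// come from the patch, the rest is illustrative.
const syncNodesOnce = async (
  supabase: DGSupabaseClient,
  context: SupabaseContext,
  nodes: LocalContentDataInput[],
): Promise<number> => {
  const sent = await uploadNodesInBatches({
    supabase,
    context,
    nodes,
    content_as_document: true,
  });
  if (sent < nodes.length) {
    // Batches after the first failure were never attempted; a later sync
    // pass can resend them, and addMissingEmbeddings backfills vectors.
    console.warn(`Partial upload: ${sent}/${nodes.length} rows sent`);
  }
  return sent;
};
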
diff --git a/apps/roam/src/utils/upsertNodesAsContentWithEmbeddings.ts b/apps/roam/src/utils/upsertNodesAsContentWithEmbeddings.ts
index 829e792c2..2b48457d7 100644
--- a/apps/roam/src/utils/upsertNodesAsContentWithEmbeddings.ts
+++ b/apps/roam/src/utils/upsertNodesAsContentWithEmbeddings.ts
@@ -3,7 +3,7 @@ import { type RoamDiscourseNodeData } from "./getAllDiscourseNodesSince";
 import { type SupabaseContext } from "./supabaseContext";
 import { nextApiRoot } from "@repo/utils/execContext";
 import type { DGSupabaseClient } from "@repo/database/lib/client";
-import type { Json, CompositeTypes } from "@repo/database/dbTypes";
+import type { Json, CompositeTypes, Enums } from "@repo/database/dbTypes";
 
 type LocalContentDataInput = Partial<CompositeTypes<"local_content_data_input">>;
 
@@ -38,9 +38,13 @@ export const convertRoamNodeToLocalContent = ({
   });
 };
 
-export const fetchEmbeddingsForNodes = async (
-  nodes: LocalContentDataInput[],
-): Promise<LocalContentDataInput[]> => {
+export const fetchEmbeddingsForNodes = async <T extends LocalContentDataInput>(
+  nodes: T[],
+): Promise<
+  (T & {
+    embedding_inline: { model: Enums<"EmbeddingName">; vector: number[] };
+  })[]
+> => {
   const allEmbeddings: number[][] = [];
   const allNodesTexts = nodes.map((node) => node.text || "");
 
@@ -98,77 +102,3 @@ export const fetchEmbeddingsForNodes = async (
     },
   }));
 };
-
-const uploadBatches = async (
-  batches: LocalContentDataInput[][],
-  supabaseClient: DGSupabaseClient,
-  context: SupabaseContext,
-) => {
-  const { spaceId, userId } = context;
-  for (let idx = 0; idx < batches.length; idx++) {
-    const batch = batches[idx];
-    const { error } = await supabaseClient.rpc("upsert_content", {
-      data: batch as unknown as Json,
-      v_space_id: spaceId,
-      v_creator_id: userId,
-      content_as_document: true,
-    });
-
-    if (error) {
-      console.error(`upsert_content failed for batch ${idx + 1}:`, error);
-      throw error;
-    }
-  }
-};
-
-export const upsertNodesToSupabaseAsContentWithEmbeddings = async (
-  roamNodes: RoamDiscourseNodeData[],
-  supabaseClient: DGSupabaseClient,
-  context: SupabaseContext,
-): Promise<void> => {
-  if (!context?.userId) {
-    console.error("No Supabase context found.");
-    return;
-  }
-
-  if (roamNodes.length === 0) {
-    return;
-  }
-  const localContentNodes = convertRoamNodeToLocalContent({
-    nodes: roamNodes,
-  });
-
-  let nodesWithEmbeddings: LocalContentDataInput[];
-  try {
-    nodesWithEmbeddings = await fetchEmbeddingsForNodes(localContentNodes);
-  } catch (error: unknown) {
-    const errorMessage = error instanceof Error ? error.message : String(error);
-    console.error(
-      `upsertNodesToSupabaseAsContentWithEmbeddings: Embedding service failed – ${errorMessage}`,
-    );
-    return;
-  }
-
-  if (nodesWithEmbeddings.length !== roamNodes.length) {
-    console.error(
-      "upsertNodesToSupabaseAsContentWithEmbeddings: Mismatch between node and embedding counts.",
-    );
-    return;
-  }
-
-  const batchSize = 200;
-
-  const chunk = <T>(array: T[], size: number): T[][] => {
-    const chunks: T[][] = [];
-    for (let i = 0; i < array.length; i += size) {
-      chunks.push(array.slice(i, i + size));
-    }
-    return chunks;
-  };
-
-  await uploadBatches(
-    chunk(nodesWithEmbeddings, batchSize),
-    supabaseClient,
-    context,
-  );
-};
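For reference (editor's sketch, not part of the patch): making fetchEmbeddingsForNodes generic means the returned rows keep whatever fields the caller passed in, with embedding_inline layered on top; addMissingEmbeddings relies on this to read id back off the result. A minimal illustration with a hypothetical row type:

// Row is hypothetical; any shape assignable to LocalContentDataInput works.
type Row = { id: number; text: string };

const embedRows = async (rows: Row[]) => {
  const out = await fetchEmbeddingsForNodes(rows);
  // Each element is still a Row, now carrying embedding_inline.
  return out.map(({ id, embedding_inline: { model, vector } }) => ({
    target_id: id,
    model,
    vector: JSON.stringify(vector),
  }));
};
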
diff --git a/packages/database/src/dbTypes.ts b/packages/database/src/dbTypes.ts
index 03e42dfe5..285c56403 100644
--- a/packages/database/src/dbTypes.ts
+++ b/packages/database/src/dbTypes.ts
@@ -1250,21 +1250,19 @@ export type Database = {
         Returns: {
           arity: number | null
           author_id: number | null
-          created: string | null
+          created: string
           description: string | null
-          epistemic_status:
-            | Database["public"]["Enums"]["EpistemicStatus"]
-            | null
-          id: number | null
-          is_schema: boolean | null
-          last_modified: string | null
-          literal_content: Json | null
-          name: string | null
-          reference_content: Json | null
-          refs: number[] | null
+          epistemic_status: Database["public"]["Enums"]["EpistemicStatus"]
+          id: number
+          is_schema: boolean
+          last_modified: string
+          literal_content: Json
+          name: string
+          reference_content: Json
+          refs: number[]
           represented_by_id: number | null
           schema_id: number | null
-          space_id: number | null
+          space_id: number
         }[]
       }
      concept_in_space: {
@@ -1384,19 +1382,21 @@ export type Database = {
         Returns: {
           arity: number | null
           author_id: number | null
-          created: string
+          created: string | null
           description: string | null
-          epistemic_status: Database["public"]["Enums"]["EpistemicStatus"]
-          id: number
-          is_schema: boolean
-          last_modified: string
-          literal_content: Json
-          name: string
-          reference_content: Json
-          refs: number[]
+          epistemic_status:
+            | Database["public"]["Enums"]["EpistemicStatus"]
+            | null
+          id: number | null
+          is_schema: boolean | null
+          last_modified: string | null
+          literal_content: Json | null
+          name: string | null
+          reference_content: Json | null
+          refs: number[] | null
           represented_by_id: number | null
           schema_id: number | null
-          space_id: number
+          space_id: number | null
         }[]
       }
      is_my_account: {
diff --git a/packages/database/supabase/migrations/20251017151558_update_content_text_delete_embeddings.sql b/packages/database/supabase/migrations/20251017151558_update_content_text_delete_embeddings.sql
new file mode 100644
index 000000000..3dfeeb1b0
--- /dev/null
+++ b/packages/database/supabase/migrations/20251017151558_update_content_text_delete_embeddings.sql
@@ -0,0 +1,16 @@
+-- on update content text trigger
+CREATE OR REPLACE FUNCTION public.after_update_content_text_trigger ()
+RETURNS TRIGGER
+SET search_path = ''
+LANGUAGE plpgsql
+AS $$
+BEGIN
+  IF OLD.text != NEW.text THEN
+    DELETE FROM public."ContentEmbedding_openai_text_embedding_3_small_1536" WHERE target_id = NEW.id;
+  END IF;
+  RETURN NEW;
+END;
+$$ ;
+
+CREATE TRIGGER on_update_text_trigger AFTER UPDATE ON public."Content"
+FOR EACH ROW EXECUTE FUNCTION public.after_update_content_text_trigger () ;
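How the trigger and the client-side backfill interact (editor's sketch under the assumption that the exports above are in scope): an update that changes Content.text fires after_update_content_text_trigger, which deletes the stale embedding row; the next addMissingEmbeddings pass then finds the content through the null-embedding filter on my_contents and re-embeds it.

// Illustrative lifecycle, not part of the patch.
const editAndReembed = async (
  supabase: DGSupabaseClient,
  context: SupabaseContext,
  contentId: number,
  newText: string,
) => {
  // The AFTER UPDATE trigger removes this row's embedding when the text
  // actually changes.
  await supabase.from("Content").update({ text: newText }).eq("id", contentId);
  // addMissingEmbeddings now sees the row as missing an embedding and
  // regenerates it in BATCH_SIZE chunks.
  return addMissingEmbeddings(supabase, context);
};
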
diff --git a/packages/database/supabase/schemas/embedding.sql b/packages/database/supabase/schemas/embedding.sql
index 5e02fddc1..76d1e7483 100644
--- a/packages/database/supabase/schemas/embedding.sql
+++ b/packages/database/supabase/schemas/embedding.sql
@@ -30,24 +30,24 @@ GRANT ALL ON TABLE public."ContentEmbedding_openai_text_embedding_3_small_1536"
 
 CREATE OR REPLACE VIEW public.my_contents_with_embedding_openai_text_embedding_3_small_1536 AS
 SELECT
-  ct.id,
-  ct.document_id,
-  ct.source_local_id,
-  ct.variant,
-  ct.author_id,
-  ct.creator_id,
-  ct.created,
-  ct.text,
-  ct.metadata,
-  ct.scale,
-  ct.space_id,
-  ct.last_modified,
-  ct.part_of_id,
-  emb.model,
-  emb.vector
+ct.id,
+ct.document_id,
+ct.source_local_id,
+ct.variant,
+ct.author_id,
+ct.creator_id,
+ct.created,
+ct.text,
+ct.metadata,
+ct.scale,
+ct.space_id,
+ct.last_modified,
+ct.part_of_id,
+emb.model,
+emb.vector
 FROM public."Content" AS ct
-JOIN public."ContentEmbedding_openai_text_embedding_3_small_1536" AS emb ON (ct.id=emb.target_id)
-WHERE ct.space_id = any(public.my_space_ids()) AND NOT emb.obsolete;
+JOIN public."ContentEmbedding_openai_text_embedding_3_small_1536" AS emb ON (ct.id = emb.target_id)
+WHERE ct.space_id = any (public.my_space_ids ()) AND NOT emb.obsolete ;
 
 set search_path to public, extensions ;
 
@@ -124,3 +124,20 @@ ALTER TABLE public."ContentEmbedding_openai_text_embedding_3_small_1536" ENABLE
 
 DROP POLICY IF EXISTS embedding_openai_te3s_1536_policy ON public."ContentEmbedding_openai_text_embedding_3_small_1536" ;
 CREATE POLICY embedding_openai_te3s_1536_policy ON public."ContentEmbedding_openai_text_embedding_3_small_1536" FOR ALL USING (public.content_in_space (target_id)) ;
+
+-- on update content text trigger
+CREATE OR REPLACE FUNCTION public.after_update_content_text_trigger ()
+RETURNS TRIGGER
+SET search_path = ''
+LANGUAGE plpgsql
+AS $$
+BEGIN
+  IF OLD.text != NEW.text THEN
+    DELETE FROM public."ContentEmbedding_openai_text_embedding_3_small_1536" WHERE target_id = NEW.id;
+  END IF;
+  RETURN NEW;
+END;
+$$ ;
+
+CREATE TRIGGER on_update_text_trigger AFTER UPDATE ON public."Content"
+FOR EACH ROW EXECUTE FUNCTION public.after_update_content_text_trigger () ;