diff --git a/CHANGELOG.md b/CHANGELOG.md index 5b94dac21da..d84c0337200 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,13 @@ and this project adheres to backfilled by a background worker rather than computed synchronously on every insert, so log search is eventually-consistent (typically within a minute). [#4425](https://github.com/OpenFn/lightning/issues/4425) +- Dataclip inserts no longer roll back when building the full-text search vector + is slow. The `jsonb_to_tsvector` work that ran in an `AFTER INSERT` trigger + could hold the connection past the timeout and roll back the insert, losing + the whole run. The search vector is now built off the insert path by a + background `Lightning.Invocation.DataclipSearchVectorWorker` (sharing the + `search_indexing` queue with the log-lines worker), making dataclip search + eventually consistent. [#4800](https://github.com/OpenFn/lightning/issues/4800) - Channel join crashes when multiple users open the same workflow concurrently [#4802](https://github.com/OpenFn/lightning/issues/4802) - Fix `purge_deleted` Oban job crashing when a soft-deleted project has diff --git a/config/config.exs b/config/config.exs index 2881e963a58..729fb033e3c 100644 --- a/config/config.exs +++ b/config/config.exs @@ -189,6 +189,8 @@ config :lightning, :claim_work_mem, nil config :lightning, :log_lines_search_indexing, batch_size: 2_500, max_batches: 10 +config :lightning, :dataclip_search_indexing, batch_size: 250, max_batches: 10 + config :lightning, Lightning.Runtime.RuntimeManager, start: false config :lightning, LightningWeb.CollectionsController, diff --git a/config/test.exs b/config/test.exs index 4456d81180b..cb2d71914b7 100644 --- a/config/test.exs +++ b/config/test.exs @@ -137,6 +137,8 @@ config :lightning, :is_resettable_demo, true # the per-run budget guard, exercising the snowball follow-up path. 
config :lightning, :log_lines_search_indexing, batch_size: 2, max_batches: 2 +config :lightning, :dataclip_search_indexing, batch_size: 2, max_batches: 2 + config :lightning, :github_app, app_id: "111111", app_name: "test-github", diff --git a/lib/lightning/config.ex b/lib/lightning/config.ex index 734864e1018..65025b56f76 100644 --- a/lib/lightning/config.ex +++ b/lib/lightning/config.ex @@ -111,6 +111,20 @@ defmodule Lightning.Config do Application.get_env(:lightning, :log_lines_search_indexing, []) end + @impl true + def dataclip_search_indexing_batch_size do + dataclip_search_indexing_config() |> Keyword.fetch!(:batch_size) + end + + @impl true + def dataclip_search_indexing_max_batches do + dataclip_search_indexing_config() |> Keyword.fetch!(:max_batches) + end + + defp dataclip_search_indexing_config do + Application.get_env(:lightning, :dataclip_search_indexing, []) + end + @impl true def default_ecto_database_timeout do Application.get_env(:lightning, Lightning.Repo) |> Keyword.get(:timeout) @@ -499,6 +513,8 @@ defmodule Lightning.Config do @callback activity_cleanup_chunk_size() :: integer() @callback log_lines_search_indexing_batch_size() :: pos_integer() @callback log_lines_search_indexing_max_batches() :: pos_integer() + @callback dataclip_search_indexing_batch_size() :: pos_integer() + @callback dataclip_search_indexing_max_batches() :: pos_integer() @callback default_ecto_database_timeout() :: integer() @callback repo_connection_token_signer() :: Joken.Signer.t() @callback reset_password_token_validity_in_days() :: integer() @@ -621,6 +637,14 @@ defmodule Lightning.Config do impl().log_lines_search_indexing_max_batches() end + def dataclip_search_indexing_batch_size do + impl().dataclip_search_indexing_batch_size() + end + + def dataclip_search_indexing_max_batches do + impl().dataclip_search_indexing_max_batches() + end + def default_ecto_database_timeout do impl().default_ecto_database_timeout() end diff --git a/lib/lightning/config/bootstrap.ex 
b/lib/lightning/config/bootstrap.ex index 69daccf456f..39c0a4cd2bd 100644 --- a/lib/lightning/config/bootstrap.ex +++ b/lib/lightning/config/bootstrap.ex @@ -271,7 +271,8 @@ defmodule Lightning.Config.Bootstrap do # TODO - move this into an ENV? {"17 */2 * * *", Lightning.Projects, args: %{"type" => "data_retention"}}, {"*/10 * * * *", Lightning.KafkaTriggers.DuplicateTrackingCleanupWorker}, - {"* * * * *", Lightning.LogLines.SearchVectorWorker} + {"* * * * *", Lightning.LogLines.SearchVectorWorker}, + {"* * * * *", Lightning.Invocation.DataclipSearchVectorWorker} ] cleanup_cron = @@ -302,7 +303,11 @@ defmodule Lightning.Config.Bootstrap do background: 1, history_exports: 1, ai_assistant: 10, - search_indexing: 1 + # Shared by Lightning.LogLines.SearchVectorWorker and + # Lightning.Invocation.DataclipSearchVectorWorker. Concurrency 2 gives + # each worker its own slot so their snowball re-enqueue chains run in + # parallel and never starve one another. + search_indexing: 2 ] # https://plausible.io/ is an open-source, privacy-friendly alternative to diff --git a/lib/lightning/invocation/dataclip_search_vector_worker.ex b/lib/lightning/invocation/dataclip_search_vector_worker.ex new file mode 100644 index 00000000000..656d57ed3f5 --- /dev/null +++ b/lib/lightning/invocation/dataclip_search_vector_worker.ex @@ -0,0 +1,95 @@ +defmodule Lightning.Invocation.DataclipSearchVectorWorker do + @moduledoc """ + Backfills the full-text `search_vector` on `dataclips` rows. + + Dataclips are inserted with `search_vector` left `NULL`; the vector is built + here rather than on the insert path. Building it inline was risky: + `jsonb_to_tsvector` over a large dataclip body is slow and runs inside the + transaction that persists the run, so a slow (or failing) vector build could + roll back the dataclip insert and lose the run (#4800). Deferring it keeps + `jsonb_to_tsvector` off that hot path. Search is eventually consistent as a + result, typically catching up within a minute. 
+ + Two database objects support this: `safe_jsonb_to_tsvector(regconfig, jsonb)`, + which builds the vector from the dataclip body while tolerating NULL and + oversized input, and a partial index over `search_vector IS NULL`, which keeps + locating pending rows cheap as the table grows. Vectors use the + `english_nostop` config to match the read side (`Lightning.Invocation`), which + queries with `to_tsquery('english_nostop', ...)`. + + Each run drains pending rows newest-first, in batches up to a per-run budget + (batch size and max batches are configurable via `Lightning.Config`). A run + that exhausts its budget leaves backlog behind + and enqueues an immediate follow-up ("snowball"); otherwise the once-a-minute cron + tick keeps pace. The worker shares the `search_indexing` queue with + `Lightning.LogLines.SearchVectorWorker`; that queue runs at concurrency 2, so + the two workers each get a slot and their snowball chains never starve one + another. The snowball carries a `trigger` arg that the cron tick does not, so + job uniqueness allows one of each to queue but never a duplicate. + """ + + use Oban.Worker, + queue: :search_indexing, + priority: 1, + max_attempts: 10, + # Restrict uniqueness to queued states. Oban's defaults also dedup against + # :executing/:completed, so a running snowball would match itself and fail + # to enqueue its successor — breaking the chain after one hop. 
+ unique: [period: 55, keys: [:trigger], states: [:available, :scheduled]] + + alias Lightning.Repo + + require Logger + + @drain_sql """ + WITH pending AS ( + SELECT id FROM dataclips + WHERE search_vector IS NULL + ORDER BY inserted_at DESC + LIMIT $1 FOR UPDATE SKIP LOCKED + ) + UPDATE dataclips d + SET search_vector = + safe_jsonb_to_tsvector('public.english_nostop'::regconfig, d.body) + FROM pending p WHERE d.id = p.id + """ + + @impl Oban.Worker + def perform(%Oban.Job{}) do + batch_size = Lightning.Config.dataclip_search_indexing_batch_size() + max_batches = Lightning.Config.dataclip_search_indexing_max_batches() + + {filled, budget_exhausted?} = drain(0, 0, batch_size, max_batches) + + Logger.info(fn -> + # coveralls-ignore-start + "Invocation.DataclipSearchVectorWorker filled #{filled} search_vector row(s)." + # coveralls-ignore-stop + end) + + if budget_exhausted? do + # Budget exhausted, so backlog likely remains: enqueue an immediate + # follow-up rather than waiting for the next cron tick. + Oban.insert(Lightning.Oban, __MODULE__.new(%{"trigger" => "snowball"})) + end + + {:ok, filled} + end + + # Drains up to max_batches batches, accumulating the number of rows filled. + # Returns {filled, budget_exhausted?}. Stops early when a batch fills fewer + # than batch_size rows (backlog drained). 
+ defp drain(filled, batches, batch_size, max_batches) do + if batches >= max_batches do + {filled, true} + else + %{num_rows: num_rows} = Repo.query!(@drain_sql, [batch_size]) + + if num_rows < batch_size do + {filled + num_rows, false} + else + drain(filled + num_rows, batches + 1, batch_size, max_batches) + end + end + end +end diff --git a/priv/repo/migrations/20260530184652_add_safe_jsonb_to_tsvector_function.exs b/priv/repo/migrations/20260530184652_add_safe_jsonb_to_tsvector_function.exs new file mode 100644 index 00000000000..fe9317d2493 --- /dev/null +++ b/priv/repo/migrations/20260530184652_add_safe_jsonb_to_tsvector_function.exs @@ -0,0 +1,24 @@ +defmodule Lightning.Repo.Migrations.AddSafeJsonbToTsvectorFunction do + use Ecto.Migration + + def up do + # Deliberately not STRICT: a STRICT function returns NULL for a NULL doc, + # which would leave search_vector NULL forever and stuck in the pending + # index (e.g. a wiped dataclip with a NULL body). COALESCE instead so the + # result is always a non-NULL tsvector. 
+ execute(""" + CREATE OR REPLACE FUNCTION safe_jsonb_to_tsvector(config regconfig, doc jsonb) + RETURNS tsvector LANGUAGE plpgsql IMMUTABLE AS $$ + BEGIN + RETURN jsonb_to_tsvector(config, COALESCE(doc, '{}'::jsonb), '"all"'); + EXCEPTION WHEN program_limit_exceeded THEN + RETURN ''::tsvector; + END; + $$; + """) + end + + def down do + execute("DROP FUNCTION IF EXISTS safe_jsonb_to_tsvector(regconfig, jsonb);") + end +end diff --git a/priv/repo/migrations/20260530184653_add_dataclips_pending_search_index.exs b/priv/repo/migrations/20260530184653_add_dataclips_pending_search_index.exs new file mode 100644 index 00000000000..690b0c58c11 --- /dev/null +++ b/priv/repo/migrations/20260530184653_add_dataclips_pending_search_index.exs @@ -0,0 +1,23 @@ +defmodule Lightning.Repo.Migrations.AddDataclipsPendingSearchIndex do + use Ecto.Migration + + @disable_ddl_transaction true + @disable_migration_lock true + + def up do + # Partial index that keeps "find pending rows" cheap for the background + # indexing worker. dataclips is an unpartitioned table, so this is a single + # CONCURRENTLY-built index (no per-partition build-and-attach dance). The + # index stays small because existing rows already have a non-NULL + # search_vector, so only freshly-inserted, not-yet-indexed rows match. 
+ execute(""" + CREATE INDEX CONCURRENTLY IF NOT EXISTS dataclips_pending_search_idx + ON dataclips (inserted_at) + WHERE search_vector IS NULL + """) + end + + def down do + execute("DROP INDEX CONCURRENTLY IF EXISTS dataclips_pending_search_idx") + end +end diff --git a/priv/repo/migrations/20260530184654_drop_dataclip_search_vector_trigger.exs b/priv/repo/migrations/20260530184654_drop_dataclip_search_vector_trigger.exs new file mode 100644 index 00000000000..679b287a00d --- /dev/null +++ b/priv/repo/migrations/20260530184654_drop_dataclip_search_vector_trigger.exs @@ -0,0 +1,40 @@ +defmodule Lightning.Repo.Migrations.DropDataclipSearchVectorTrigger do + use Ecto.Migration + + def up do + execute("SET lock_timeout = '5s'") + execute("DROP TRIGGER IF EXISTS set_search_vector ON dataclips") + execute("DROP FUNCTION IF EXISTS update_dataclip_search_vector()") + end + + def down do + # Restore the program_limit_exceeded-catching version of the function + # (from 20250219122902), not the original naive one. + execute(""" + CREATE OR REPLACE FUNCTION update_dataclip_search_vector() + RETURNS trigger + LANGUAGE plpgsql + AS $function$ + BEGIN + BEGIN + UPDATE dataclips + SET search_vector = jsonb_to_tsvector('english_nostop', body, '"all"') + WHERE id = NEW.id; + EXCEPTION + WHEN program_limit_exceeded THEN + RAISE NOTICE 'Message too long for tsvector at id: %. 
Error: %', NEW.id, SQLERRM; + END; + + RETURN NEW; + END; + $function$; + """) + + execute(""" + CREATE TRIGGER set_search_vector + AFTER INSERT ON dataclips FOR EACH ROW + WHEN (NEW."search_vector" IS NULL) + EXECUTE PROCEDURE update_dataclip_search_vector(); + """) + end +end diff --git a/test/lightning/invocation/dataclip_search_vector_worker_test.exs b/test/lightning/invocation/dataclip_search_vector_worker_test.exs new file mode 100644 index 00000000000..b0fcaf4624e --- /dev/null +++ b/test/lightning/invocation/dataclip_search_vector_worker_test.exs @@ -0,0 +1,205 @@ +defmodule Lightning.Invocation.DataclipSearchVectorWorkerTest do + use Lightning.DataCase, async: false + + import Lightning.Factories + + alias Lightning.Invocation.DataclipSearchVectorWorker + alias Lightning.Repo + + # Reads back the pending/searchable state of a dataclip's search_vector. The + # dataclip body lives in jsonb and the vector is built from it with the + # `english_nostop` config to match the read side (`Lightning.Invocation`). + defp search_vector_state(id, term) do + %{rows: [[is_null, matches]]} = + Repo.query!( + """ + SELECT search_vector IS NULL, + COALESCE( + search_vector @@ to_tsquery('english_nostop', $2), + false + ) + FROM dataclips WHERE id = $1::uuid + """, + [Ecto.UUID.dump!(id), term] + ) + + %{null?: is_null, matches?: matches} + end + + defp search_vector_text(id) do + %{rows: [[vector]]} = + Repo.query!( + "SELECT search_vector::text FROM dataclips WHERE id = $1::uuid", + [Ecto.UUID.dump!(id)] + ) + + vector + end + + describe "perform/1" do + test "fills search_vector for pending dataclips so they become searchable" do + dataclip = + insert(:dataclip, body: %{"greeting" => "searchableword in body"}) + + # The insert path no longer builds the vector, so it starts NULL. 
+ assert %{null?: true, matches?: false} = + search_vector_state(dataclip.id, "searchableword") + + assert {:ok, 1} = perform_job(DataclipSearchVectorWorker, %{}) + + assert %{null?: false, matches?: true} = + search_vector_state(dataclip.id, "searchableword") + end + + test "a NULL/wiped body becomes an empty vector that leaves the pending set" do + dataclip = insert(:dataclip, body: %{"foo" => "bar"}) + + # Mimic a wiped dataclip: body NULL, search_vector still pending. + Repo.query!( + "UPDATE dataclips SET body = NULL, search_vector = NULL WHERE id = $1::uuid", + [Ecto.UUID.dump!(dataclip.id)] + ) + + assert {:ok, 1} = perform_job(DataclipSearchVectorWorker, %{}) + + # Non-NULL empty vector: the row leaves the pending set (won't be retried + # forever) and matches nothing. + assert %{null?: false, matches?: false} = + search_vector_state(dataclip.id, "bar") + + assert search_vector_text(dataclip.id) == "" + end + + test "an oversized body becomes an empty vector without rolling back the batch" do + normal = insert(:dataclip, body: %{"note" => "normalsearchableword"}) + + # ~200k distinct words exceeds the 1MB tsvector limit; + # safe_jsonb_to_tsvector swallows the program_limit_exceeded and returns + # ''::tsvector rather than raising and aborting the whole batch. + oversized_value = + 1..200_000 + |> Enum.map_join(" ", &"w#{&1}") + + oversized = insert(:dataclip, body: %{"data" => oversized_value}) + + assert {:ok, 2} = perform_job(DataclipSearchVectorWorker, %{}) + + # The normal row in the same batch still got indexed. + assert %{null?: false, matches?: true} = + search_vector_state(normal.id, "normalsearchableword") + + # The oversized row is set to a non-NULL empty vector so it leaves the + # pending set, but matches nothing and the worker did not crash. 
+ assert %{null?: false} = search_vector_state(oversized.id, "w1") + assert search_vector_text(oversized.id) == "" + end + + test "drains all pending dataclips across a batch" do + # Stay within the tiny test budget (batch_size: 2, max_batches: 2) so a + # single run drains everything: batch 1 fills 2, batch 2 fills 1 (< 2) and + # stops without tripping the budget. + dataclips = + for n <- 1..3 do + insert(:dataclip, body: %{"n" => "draindataclip#{n}"}) + end + + for dataclip <- dataclips do + assert %{null?: true} = + search_vector_state(dataclip.id, "draindataclip1") + end + + assert {:ok, 3} = perform_job(DataclipSearchVectorWorker, %{}) + + for dataclip <- dataclips do + assert %{null?: false} = + search_vector_state(dataclip.id, "draindataclip1") + end + end + + test "does not snowball when the per-run budget is not exhausted" do + for n <- 1..3, do: insert(:dataclip, body: %{"n" => "modest#{n}"}) + + Oban.Testing.with_testing_mode(:manual, fn -> + assert {:ok, 3} = perform_job(DataclipSearchVectorWorker, %{}) + + refute_enqueued(worker: DataclipSearchVectorWorker) + end) + end + + test "snowballs an immediate follow-up when the per-run budget is exhausted" do + # config/test.exs sets batch_size: 2, max_batches: 2. With 5 pending rows + # the run fills two full batches (4 rows), reaches max_batches, and trips + # the budget guard, leaving backlog behind and enqueuing a snowball. 
+ for n <- 1..5, do: insert(:dataclip, body: %{"n" => "overflow#{n}"}) + + Oban.Testing.with_testing_mode(:manual, fn -> + assert {:ok, 4} = perform_job(DataclipSearchVectorWorker, %{}) + + assert_enqueued( + worker: DataclipSearchVectorWorker, + args: %{"trigger" => "snowball"} + ) + end) + end + + test "is idempotent: a second run with nothing pending fills 0 and snowballs nothing" do + for n <- 1..3, do: insert(:dataclip, body: %{"n" => "again#{n}"}) + + assert {:ok, 3} = perform_job(DataclipSearchVectorWorker, %{}) + + Oban.Testing.with_testing_mode(:manual, fn -> + assert {:ok, 0} = perform_job(DataclipSearchVectorWorker, %{}) + + refute_enqueued(worker: DataclipSearchVectorWorker) + end) + end + end + + describe "snowball uniqueness" do + # Guards the snowball chain: an executing job must be able to enqueue its + # successor. Oban's default unique states include :executing, so a snowball + # would otherwise match itself and the chain would die after one hop. The + # worker restricts uniqueness to [:available, :scheduled] to avoid this. + test "an executing snowball does not block enqueuing its successor" do + Oban.Testing.with_testing_mode(:manual, fn -> + {:ok, running} = + Oban.insert( + Lightning.Oban, + DataclipSearchVectorWorker.new(%{"trigger" => "snowball"}) + ) + + # Mimic Oban marking the job as executing while perform/1 runs. + from(j in Oban.Job, where: j.id == ^running.id) + |> Repo.update_all(set: [state: "executing"]) + + {:ok, successor} = + Oban.insert( + Lightning.Oban, + DataclipSearchVectorWorker.new(%{"trigger" => "snowball"}) + ) + + refute successor.conflict? 
+ refute successor.id == running.id + end) + end + + test "two queued snowballs are deduped to one" do + Oban.Testing.with_testing_mode(:manual, fn -> + {:ok, first} = + Oban.insert( + Lightning.Oban, + DataclipSearchVectorWorker.new(%{"trigger" => "snowball"}) + ) + + {:ok, second} = + Oban.insert( + Lightning.Oban, + DataclipSearchVectorWorker.new(%{"trigger" => "snowball"}) + ) + + assert second.conflict? + assert second.id == first.id + end) + end + end +end diff --git a/test/lightning/invocation_test.exs b/test/lightning/invocation_test.exs index 2d278c1b22a..a0931df3895 100644 --- a/test/lightning/invocation_test.exs +++ b/test/lightning/invocation_test.exs @@ -1600,6 +1600,7 @@ defmodule Lightning.InvocationTest do ) flush_log_search_index() + flush_dataclip_search_index() %{ project: project, @@ -1803,6 +1804,9 @@ defmodule Lightning.InvocationTest do %{ project: project } do + # Positive control: the dataclip body vector is populated, so a known body + # token matches. Without this, a regression that leaves search_vector NULL + # would make the negative assertion below pass vacuously. assert [_found] = Invocation.search_workorders( project, @@ -1958,6 +1962,7 @@ defmodule Lightning.InvocationTest do ) flush_log_search_index() + flush_dataclip_search_index() %{ project: project, diff --git a/test/lightning/runs_test.exs b/test/lightning/runs_test.exs index b3a3477fd3b..cf5c6b0ad6f 100644 --- a/test/lightning/runs_test.exs +++ b/test/lightning/runs_test.exs @@ -385,6 +385,43 @@ defmodule Lightning.RunsTest do assert Jason.decode!(step.output_dataclip.body) == %{"foo" => "bar"} end + # Regression for #4800: dataclip inserts no longer build the search_vector + # synchronously (the AFTER INSERT trigger was dropped). Saving an output + # dataclip via the handler must succeed and the row must be retrievable with + # search_vector NULL — proving the insert path doesn't depend on building the + # vector, which is deferred to DataclipSearchVectorWorker. 
+ test "saves the output dataclip with a NULL search_vector (deferred indexing)" do + dataclip = insert(:dataclip) + %{triggers: [trigger], jobs: [job]} = workflow = insert(:simple_workflow) + + %{runs: [run]} = + work_order_for(trigger, workflow: workflow, dataclip: dataclip) + |> insert() + + step = insert(:step, runs: [run], job: job, input_dataclip: dataclip) + output_dataclip_id = Ecto.UUID.generate() + + assert {:ok, _step} = + Runs.complete_step(%{ + step_id: step.id, + reason: "success", + output_dataclip: ~s({"deferred": "indexword"}), + output_dataclip_id: output_dataclip_id, + run_id: run.id, + project_id: workflow.project_id + }) + + %{rows: [[is_null]]} = + Repo.query!( + "SELECT search_vector IS NULL FROM dataclips WHERE id = $1::uuid", + [Ecto.UUID.dump!(output_dataclip_id)] + ) + + assert is_null, + "expected the saved dataclip's search_vector to be NULL " <> + "immediately after insert (deferred indexing)" + end + test "wipes the dataclip if erase_all retention policy is specified at the project level when the run is created" do %{triggers: [trigger], jobs: [job]} = workflow = insert(:simple_workflow) diff --git a/test/lightning_web/live/work_order_live_test.exs b/test/lightning_web/live/work_order_live_test.exs index 2fef50c8d9b..1b9631af98b 100644 --- a/test/lightning_web/live/work_order_live_test.exs +++ b/test/lightning_web/live/work_order_live_test.exs @@ -4,6 +4,7 @@ defmodule LightningWeb.WorkOrderLiveTest do import Phoenix.LiveViewTest import Lightning.Factories import Lightning.ApplicationHelpers, only: [dynamically_absorb_delay: 1] + import Lightning.TestUtils, only: [flush_dataclip_search_index: 0] alias Lightning.Runs alias Lightning.WorkOrders.Events @@ -1105,6 +1106,8 @@ defmodule LightningWeb.WorkOrderLiveTest do "step_id" => Ecto.UUID.generate() }) + flush_dataclip_search_index() + {:ok, view, _html} = live_async(conn, Routes.project_run_index_path(conn, :index, project.id)) diff --git a/test/support/test_utils.ex 
b/test/support/test_utils.ex index 0484517767f..8f392d3120d 100644 --- a/test/support/test_utils.ex +++ b/test/support/test_utils.ex @@ -1,6 +1,7 @@ defmodule Lightning.TestUtils do @moduledoc false + alias Lightning.Invocation.DataclipSearchVectorWorker alias Lightning.LogLines.SearchVectorWorker @doc """ @@ -41,6 +42,28 @@ defmodule Lightning.TestUtils do indexed end + @doc """ + Drain the deferred `dataclips.search_vector` backlog synchronously. + + Dataclips are inserted with `search_vector` left NULL and indexed asynchronously + by `Lightning.Invocation.DataclipSearchVectorWorker`. In tests that insert + dataclips and then query dataclip body search, call this after inserting (and + before searching) so the vector is populated within the SQL sandbox. + + Runs the worker in-process via `Oban.Testing.perform_job/3`, so it sees the + uncommitted sandbox rows. Returns the number of rows indexed; a no-op (`0`) when + nothing is pending, so it is safe to call unconditionally. + """ + @spec flush_dataclip_search_index() :: non_neg_integer() + def flush_dataclip_search_index do + {:ok, indexed} = + Oban.Testing.perform_job(DataclipSearchVectorWorker, %{}, + repo: Lightning.Repo + ) + + indexed + end + @doc """ Merge the given setups into the given context. Just works a bit like `setup` on ExUnit.Case.