Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ and this project adheres to
backfilled by a background worker rather than computed synchronously on every
insert, so log search is eventually-consistent (typically within a minute).
[#4425](https://github.com/OpenFn/lightning/issues/4425)
- Dataclip inserts no longer roll back when building the full-text search vector
is slow. The `jsonb_to_tsvector` work that ran in an `AFTER INSERT` trigger
could hold the connection past the timeout and roll back the insert, losing
the whole run. The search vector is now built off the insert path by a
background `Lightning.Invocation.DataclipSearchVectorWorker` (sharing the
`search_indexing` queue with the log-lines worker), making dataclip search
eventually consistent. [#4800](https://github.com/OpenFn/lightning/issues/4800)
- Channel join crashes when multiple users open the same workflow concurrently
[#4802](https://github.com/OpenFn/lightning/issues/4802)
- Fix `purge_deleted` Oban job crashing when a soft-deleted project has
Expand Down
2 changes: 2 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ config :lightning, :claim_work_mem, nil

config :lightning, :log_lines_search_indexing, batch_size: 2_500, max_batches: 10

# Per-run limits for Lightning.Invocation.DataclipSearchVectorWorker: the number
# of dataclip rows filled per batch and the maximum number of batches drained in
# a single job run.
config :lightning, :dataclip_search_indexing, batch_size: 250, max_batches: 10

config :lightning, Lightning.Runtime.RuntimeManager, start: false

config :lightning, LightningWeb.CollectionsController,
Expand Down
2 changes: 2 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ config :lightning, :is_resettable_demo, true
# the per-run budget guard, exercising the snowball follow-up path.
config :lightning, :log_lines_search_indexing, batch_size: 2, max_batches: 2

# Tiny dataclip search-indexing limits for the test environment (2 rows per
# batch, at most 2 batches per run) — presumably so tests can exhaust the
# per-run budget with only a handful of rows; confirm against the worker tests.
config :lightning, :dataclip_search_indexing, batch_size: 2, max_batches: 2

config :lightning, :github_app,
app_id: "111111",
app_name: "test-github",
Expand Down
24 changes: 24 additions & 0 deletions lib/lightning/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,20 @@ defmodule Lightning.Config do
Application.get_env(:lightning, :log_lines_search_indexing, [])
end

@impl true
def dataclip_search_indexing_batch_size do
  # Raises KeyError when :batch_size is missing from the configured keyword list.
  Keyword.fetch!(dataclip_search_indexing_config(), :batch_size)
end

@impl true
def dataclip_search_indexing_max_batches do
  # Raises KeyError when :max_batches is missing from the configured keyword list.
  Keyword.fetch!(dataclip_search_indexing_config(), :max_batches)
end

# Reads the :dataclip_search_indexing keyword list from the :lightning
# application environment, falling back to an empty list when it is unset.
defp dataclip_search_indexing_config,
  do: Application.get_env(:lightning, :dataclip_search_indexing, [])

@impl true
def default_ecto_database_timeout do
Application.get_env(:lightning, Lightning.Repo) |> Keyword.get(:timeout)
Expand Down Expand Up @@ -499,6 +513,8 @@ defmodule Lightning.Config do
@callback activity_cleanup_chunk_size() :: integer()
@callback log_lines_search_indexing_batch_size() :: pos_integer()
@callback log_lines_search_indexing_max_batches() :: pos_integer()
# Rows per batch and maximum batches per run for the dataclip search-vector
# backfill (read by Lightning.Invocation.DataclipSearchVectorWorker).
@callback dataclip_search_indexing_batch_size() :: pos_integer()
@callback dataclip_search_indexing_max_batches() :: pos_integer()
@callback default_ecto_database_timeout() :: integer()
@callback repo_connection_token_signer() :: Joken.Signer.t()
@callback reset_password_token_validity_in_days() :: integer()
Expand Down Expand Up @@ -621,6 +637,14 @@ defmodule Lightning.Config do
impl().log_lines_search_indexing_max_batches()
end

# Delegates to the configured implementation module.
def dataclip_search_indexing_batch_size,
  do: impl().dataclip_search_indexing_batch_size()

# Delegates to the configured implementation module.
def dataclip_search_indexing_max_batches,
  do: impl().dataclip_search_indexing_max_batches()

# Delegates to the configured implementation module.
def default_ecto_database_timeout,
  do: impl().default_ecto_database_timeout()
Expand Down
9 changes: 7 additions & 2 deletions lib/lightning/config/bootstrap.ex
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,8 @@ defmodule Lightning.Config.Bootstrap do
# TODO - move this into an ENV?
{"17 */2 * * *", Lightning.Projects, args: %{"type" => "data_retention"}},
{"*/10 * * * *", Lightning.KafkaTriggers.DuplicateTrackingCleanupWorker},
{"* * * * *", Lightning.LogLines.SearchVectorWorker}
{"* * * * *", Lightning.LogLines.SearchVectorWorker},
{"* * * * *", Lightning.Invocation.DataclipSearchVectorWorker}
]

cleanup_cron =
Expand Down Expand Up @@ -302,7 +303,11 @@ defmodule Lightning.Config.Bootstrap do
background: 1,
history_exports: 1,
ai_assistant: 10,
search_indexing: 1
# Shared by Lightning.LogLines.SearchVectorWorker and
# Lightning.Invocation.DataclipSearchVectorWorker. Concurrency 2 lets a job
# from each worker run at the same time, so one worker's snowball re-enqueue
# chain does not have to wait behind the other's on a single slot.
search_indexing: 2
]

# https://plausible.io/ is an open-source, privacy-friendly alternative to
Expand Down
95 changes: 95 additions & 0 deletions lib/lightning/invocation/dataclip_search_vector_worker.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
defmodule Lightning.Invocation.DataclipSearchVectorWorker do
  @moduledoc """
  Backfills the full-text `search_vector` on `dataclips` rows.

  Dataclips are inserted with `search_vector` left `NULL`; the vector is built
  here rather than on the insert path. Building it inline was risky:
  `jsonb_to_tsvector` over a large dataclip body is slow and runs inside the
  transaction that persists the run, so a slow (or failing) vector build could
  roll back the dataclip insert and lose the run (#4800). Deferring it keeps
  `jsonb_to_tsvector` off that hot path. Search is eventually consistent as a
  result, typically catching up within a minute.

  Two database objects support this: `safe_jsonb_to_tsvector(regconfig, jsonb)`,
  which builds the vector from the dataclip body while tolerating NULL and
  oversized input, and a partial index over `search_vector IS NULL`, which keeps
  locating pending rows cheap as the table grows. Vectors use the
  `english_nostop` config to match the read side (`Lightning.Invocation`), which
  queries with `to_tsquery('english_nostop', ...)`.

  Each run drains pending rows newest-first, in batches up to a per-run budget
  (batch size and max batches are configurable via `Lightning.Config`). A run
  that exhausts its budget leaves backlog behind and enqueues an immediate
  follow-up ("snowball"); otherwise the minute-ly cron tick keeps pace. If the
  follow-up cannot be enqueued, a warning is logged and the next cron tick picks
  up the remaining backlog.

  The worker shares the `search_indexing` queue with
  `Lightning.LogLines.SearchVectorWorker`; that queue runs at concurrency 2, so
  the two workers' snowball chains can run in parallel. The snowball job carries
  a `"trigger" => "snowball"` arg (the cron entry passes no args), and job
  uniqueness is keyed on `trigger` and restricted to queued states, so a pending
  snowball is not enqueued a second time within the uniqueness period.
  """

  use Oban.Worker,
    queue: :search_indexing,
    priority: 1,
    max_attempts: 10,
    # Restrict uniqueness to queued states. Oban's defaults also dedup against
    # :executing/:completed, so a running snowball would match itself and fail
    # to enqueue its successor — breaking the chain after one hop.
    unique: [period: 55, keys: [:trigger], states: [:available, :scheduled]]

  alias Lightning.Repo

  require Logger

  # Picks up to $1 pending rows (newest first), skipping rows locked by a
  # concurrent run, and fills their search_vector in the same statement.
  @drain_sql """
  WITH pending AS (
  SELECT id FROM dataclips
  WHERE search_vector IS NULL
  ORDER BY inserted_at DESC
  LIMIT $1 FOR UPDATE SKIP LOCKED
  )
  UPDATE dataclips d
  SET search_vector =
  safe_jsonb_to_tsvector('public.english_nostop'::regconfig, d.body)
  FROM pending p WHERE d.id = p.id
  """

  @doc """
  Drains pending dataclips in batches and returns `{:ok, filled}`, where
  `filled` is the number of rows whose `search_vector` was populated.

  When the per-run batch budget is exhausted, a follow-up "snowball" job is
  enqueued so the remaining backlog does not wait for the next cron tick.
  Raises if a drain query fails (`Repo.query!/2`), which lets Oban retry the job.
  """
  @impl Oban.Worker
  def perform(%Oban.Job{}) do
    batch_size = Lightning.Config.dataclip_search_indexing_batch_size()
    max_batches = Lightning.Config.dataclip_search_indexing_max_batches()

    {filled, budget_exhausted?} = drain(0, 0, batch_size, max_batches)

    Logger.info(fn ->
      # coveralls-ignore-start
      "Invocation.DataclipSearchVectorWorker filled #{filled} search_vector row(s)."
      # coveralls-ignore-stop
    end)

    # Budget exhausted, so backlog likely remains: enqueue an immediate
    # follow-up rather than waiting for the next cron tick.
    if budget_exhausted?, do: enqueue_snowball()

    {:ok, filled}
  end

  # Enqueues the follow-up job. Best-effort: a failed insert must not fail the
  # work already done by this run, but it is logged instead of being dropped
  # silently.
  defp enqueue_snowball do
    case Oban.insert(Lightning.Oban, __MODULE__.new(%{"trigger" => "snowball"})) do
      {:ok, _job} ->
        :ok

      {:error, reason} ->
        Logger.warning(fn ->
          # coveralls-ignore-start
          "Invocation.DataclipSearchVectorWorker could not enqueue snowball follow-up: " <>
            inspect(reason)

          # coveralls-ignore-stop
        end)

        :ok
    end
  end

  # Drains up to max_batches batches, accumulating the number of rows filled.
  # Returns {filled, budget_exhausted?}. Stops early when a batch fills fewer
  # than batch_size rows (backlog drained).
  defp drain(filled, batches, _batch_size, max_batches)
       when batches >= max_batches,
       do: {filled, true}

  defp drain(filled, batches, batch_size, max_batches) do
    %{num_rows: num_rows} = Repo.query!(@drain_sql, [batch_size])

    if num_rows < batch_size do
      {filled + num_rows, false}
    else
      drain(filled + num_rows, batches + 1, batch_size, max_batches)
    end
  end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
defmodule Lightning.Repo.Migrations.AddSafeJsonbToTsvectorFunction do
  use Ecto.Migration

  # The function is intentionally NOT declared STRICT. A STRICT function
  # short-circuits to NULL when given a NULL doc, so a row with a NULL body
  # (e.g. a wiped dataclip) would keep a NULL search_vector and never leave the
  # pending index. The COALESCE guarantees a non-NULL tsvector, and the
  # exception handler substitutes an empty tsvector when the document exceeds
  # the tsvector size limit.
  @create_function """
  CREATE OR REPLACE FUNCTION safe_jsonb_to_tsvector(config regconfig, doc jsonb)
  RETURNS tsvector LANGUAGE plpgsql IMMUTABLE AS $$
  BEGIN
  RETURN jsonb_to_tsvector(config, COALESCE(doc, '{}'::jsonb), '"all"');
  EXCEPTION WHEN program_limit_exceeded THEN
  RETURN ''::tsvector;
  END;
  $$;
  """

  @drop_function "DROP FUNCTION IF EXISTS safe_jsonb_to_tsvector(regconfig, jsonb);"

  # Creates (or replaces) the NULL- and size-tolerant tsvector builder.
  def up, do: execute(@create_function)

  # Removes the function again.
  def down, do: execute(@drop_function)
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
defmodule Lightning.Repo.Migrations.AddDataclipsPendingSearchIndex do
  use Ecto.Migration

  # CREATE/DROP INDEX CONCURRENTLY cannot run inside a transaction, so both the
  # DDL transaction and the migration lock are disabled for this migration.
  @disable_ddl_transaction true
  @disable_migration_lock true

  # Partial index covering only rows whose search_vector is still NULL, so the
  # background indexing worker can find pending rows cheaply. dataclips is an
  # unpartitioned table, so one CONCURRENTLY-built index is enough (no
  # per-partition build-and-attach steps). Rows drop out of the index as soon
  # as their search_vector is filled, which keeps it small.
  @create_index """
  CREATE INDEX CONCURRENTLY IF NOT EXISTS dataclips_pending_search_idx
  ON dataclips (inserted_at)
  WHERE search_vector IS NULL
  """

  @drop_index "DROP INDEX CONCURRENTLY IF EXISTS dataclips_pending_search_idx"

  # Builds the pending-rows index without blocking writes.
  def up, do: execute(@create_index)

  # Drops the pending-rows index without blocking writes.
  def down, do: execute(@drop_index)
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
defmodule Lightning.Repo.Migrations.DropDataclipSearchVectorTrigger do
  use Ecto.Migration

  # Removes the AFTER INSERT trigger (and its function) that built the dataclip
  # search vector inline on the insert path.
  def up do
    # SET LOCAL scopes the lock timeout to this migration's DDL transaction.
    # A plain SET is session-level: once the transaction commits, the 5s
    # lock_timeout would remain in force on the pooled connection and affect
    # whatever runs on it next.
    execute("SET LOCAL lock_timeout = '5s'")
    execute("DROP TRIGGER IF EXISTS set_search_vector ON dataclips")
    execute("DROP FUNCTION IF EXISTS update_dataclip_search_vector()")
  end

  # Recreates the trigger function and the trigger that calls it.
  def down do
    # Restore the program_limit_exceeded-catching version of the function
    # (from 20250219122902), not the original naive one.
    execute("""
    CREATE OR REPLACE FUNCTION update_dataclip_search_vector()
    RETURNS trigger
    LANGUAGE plpgsql
    AS $function$
    BEGIN
    BEGIN
    UPDATE dataclips
    SET search_vector = jsonb_to_tsvector('english_nostop', body, '"all"')
    WHERE id = NEW.id;
    EXCEPTION
    WHEN program_limit_exceeded THEN
    RAISE NOTICE 'Message too long for tsvector at id: %. Error: %', NEW.id, SQLERRM;
    END;

    RETURN NEW;
    END;
    $function$;
    """)

    # Only fires for rows inserted without a search_vector.
    execute("""
    CREATE TRIGGER set_search_vector
    AFTER INSERT ON dataclips FOR EACH ROW
    WHEN (NEW."search_vector" IS NULL)
    EXECUTE PROCEDURE update_dataclip_search_vector();
    """)
  end
end
Loading
Loading