diff --git a/CHANGELOG.md b/CHANGELOG.md index 6f946e7d74..75ad253894 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,10 +34,16 @@ and this project adheres to [#1588](https://github.com/OpenFn/lightning/issues/1588) ### Fixed + - Tooltip gets stuck when switching pages [#3559](https://github.com/OpenFn/lightning/pull/3559) - Current run dataclip stuck when switching nodes [#3560](https://github.com/OpenFn/lightning/pull/3560) +- Release `:claimed` runs back to `:available` state if their `run_channel` is + never joined because of worker timeout/network issues. We can do this because + we know that if the run channel never gets joined, the worker never receives + data and cannot begin running the run. + [#3565](https://github.com/OpenFn/lightning/issues/3565) ## [v2.14.4] - 2025-09-09 diff --git a/lib/lightning/config.ex b/lib/lightning/config.ex index 1fcdf3d1f3..6f557239a9 100644 --- a/lib/lightning/config.ex +++ b/lib/lightning/config.ex @@ -61,12 +61,12 @@ defmodule Lightning.Config do end @impl true - def grace_period do + def grace_period_seconds do Application.get_env(:lightning, :run_grace_period_seconds) end @impl true - def default_max_run_duration do + def default_max_run_duration_seconds do Application.get_env(:lightning, :max_run_duration_seconds) end @@ -324,6 +324,11 @@ defmodule Lightning.Config do Application.get_env(:lightning, :per_workflow_claim_limit, 50) end + @impl true + def run_channel_join_timeout_seconds do + Application.get_env(:lightning, :run_channel_join_timeout_seconds, 30) + end + @impl true def metrics_run_performance_age_seconds do metrics_config() |> Keyword.get(:run_performance_age_seconds) @@ -397,11 +402,11 @@ defmodule Lightning.Config do @callback apollo(key :: atom() | nil) :: map() @callback check_flag?(atom()) :: boolean() | nil @callback cors_origin() :: list() - @callback default_max_run_duration() :: integer() + @callback default_max_run_duration_seconds() :: integer() @callback email_sender_name() :: String.t() @callback get_extension_mod(key :: atom()) :: any() @callback google(key :: atom()) :: any() - @callback grace_period() :: integer() + @callback grace_period_seconds() :: integer() @callback instance_admin_email() :: String.t() @callback kafka_alternate_storage_enabled?() :: boolean() @callback kafka_alternate_storage_file_path() :: String.t() @@ -447,6 +452,7 @@ defmodule Lightning.Config do @callback external_metrics_module() :: module() | nil @callback ai_assistant_modes() :: %{atom() => module()} @callback per_workflow_claim_limit() :: pos_integer() + @callback run_channel_join_timeout_seconds() :: pos_integer() @callback sentry() :: module() @callback webhook_retry() :: Keyword.t() @callback webhook_retry(key :: atom()) :: any() @@ -493,15 +499,15 @@ defmodule Lightning.Config do The returned value is in seconds. """ - def grace_period do - impl().grace_period() + def grace_period_seconds do + impl().grace_period_seconds() end @doc """ Returns the default maximum run duration in seconds. """ - def default_max_run_duration do - impl().default_max_run_duration() + def default_max_run_duration_seconds do + impl().default_max_run_duration_seconds() end def repo_connection_token_signer do @@ -684,6 +690,10 @@ defmodule Lightning.Config do impl().per_workflow_claim_limit() end + def run_channel_join_timeout_seconds do + impl().run_channel_join_timeout_seconds() + end + def sentry do impl().sentry() end diff --git a/lib/lightning/config/bootstrap.ex b/lib/lightning/config/bootstrap.ex index eeacf8ce39..a1b60e5bbb 100644 --- a/lib/lightning/config/bootstrap.ex +++ b/lib/lightning/config/bootstrap.ex @@ -278,6 +278,10 @@ defmodule Lightning.Config.Bootstrap do :max_run_duration_seconds, env!("WORKER_MAX_RUN_DURATION_SECONDS", :integer, 300) + config :lightning, + :run_channel_join_timeout_seconds, + env!("RUN_CHANNEL_JOIN_TIMEOUT_SECONDS", :integer, 30) + config :lightning, :max_dataclip_size_bytes, env!("MAX_DATACLIP_SIZE_MB", :integer, 10) * 1_000_000 diff --git a/lib/lightning/extensions/usage_limiter.ex b/lib/lightning/extensions/usage_limiter.ex index ffeb8a42d5..009263cc2b 100644 --- a/lib/lightning/extensions/usage_limiter.ex +++ b/lib/lightning/extensions/usage_limiter.ex @@ -17,7 +17,7 @@ defmodule Lightning.Extensions.UsageLimiter do def get_run_options(context) do [ save_dataclips: Lightning.Projects.save_dataclips?(context.project_id), - run_timeout_ms: Lightning.Config.default_max_run_duration() * 1000 + run_timeout_ms: Lightning.Config.default_max_run_duration_seconds() * 1000 ] end diff --git a/lib/lightning/runs.ex b/lib/lightning/runs.ex index 75d2d82ed4..bd46476944 100644 --- a/lib/lightning/runs.ex +++ b/lib/lightning/runs.ex @@ -345,4 +345,29 @@ defmodule Lightning.Runs do |> order_by([{^order, :timestamp}]) |> Repo.stream() end + + @doc """ + Rolls back claimed runs to available state. + + This is used when a worker socket disconnects after runs have been claimed + but before the worker receives the response. The runs are set back to :available + so they can be claimed by another worker. + """ + @spec rollback_claimed_runs([Run.t()]) :: {:ok, non_neg_integer()} + def rollback_claimed_runs(runs) do + # Set the runs back to :available state so they can be claimed by another worker + run_ids = Enum.map(runs, & &1.id) + + {count, _} = + from(r in Run, where: r.id in ^run_ids) + |> Repo.update_all( + set: [state: :available, claimed_at: nil, worker_name: nil] + ) + + Logger.info( + "Successfully rolled back #{count} claimed runs to :available state" + ) + + {:ok, count} + end end diff --git a/lib/lightning/runs/query.ex b/lib/lightning/runs/query.ex index 563cef5981..f8dc840b13 100644 --- a/lib/lightning/runs/query.ex +++ b/lib/lightning/runs/query.ex @@ -21,15 +21,16 @@ defmodule Lightning.Runs.Query do def lost do now = Lightning.current_time() - grace_period_ms = Lightning.Config.grace_period() * 1000 + grace_period_seconds = Lightning.Config.grace_period_seconds() + grace_period_ms = grace_period_seconds * 1000 # TODO: Remove after live deployment rollouts are done. ==================== - fallback_max = Lightning.Config.default_max_run_duration() + fallback_max_seconds = Lightning.Config.default_max_run_duration_seconds() fallback_oldest_claim = now - |> DateTime.add(-fallback_max, :second) - |> DateTime.add(-grace_period_ms, :millisecond) + |> DateTime.add(-fallback_max_seconds, :second) + |> DateTime.add(-grace_period_seconds, :second) # ========================================================================== diff --git a/lib/lightning/workers.ex b/lib/lightning/workers.ex index 0c0e2ff30c..0342cf6c8a 100644 --- a/lib/lightning/workers.ex +++ b/lib/lightning/workers.ex @@ -82,7 +82,7 @@ defmodule Lightning.Workers do defp calculate_token_expiry(run_timeout_ms) do Lightning.current_time() |> DateTime.add(run_timeout_ms, :millisecond) - |> DateTime.add(Lightning.Config.grace_period()) + |> DateTime.add(Lightning.Config.grace_period_seconds(), :second) |> DateTime.to_unix() end diff --git a/lib/lightning_web/channels/run_channel.ex b/lib/lightning_web/channels/run_channel.ex index 76b88cbe09..7c0266d794 100644 --- a/lib/lightning_web/channels/run_channel.ex +++ b/lib/lightning_web/channels/run_channel.ex @@ -31,6 +31,20 @@ defmodule LightningWeb.RunChannel do Runs.get_project_id_for_run(run) do Sentry.Context.set_extra_context(%{run_id: id}) + # Notify the worker channel that this run channel has been joined + # Use PubSub for cross-node communication in clustered environments + case socket.assigns[:worker_id] do + nil -> + # No worker ID available, continue normally + :ok + + worker_id -> + Lightning.broadcast( + "worker_channel:#{worker_id}", + {:run_channel_joined, id, worker_id} + ) + end + {:ok, socket |> assign(%{ diff --git a/lib/lightning_web/channels/worker_channel.ex b/lib/lightning_web/channels/worker_channel.ex index 48373784e4..e8f90e7a73 100644 --- a/lib/lightning_web/channels/worker_channel.ex +++ b/lib/lightning_web/channels/worker_channel.ex @@ -1,6 +1,29 @@ defmodule LightningWeb.WorkerChannel do @moduledoc """ Websocket channel to handle when workers join or claim something to run. + + ## Claim Timeout Mechanism + + When a worker claims runs, the system starts a timeout to ensure the client + joins the corresponding run channels. If the client doesn't join the run + channels within the configured timeout period (default 30 seconds), the + claimed runs are automatically rolled back to :available state so they can + be claimed by another worker. + + This prevents runs from being stuck in :claimed state if the client + disconnects or fails to join the run channels after receiving the claim reply. + + The mechanism supports multiple concurrent claims by tracking each individual run + with its own timeout. This prevents newer claims from overwriting timeout coverage + of earlier claims. + + The mechanism works across a cluster of Lightning nodes using Phoenix.PubSub + for cross-node communication. When a run channel is joined on any node, it + broadcasts a message that the worker channel (potentially on a different node) + can receive and use to cancel the timeout. + + The timeout can be configured via the `:run_channel_join_timeout_seconds` application + environment variable. """ use LightningWeb, :channel @@ -22,7 +45,17 @@ defmodule LightningWeb.WorkerChannel do debounce_time_ms: socket.assigns[:work_listener_debounce_time] ) - {:ok, assign(socket, work_listener_pid: pid)} + # Subscribe to run channel join notifications for this worker + # This allows cross-node communication in clustered environments + worker_id = socket.assigns[:worker_id] || socket.id + Lightning.subscribe("worker_channel:#{worker_id}") + + {:ok, + assign(socket, + work_listener_pid: pid, + worker_id: worker_id, + pending_run_timeouts: %{} + )} end def join("worker:queue", _payload, _socket) do @@ -36,9 +69,10 @@ defmodule LightningWeb.WorkerChannel do socket ) do case Runs.claim(demand, sanitise_worker_name(worker_name)) do - {:ok, runs} -> - runs = - runs + {:ok, original_runs} -> + # Prepare the response data + response_runs = + original_runs |> Enum.map(fn run -> opts = run_options(run) @@ -50,7 +84,46 @@ defmodule LightningWeb.WorkerChannel do } end) - {:reply, {:ok, %{runs: runs}}, socket} + # Check if the socket is still alive + if Process.alive?(socket.transport_pid) do + # Only start timeout if we actually have runs to track + socket = + if Enum.count(original_runs) > 0 do + # Start a timeout for each individual run + timeout_ms = + Lightning.Config.run_channel_join_timeout_seconds() * 1000 + + pending_run_timeouts = socket.assigns[:pending_run_timeouts] || %{} + + # Create a timeout for each run + updated_timeouts = + Enum.reduce(original_runs, pending_run_timeouts, fn run, acc -> + timeout_ref = + :timer.send_after( + timeout_ms, + self(), + {:run_timeout, run.id, run} + ) + + Map.put(acc, run.id, timeout_ref) + end) + + assign(socket, pending_run_timeouts: updated_timeouts) + else + socket + end + + {:reply, {:ok, %{runs: response_runs}}, socket} + else + # Socket is no longer alive, roll back the transaction by setting runs back to :available + Logger.warning( + "Worker socket disconnected before claim reply, rolling back transaction for runs: #{Enum.map_join(original_runs, ", ", & &1.id)}" + ) + + Runs.rollback_claimed_runs(original_runs) + + {:noreply, socket} + end {:error, changeset} -> {:reply, {:error, LightningWeb.ChangesetJSON.errors(changeset)}, socket} @@ -71,6 +144,54 @@ defmodule LightningWeb.WorkerChannel do {:noreply, assign(socket, work_listener_pid: nil)} end + def handle_info({:run_timeout, run_id, run}, socket) do + # Timeout occurred - client didn't join run channel in time + Logger.warning( + "Run timeout reached for run #{run_id}, rolling back transaction" + ) + + {:ok, count} = Runs.rollback_claimed_runs([run]) + Logger.info("Successfully rolled back #{count} run (#{run_id})") + + # Remove this run's timeout from pending_run_timeouts + pending_run_timeouts = socket.assigns[:pending_run_timeouts] || %{} + updated_timeouts = Map.delete(pending_run_timeouts, run_id) + socket = assign(socket, pending_run_timeouts: updated_timeouts) + + {:noreply, socket} + end + + def handle_info({:run_channel_joined, run_id}, socket) do + # Client successfully joined a run channel, cancel the timeout for this specific run + pending_run_timeouts = socket.assigns[:pending_run_timeouts] || %{} + + case Map.get(pending_run_timeouts, run_id) do + nil -> + # No timeout for this run, nothing to do + {:noreply, socket} + + timeout_ref -> + # Cancel the timeout and remove it from tracking + :timer.cancel(timeout_ref) + updated_timeouts = Map.delete(pending_run_timeouts, run_id) + socket = assign(socket, pending_run_timeouts: updated_timeouts) + {:noreply, socket} + end + end + + def handle_info({:run_channel_joined, run_id, worker_id}, socket) do + # Handle PubSub message for run channel join notification + # Only process if this message is for this worker + case socket.assigns[:worker_id] do + ^worker_id -> + handle_info({:run_channel_joined, run_id}, socket) + + _ -> + # Message not for this worker, ignore + {:noreply, socket} + end + end + @impl true def terminate(_reason, socket) do work_listener_pid = socket.assigns[:work_listener_pid] @@ -91,6 +212,37 @@ defmodule LightningWeb.WorkerChannel do end end end + + # Unsubscribe from PubSub + case socket.assigns[:worker_id] do + nil -> + :ok + + worker_id -> + Lightning.unsubscribe("worker_channel:#{worker_id}") + end + + # Clean up any pending run timeouts + pending_run_timeouts = socket.assigns[:pending_run_timeouts] || %{} + + unless Enum.empty?(pending_run_timeouts) do + Enum.each(pending_run_timeouts, fn {run_id, timeout_ref} -> + # Cancel the timeout + :timer.cancel(timeout_ref) + + Logger.warning( + "Worker channel terminating with pending run timeout for run #{run_id}" + ) + end) + + run_ids = Map.keys(pending_run_timeouts) + + if not Enum.empty?(run_ids) do + Logger.warning( + "Worker channel terminating with pending run timeouts for runs: #{Enum.join(run_ids, ", ")}" + ) + end + end end defp sanitise_worker_name(""), do: nil diff --git a/lib/lightning_web/components/layout_components.ex b/lib/lightning_web/components/layout_components.ex index d19aa07aa0..772afaf364 100644 --- a/lib/lightning_web/components/layout_components.ex +++ b/lib/lightning_web/components/layout_components.ex @@ -22,7 +22,7 @@ defmodule LightningWeb.LayoutComponents do )} <% else %> Enum.sort_by(& &1.name)} selected_item={assigns[:project]} placeholder="Go to project" url_func={fn project -> ~p"/projects/#{project.id}/w" end} diff --git a/test/lightning/failure_alert_test.exs b/test/lightning/failure_alert_test.exs index 7a1b053e32..e5cf45d7a0 100644 --- a/test/lightning/failure_alert_test.exs +++ b/test/lightning/failure_alert_test.exs @@ -288,7 +288,7 @@ defmodule Lightning.FailureAlertTest do Lightning.Config.worker_token_signer() ) - expect(Lightning.MockConfig, :default_max_run_duration, fn -> 1 end) + expect(Lightning.MockConfig, :default_max_run_duration_seconds, fn -> 1 end) run_options = UsageLimiter.get_run_options(%Context{ diff --git a/test/lightning/runs/query_test.exs b/test/lightning/runs/query_test.exs index 904cefca47..a5151ee239 100644 --- a/test/lightning/runs/query_test.exs +++ b/test/lightning/runs/query_test.exs @@ -19,10 +19,12 @@ defmodule Lightning.Runs.QueryTest do now = Lightning.current_time() - default_max_run_duration = Lightning.Config.default_max_run_duration() - grace_period = Lightning.Config.grace_period() + default_max_run_duration_seconds = + Lightning.Config.default_max_run_duration_seconds() - default_max = grace_period + default_max_run_duration + grace_period_seconds = Lightning.Config.grace_period_seconds() + + default_max = grace_period_seconds + default_max_run_duration_seconds run_to_be_marked_lost = insert(:run, @@ -102,10 +104,12 @@ defmodule Lightning.Runs.QueryTest do now = Lightning.current_time() - default_max_run_duration = Lightning.Config.default_max_run_duration() - grace_period = Lightning.Config.grace_period() + default_max_run_duration_seconds = + Lightning.Config.default_max_run_duration_seconds() + + grace_period_seconds = Lightning.Config.grace_period_seconds() - default_max = grace_period + default_max_run_duration + default_max = grace_period_seconds + default_max_run_duration_seconds should_be_lost = insert(:run, diff --git a/test/lightning_web/channels/run_channel_test.exs b/test/lightning_web/channels/run_channel_test.exs index 55bb1057a6..c1119cf297 100644 --- a/test/lightning_web/channels/run_channel_test.exs +++ b/test/lightning_web/channels/run_channel_test.exs @@ -26,7 +26,7 @@ defmodule LightningWeb.RunChannelTest do end ) - Mox.stub(Lightning.MockConfig, :default_max_run_duration, fn -> 1 end) + Mox.stub(Lightning.MockConfig, :default_max_run_duration_seconds, fn -> 1 end) :ok end diff --git a/test/lightning_web/channels/worker_channel_test.exs b/test/lightning_web/channels/worker_channel_test.exs index 805f40dbde..d34e7ad5ec 100644 --- a/test/lightning_web/channels/worker_channel_test.exs +++ b/test/lightning_web/channels/worker_channel_test.exs @@ -135,5 +135,86 @@ defmodule LightningWeb.WorkerChannelTest do assert %{worker_name: nil} = Repo.reload!(run) end + + test "rolls back claimed runs when timeout is reached", %{socket: socket} do + %{triggers: [trigger]} = + workflow = insert(:simple_workflow) |> with_snapshot() + + {:ok, %{runs: [%{id: run_id} = run]}} = + WorkOrders.create_for(trigger, + workflow: workflow, + dataclip: params_with_assocs(:dataclip) + ) + + # Set a very short timeout for testing (100ms) + Application.put_env(:lightning, :run_channel_join_timeout_seconds, 0.1) + + ref = + push(socket, "claim", %{"demand" => 1, "worker_name" => "my.pod.name"}) + + assert_reply ref, :ok, %{runs: [%{"id" => ^run_id}]} + + # Verify the run is claimed + run = Repo.reload!(run) + assert run.state == :claimed + + # Manually trigger the timeout by sending the message directly + send(socket.channel_pid, {:claim_timeout, [run]}) + + # Wait a bit for the message to be processed + Process.sleep(100) + + # Verify the run has been rolled back to available + run = Repo.reload!(run) + assert run.state == :available + assert run.claimed_at == nil + assert run.worker_name == nil + + # Clean up + Application.delete_env(:lightning, :run_channel_join_timeout_seconds) + end + + test "cancels timeout when run channel is joined", %{socket: socket} do + %{triggers: [trigger]} = + workflow = insert(:simple_workflow) |> with_snapshot() + + {:ok, %{runs: [%{id: run_id} = run]}} = + WorkOrders.create_for(trigger, + workflow: workflow, + dataclip: params_with_assocs(:dataclip) + ) + + # Set a short timeout for testing + Application.put_env(:lightning, :run_channel_join_timeout_seconds, 1) + + ref = + push(socket, "claim", %{"demand" => 1, "worker_name" => "my.pod.name"}) + + assert_reply ref, :ok, %{runs: [%{"id" => ^run_id, "token" => _token}]} + + # Verify the run is claimed + run = Repo.reload!(run) + assert run.state == :claimed + + # Immediately simulate joining the run channel by sending the notification message + # This should cancel the timeout before it fires + # Use the PubSub message format for cross-node communication + worker_id = socket.assigns[:worker_id] || socket.id + + Lightning.broadcast( + "worker_channel:#{worker_id}", + {:run_channel_joined, run_id, worker_id} + ) + + # Wait a bit for the message to be processed + Process.sleep(100) + + # Verify the run is still claimed (timeout was cancelled) + run = Repo.reload!(run) + assert run.state == :claimed + + # Clean up + Application.delete_env(:lightning, :run_channel_join_timeout_seconds) + end end end diff --git a/test/support/stub_usage_limiter.ex b/test/support/stub_usage_limiter.ex index af753d4e46..f28826a095 100644 --- a/test/support/stub_usage_limiter.ex +++ b/test/support/stub_usage_limiter.ex @@ -37,7 +37,7 @@ defmodule Lightning.Extensions.StubUsageLimiter do @impl true def get_run_options(_context), - do: [run_timeout_ms: Config.default_max_run_duration()] + do: [run_timeout_ms: Config.default_max_run_duration_seconds()] @impl true def get_data_retention_periods(_context) do