6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -34,10 +34,16 @@ and this project adheres to
[#1588](https://github.com/OpenFn/lightning/issues/1588)

### Fixed

- Tooltip gets stuck when switching pages
[#3559](https://github.com/OpenFn/lightning/pull/3559)
- Current run dataclip stuck when switching nodes
[#3560](https://github.com/OpenFn/lightning/pull/3560)
- Release `:claimed` runs back to the `:available` state if their `run_channel`
  is never joined because of worker timeout or network issues. This is safe
  because, if the run channel is never joined, the worker never receives any
  data and cannot have started the run (see the sketch below).
[#3565](https://github.com/OpenFn/lightning/issues/3565)

## [v2.14.4] - 2025-09-09

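To picture the behaviour described in the `Fixed` entry above, here is a minimal sketch of the claim/join-timeout flow. It assumes a watcher process on the worker socket, and assumes `Lightning.subscribe/1` exists as the counterpart to the `Lightning.broadcast/2` call added in run_channel.ex below; the module name and message handling are illustrative, not the implementation in this PR.

```elixir
# Hypothetical sketch: track claimed runs and release any whose run channel
# is never joined within the configured timeout.
defmodule Sketch.ClaimWatcher do
  use GenServer

  def start_link({worker_id, claimed_runs}),
    do: GenServer.start_link(__MODULE__, {worker_id, claimed_runs})

  @impl true
  def init({worker_id, claimed_runs}) do
    # Assumed PubSub subscription; the run channel broadcasts to this topic.
    Lightning.subscribe("worker_channel:#{worker_id}")

    timeout_ms = Lightning.Config.run_channel_join_timeout_seconds() * 1000
    Process.send_after(self(), :join_timeout, timeout_ms)

    {:ok, %{pending: Map.new(claimed_runs, &{&1.id, &1})}}
  end

  @impl true
  def handle_info({:run_channel_joined, run_id, _worker_id}, state) do
    # The worker joined this run's channel in time; stop tracking it.
    {:noreply, update_in(state.pending, &Map.delete(&1, run_id))}
  end

  def handle_info(:join_timeout, %{pending: pending} = state) do
    # Anything still pending goes back to :available for another worker.
    {:ok, _count} = Lightning.Runs.rollback_claimed_runs(Map.values(pending))
    {:noreply, %{state | pending: %{}}}
  end
end
```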
26 changes: 18 additions & 8 deletions lib/lightning/config.ex
@@ -61,12 +61,12 @@ defmodule Lightning.Config do
end

@impl true
def grace_period do
def grace_period_seconds do
Application.get_env(:lightning, :run_grace_period_seconds)
end

@impl true
def default_max_run_duration do
def default_max_run_duration_seconds do
Application.get_env(:lightning, :max_run_duration_seconds)
end

@@ -324,6 +324,11 @@ defmodule Lightning.Config do
Application.get_env(:lightning, :per_workflow_claim_limit, 50)
end

@impl true
def run_channel_join_timeout_seconds do
Application.get_env(:lightning, :run_channel_join_timeout_seconds, 30)
end

@impl true
def metrics_run_performance_age_seconds do
metrics_config() |> Keyword.get(:run_performance_age_seconds)
@@ -397,11 +402,11 @@
@callback apollo(key :: atom() | nil) :: map()
@callback check_flag?(atom()) :: boolean() | nil
@callback cors_origin() :: list()
@callback default_max_run_duration() :: integer()
@callback default_max_run_duration_seconds() :: integer()
@callback email_sender_name() :: String.t()
@callback get_extension_mod(key :: atom()) :: any()
@callback google(key :: atom()) :: any()
@callback grace_period() :: integer()
@callback grace_period_seconds() :: integer()
@callback instance_admin_email() :: String.t()
@callback kafka_alternate_storage_enabled?() :: boolean()
@callback kafka_alternate_storage_file_path() :: String.t()
@@ -447,6 +452,7 @@
@callback external_metrics_module() :: module() | nil
@callback ai_assistant_modes() :: %{atom() => module()}
@callback per_workflow_claim_limit() :: pos_integer()
@callback run_channel_join_timeout_seconds() :: pos_integer()
@callback sentry() :: module()
@callback webhook_retry() :: Keyword.t()
@callback webhook_retry(key :: atom()) :: any()
@@ -493,15 +499,15 @@

The returned value is in seconds.
"""
def grace_period do
impl().grace_period()
def grace_period_seconds do
impl().grace_period_seconds()
end

@doc """
Returns the default maximum run duration in seconds.
"""
def default_max_run_duration do
impl().default_max_run_duration()
def default_max_run_duration_seconds do
impl().default_max_run_duration_seconds()
end

def repo_connection_token_signer do
@@ -684,6 +690,10 @@
impl().per_workflow_claim_limit()
end

def run_channel_join_timeout_seconds do
impl().run_channel_join_timeout_seconds()
end

def sentry do
impl().sentry()
end
4 changes: 4 additions & 0 deletions lib/lightning/config/bootstrap.ex
@@ -278,6 +278,10 @@ defmodule Lightning.Config.Bootstrap do
:max_run_duration_seconds,
env!("WORKER_MAX_RUN_DURATION_SECONDS", :integer, 300)

config :lightning,
:run_channel_join_timeout_seconds,
env!("RUN_CHANNEL_JOIN_TIMEOUT_SECONDS", :integer, 30)

config :lightning,
:max_dataclip_size_bytes,
env!("MAX_DATACLIP_SIZE_MB", :integer, 10) * 1_000_000
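For orientation, a minimal sketch of how the new setting flows from the environment to callers, assuming Bootstrap has run and `RUN_CHANNEL_JOIN_TIMEOUT_SECONDS` is unset (the `#=>` values are the defaults shown in this diff; the session itself is hypothetical):

```elixir
# Bootstrap writes the env var (or its 30-second default) into the app env:
Application.get_env(:lightning, :run_channel_join_timeout_seconds)
#=> 30

# Callers read it through the behaviour-backed facade added in config.ex,
# which itself falls back to 30 if the key is missing:
Lightning.Config.run_channel_join_timeout_seconds()
#=> 30
```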
2 changes: 1 addition & 1 deletion lib/lightning/extensions/usage_limiter.ex
@@ -17,7 +17,7 @@ defmodule Lightning.Extensions.UsageLimiter do
def get_run_options(context) do
[
save_dataclips: Lightning.Projects.save_dataclips?(context.project_id),
run_timeout_ms: Lightning.Config.default_max_run_duration() * 1000
run_timeout_ms: Lightning.Config.default_max_run_duration_seconds() * 1000
]
end

25 changes: 25 additions & 0 deletions lib/lightning/runs.ex
@@ -345,4 +345,29 @@ defmodule Lightning.Runs do
|> order_by([{^order, :timestamp}])
|> Repo.stream()
end

@doc """
Rolls back claimed runs to available state.

This is used when a worker socket disconnects after runs have been claimed
but before the worker receives the response. The runs are set back to :available
so they can be claimed by another worker.
"""
@spec rollback_claimed_runs([Run.t()]) :: {:ok, non_neg_integer()}
def rollback_claimed_runs(runs) do
# Set the runs back to :available state so they can be claimed by another worker
run_ids = Enum.map(runs, & &1.id)

{count, _} =
from(r in Run, where: r.id in ^run_ids)
|> Repo.update_all(
Collaborator: @taylordowns2000 To avoid race conditions, it may be worth locking each record, checking that the record should be rolled back and then rolling it back. (A sketch of this locking approach appears after this file's diff.)
set: [state: :available, claimed_at: nil, worker_name: nil]
)

Logger.info(
"Successfully rolled back #{count} claimed runs to :available state"
)

{:ok, count}
end
end
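As a companion to the inline review comment above, here is a hedged sketch of the suggested lock-and-check variant. It is not code from this PR: the function name, the `FOR UPDATE` lock, and the assumption that only `:claimed` runs may be rolled back are illustrative, and it assumes the same `import Ecto.Query`, `Run`, `Repo`, and `Logger` setup as `Lightning.Runs`.

```elixir
# Hypothetical variant: re-check state under a row lock so a run a worker has
# since started cannot be flipped back to :available concurrently.
def rollback_claimed_runs_checked(runs) do
  run_ids = Enum.map(runs, & &1.id)

  Repo.transaction(fn ->
    still_claimed_ids =
      from(r in Run,
        where: r.id in ^run_ids and r.state == ^:claimed,
        lock: "FOR UPDATE",
        select: r.id
      )
      |> Repo.all()

    {count, _} =
      from(r in Run, where: r.id in ^still_claimed_ids)
      |> Repo.update_all(
        set: [state: :available, claimed_at: nil, worker_name: nil]
      )

    Logger.info("Rolled back #{count} runs that were still :claimed")
    count
  end)
end
```

Like the function above, this returns `{:ok, count}`, but only counts runs that were still `:claimed` at the moment of the locked check.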
9 changes: 5 additions & 4 deletions lib/lightning/runs/query.ex
@@ -21,15 +21,16 @@ defmodule Lightning.Runs.Query do
def lost do
now = Lightning.current_time()

grace_period_ms = Lightning.Config.grace_period() * 1000
grace_period_seconds = Lightning.Config.grace_period_seconds()
grace_period_ms = grace_period_seconds * 1000

# TODO: Remove after live deployment rollouts are done. ====================
fallback_max = Lightning.Config.default_max_run_duration()
fallback_max_seconds = Lightning.Config.default_max_run_duration_seconds()

fallback_oldest_claim =
now
|> DateTime.add(-fallback_max, :second)
|> DateTime.add(-grace_period_ms, :millisecond)
|> DateTime.add(-fallback_max_seconds, :second)
|> DateTime.add(-grace_period_seconds, :second)

# ==========================================================================

2 changes: 1 addition & 1 deletion lib/lightning/workers.ex
@@ -82,7 +82,7 @@ defmodule Lightning.Workers do
defp calculate_token_expiry(run_timeout_ms) do
Lightning.current_time()
|> DateTime.add(run_timeout_ms, :millisecond)
|> DateTime.add(Lightning.Config.grace_period())
|> DateTime.add(Lightning.Config.grace_period_seconds(), :second)
|> DateTime.to_unix()
end

14 changes: 14 additions & 0 deletions lib/lightning_web/channels/run_channel.ex
@@ -31,6 +31,20 @@ defmodule LightningWeb.RunChannel do
Runs.get_project_id_for_run(run) do
Sentry.Context.set_extra_context(%{run_id: id})

# Notify the worker channel that this run channel has been joined
# Use PubSub for cross-node communication in clustered environments
case socket.assigns[:worker_id] do
Collaborator: @taylordowns2000 run_channel_test.exs does not seem to have been updated to cover this additional functionality? (A hypothetical test sketch appears after this file's diff.)

nil ->
# No worker ID available, continue normally
:ok

worker_id ->
Collaborator: @taylordowns2000 Are we vulnerable to a race condition here? E.g., we retrieve the run, broadcast a message that the timer can be reset, and then send data back to the worker? In the gap between the broadcast and something acting on it, the timeout could kick in, unclaiming the run and potentially allowing a situation where two workers are running it.

This feels like the 'ack-ack' scenario (although we can use the run channel response as the 'ack-ack'), and I think that addresses your question about the Worker A / Worker B scenario.

When a worker joins the run channel, I think that Lightning should confirm that the run is still claimed, lock the run and apply a change that makes rollback impossible (it feels like the responsible way to do this would be to introduce a new state), and then only send the :ok back to the worker.

If these conditions are not met, the Worker gets an error. (A sketch of this confirm-on-join idea appears after this file's diff.)

Member Author: @rorymckinley, would this be handled better by ensuring that a run channel can only be joined once? Or by a single worker? (This was a thought I had in the initial PR.)

Member Author: Eish, or would that prevent workers from re-connecting to the run channel if they got temporarily disconnected?

Collaborator: @taylordowns2000 Even if a single-connect is an option, I do not think it would solve the problem entirely, and it may introduce a new problem:

Showing my workings, apologies for the repetition of what I said before.

I can't see any obvious place where we are checking the state of a run before we send stuff back to the worker (probably never needed it before?) - so race 1 would be that the run has already been rolled back (essentially Worker A sans Worker B). A single connect does not seem to solve this.

Race 2 would be that the run has not been rolled back, but in between the broadcast and the action the run gets rolled back. Using the Highlander approach you suggest would prevent another worker picking up the run once it has rolled back, but:

  • The first worker would think that it is ok to work on this, but Lightning thinks it is available. What happens when the worker sends through 'started' - would Lightning need a moment to compose itself?
  • If we only allow one connection to the run, would that block legitimate cases where we do want another Worker to connect? So, let us say that Worker A does connect too late: the run has been rolled back but not claimed yet. In this case, we would tell Worker A to fly a kite and we would want Worker B to be able to connect, but the Highlander principle would block it?

Collaborator: And, oops: I forgot to add that I do not know enough about workers to know if it will reconnect mid-run to the run channel - @josephjclark?

Member Author: Eish. Yeah, it does sound like adding another state (:locked?) might be useful. Let's see what @stuartc and @josephjclark think here before I invest more time in this.

Collaborator: :confirmed?

Collaborator: I believe the worker will just reconnect when the connection is lost. The socket should handle this. I'll test locally later and see if I can confirm.

Lightning.broadcast(
"worker_channel:#{worker_id}",
{:run_channel_joined, id, worker_id}
)
end

{:ok,
socket
|> assign(%{
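To make the thread above concrete, here is a hedged sketch of the "confirm on join" idea being discussed. It is not code from this PR: the `:confirmed` state, the function name, and the error shapes are assumptions, and it assumes the usual `import Ecto.Query` plus `Repo` and `Run` aliases.

```elixir
# Hypothetical sketch: under a row lock, move a still-:claimed run to a new
# :confirmed state before replying :ok to the worker, so the join-timeout
# rollback can no longer apply to it.
def confirm_claimed_run(run_id) do
  Repo.transaction(fn ->
    run =
      from(r in Run, where: r.id == ^run_id, lock: "FOR UPDATE")
      |> Repo.one()

    case run do
      %Run{state: :claimed} = run ->
        run
        |> Ecto.Changeset.change(state: :confirmed)
        |> Repo.update!()

      %Run{state: state} ->
        # Already rolled back, started elsewhere, or finished.
        Repo.rollback({:not_claimed, state})

      nil ->
        Repo.rollback(:not_found)
    end
  end)
end
```

In `join/3`, the channel would then only reply `:ok` (and broadcast) when this returns `{:ok, run}`, and reply an error to the worker otherwise.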
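And for the earlier comment about test coverage, a hypothetical shape such a test could take. The `join_run_channel/2` helper and the `worker_id` assign stand in for whatever setup run_channel_test.exs already uses, and `Lightning.subscribe/1` is assumed as the counterpart to `Lightning.broadcast/2`.

```elixir
# Hypothetical test sketch, not part of this PR.
test "joining the run channel notifies the worker channel topic",
     %{socket: socket, run: run} do
  Lightning.subscribe("worker_channel:#{socket.assigns.worker_id}")

  # Stand-in for however the suite already joins the "run:<id>" topic.
  {:ok, _reply, _socket} = join_run_channel(socket, run)

  assert_receive {:run_channel_joined, run_id, worker_id}
  assert run_id == run.id
  assert worker_id == socket.assigns.worker_id
end
```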