Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: jobs, workorder live tests #1278

Merged
merged 7 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 8 additions & 11 deletions lib/lightning/invocation/query.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,29 +37,26 @@ defmodule Lightning.Invocation.Query do
end

@doc """
The last run for a job for a particular exit code, used in scheduler
The last run for a job for a particular exit reason, used in scheduler
"""
@spec runs_with_code(Ecto.Queryable.t(), integer()) :: Ecto.Queryable.t()
def runs_with_code(query, exit_code) do
from(q in query, where: q.exit_code == ^exit_code)
@spec runs_with_reason(Ecto.Queryable.t(), integer()) :: Ecto.Queryable.t()
def runs_with_reason(query, exit_reason) do
from(q in query, where: q.exit_reason == ^exit_reason)
end

@doc """
The last run for a job for a particular exit code, used in scheduler
The last successful run for a job, used in scheduler to enable downstream attempts
to access a previous attempt's state
"""
@spec last_successful_run_for_job(Job.t()) :: Ecto.Queryable.t()
def last_successful_run_for_job(%Job{id: id}) do
last_run_for_job(%Job{id: id})
|> runs_with_code(0)
|> runs_with_reason("success")
end

@doc """
By default, the dataclip body is not returned via a query. This query selects
the body specifically.
"""
def dataclip_with_body() do
from(d in Dataclip,
select: [:id, :body, :type, :project_id, :inserted_at, :updated_at]
)
end
def dataclip_with_body, do: from(d in Dataclip, select: %{d | body: d.body})
end
35 changes: 19 additions & 16 deletions lib/lightning/workflows/scheduler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule Lightning.Workflows.Scheduler do
The Scheduler is responsible for finding jobs that are ready to run based on
their cron schedule, and then running them.
"""

use Oban.Worker,
queue: :scheduler,
priority: 1,
Expand All @@ -14,9 +15,8 @@ defmodule Lightning.Workflows.Scheduler do

alias Lightning.{
Invocation,
Pipeline,
Repo,
WorkOrderService,
WorkOrders,
Workflows
}

Expand All @@ -36,17 +36,14 @@ defmodule Lightning.Workflows.Scheduler do
date_time
|> Workflows.get_edges_for_cron_execution()
|> Enum.each(fn edge ->
{:ok, %{attempt_run: attempt_run}} = invoke_cronjob(edge)

Pipeline.new(%{attempt_run_id: attempt_run.id})
|> Oban.insert()
{:ok, _workorder} = invoke_cronjob(edge)
end)

:ok
end

@spec invoke_cronjob(Lightning.Workflows.Edge.t()) :: {:ok | :error, map()}
defp invoke_cronjob(%{target_job: job} = edge) do
defp invoke_cronjob(%{target_job: job, source_trigger: trigger} = _edge) do
case last_state_for_job(job.id) do
nil ->
Logger.debug(fn ->
Expand All @@ -60,16 +57,20 @@ defmodule Lightning.Workflows.Scheduler do
# The implementation would look like:
# default_state_for_job(id)
# %{id: uuid, type: :global, body: %{arbitrary: true}}
WorkOrderService.multi_for(
:cron,
edge,
Dataclip.new(%{

dataclip =
%{
type: :global,
body: %{},
project_id: job.workflow.project_id
})
)
|> Repo.transaction()
}
|> Dataclip.new()
|> Repo.insert!()

WorkOrders.create_for(trigger, %{
dataclip: dataclip,
workflow: job.workflow
})

dataclip ->
Logger.debug(fn ->
Expand All @@ -78,8 +79,10 @@ defmodule Lightning.Workflows.Scheduler do
# coveralls-ignore-stop
end)

WorkOrderService.multi_for(:cron, edge, dataclip)
|> Repo.transaction()
WorkOrders.create_for(trigger, %{
dataclip: dataclip,
workflow: job.workflow
})
end
end

Expand Down
9 changes: 8 additions & 1 deletion lib/lightning_web/live/run_live/components.ex
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ defmodule LightningWeb.RunLive.Components do
solid
class="mr-1.5 h-5 w-5 flex-shrink-0 text-green-500"
/>
<% nil -> %>
<Heroicons.clock
solid
class="mr-1.5 h-5 w-5 flex-shrink-0 text-gray-500"
/>
<% val -> %>
<%= val %>
<% end %>
Expand All @@ -81,7 +86,7 @@ defmodule LightningWeb.RunLive.Components do
<span><%= @run.job.name %></span>
</.link>
<div class="flex gap-1">
<%= if @can_rerun_job do %>
<%= if @can_rerun_job && @run.exit_reason do %>
<span
id={@run.id}
class="text-indigo-400 hover:underline hover:underline-offset-2 hover:text-indigo-500 cursor-pointer"
Expand Down Expand Up @@ -356,6 +361,8 @@ defmodule LightningWeb.RunLive.Components do
<.failure_pill class="font-mono font-bold">fail</.failure_pill>
<% "success" -> %>
<.success_pill class="font-mono font-bold">success</.success_pill>
<% nil -> %>
<.pending_pill class="font-mono font-bold">running</.pending_pill>
<% val -> %>
<.other_state_pill class="font-mono font-bold">
<%= val %>
Expand Down
11 changes: 8 additions & 3 deletions lib/lightning_web/live/run_live/index.ex
Original file line number Diff line number Diff line change
Expand Up @@ -203,11 +203,16 @@ defmodule LightningWeb.RunLive.Index do
{:selection_toggled, {workorder, selected?}},
%{assigns: assigns} = socket
) do
selected_workorder = %Lightning.WorkOrder{
id: workorder.id,
workflow_id: workorder.workflow_id
}

work_orders =
if selected? do
[workorder | assigns.selected_work_orders]
[selected_workorder | assigns.selected_work_orders]
else
assigns.selected_work_orders -- [workorder]
assigns.selected_work_orders -- [selected_workorder]
end

{:noreply, assign(socket, selected_work_orders: work_orders)}
Expand Down Expand Up @@ -276,7 +281,7 @@ defmodule LightningWeb.RunLive.Index do
work_orders =
if selection do
Enum.map(page.entries, fn entry ->
Map.take(entry, [:id, :workflow_id])
%Lightning.WorkOrder{id: entry.id, workflow_id: entry.workflow_id}
end)
else
[]
Expand Down
2 changes: 1 addition & 1 deletion lib/lightning_web/live/run_live/workorder_component.ex
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ defmodule LightningWeb.RunLive.WorkOrderComponent do
for={selection_params(@work_order, @entry_selected)}
phx-change="toggle_selection"
phx-target={@myself}
id={"#{@work_order.id}-selection-form"}
id={"selection-form-#{@work_order.id}"}
>
<%= Phoenix.HTML.Form.checkbox(f, :selected,
id: "select_#{@work_order.id}",
Expand Down
114 changes: 67 additions & 47 deletions test/lightning/jobs_test.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
defmodule Lightning.JobsTest do
use Lightning.DataCase, async: true

alias Lightning.Attempt
alias Lightning.Invocation
alias Lightning.Jobs
alias Lightning.Repo
alias Lightning.Workflows.Job
Expand All @@ -17,34 +19,46 @@ defmodule Lightning.JobsTest do
assert Jobs.list_jobs() == [Jobs.get_job!(job.id)]
end

test "list_active_cron_jobs/0 returns all active jobs with cron triggers" do
test "list_active_cron_jobs/0 returns all jobs with active cron triggers" do
insert(:job)

workflow = insert(:workflow)

t =
enabled_trigger =
insert(:trigger,
workflow: workflow,
type: :cron,
enabled: true,
cron_expression: "5 0 * 8 *"
)

enabled_job = insert(:job, workflow: workflow)
job_1 = insert(:job, workflow: workflow)

insert(:edge,
workflow: workflow,
source_trigger_id: t.id,
target_job_id: enabled_job.id
source_trigger: enabled_trigger,
target_job: job_1
)

# disabled job
# disabled trigger
disabled_trigger =
insert(:trigger,
workflow: workflow,
type: :cron,
enabled: false,
cron_expression: "5 0 * 8 *"
)

job_2 = insert(:job, workflow: workflow)

insert(:edge,
workflow: workflow,
source_trigger_id: t.id,
target_job: build(:job, workflow: workflow, enabled: false)
source_trigger: disabled_trigger,
target_job: job_2
)

assert Jobs.list_active_cron_jobs() == [Jobs.get_job!(enabled_job.id)]
assert [active_job] = Jobs.list_active_cron_jobs()
assert active_job.id == job_1.id
end

test "get_job!/1 returns the job with given id" do
Expand Down Expand Up @@ -309,19 +323,12 @@ defmodule Lightning.JobsTest do

Scheduler.enqueue_cronjobs()

run = Repo.one(Lightning.Invocation.Run)
attempt = Repo.one(Lightning.Attempt)

assert run.job_id == job.id

run =
%Workflows.Job{id: job.id}
|> Lightning.Invocation.Query.last_successful_run_for_job()
|> Repo.one()
|> Repo.preload(:input_dataclip)
|> Repo.preload(:output_dataclip)
assert attempt.starting_trigger_id == trigger.id

assert run.input_dataclip.type == :global
assert run.input_dataclip.body == %{}
attempt = Repo.preload(attempt, [:dataclip])
assert attempt.dataclip.type == :global
end
end

Expand All @@ -339,41 +346,54 @@ defmodule Lightning.JobsTest do
workflow: job.workflow
})

edge =
insert(:edge, %{
workflow: job.workflow,
source_trigger: trigger,
target_job: job
})

{:ok, %{attempt_run: attempt_run}} =
Lightning.WorkOrderService.multi_for(:cron, edge, insert(:dataclip))
|> Repo.transaction()
insert(:edge, %{
workflow: job.workflow,
source_trigger: trigger,
target_job: job
})

Lightning.Pipeline.process(attempt_run)
dataclip = insert(:dataclip)

attempt =
insert(:attempt,
work_order:
build(:workorder,
workflow: job.workflow,
dataclip: dataclip,
trigger: trigger,
state: :success
),
starting_trigger: trigger,
state: :success,
dataclip: dataclip,
runs: [
build(:run,
exit_reason: "success",
job: job,
input_dataclip: dataclip,
output_dataclip:
build(:dataclip, type: :run_result, body: %{"changed" => true})
)
]
)

old =
%Workflows.Job{id: job.id}
|> Lightning.Invocation.Query.last_successful_run_for_job()
|> Repo.one()
|> Repo.preload(:input_dataclip)
|> Repo.preload(:output_dataclip)
[old_run] = attempt.runs

_result = Scheduler.enqueue_cronjobs()

new =
%Workflows.Job{id: job.id}
|> Lightning.Invocation.Query.last_successful_run_for_job()
new_attempt =
Attempt
|> last(:inserted_at)
|> preload(dataclip: ^Invocation.Query.dataclip_with_body())
|> Repo.one()
|> Repo.preload(:input_dataclip)
|> Repo.preload(:output_dataclip)

assert old.input_dataclip.type == :http_request
assert old.input_dataclip.body == %{}
assert attempt.dataclip.type == :http_request
assert old_run.input_dataclip.type == :http_request
assert old_run.input_dataclip.body == %{}

assert new.input_dataclip.type == :run_result
assert new.input_dataclip.body == old.output_dataclip.body
assert new.output_dataclip.body == %{"changed" => true}
refute new_attempt.id == attempt.id
assert new_attempt.dataclip.type == :run_result
assert new_attempt.dataclip.body == old_run.output_dataclip.body
end
end
end
Loading