Skip to content

Commit

Permalink
wip: job tests
Browse files Browse the repository at this point in the history
  • Loading branch information
midigofrank committed Nov 6, 2023
1 parent 44fa59b commit 9ebb6ca
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 32 deletions.
2 changes: 1 addition & 1 deletion lib/lightning/workflows/query.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ defmodule Lightning.Workflows.Query do
from(e in Edge,
join: j in assoc(e, :target_job),
join: t in assoc(e, :source_trigger),
where: t.type == :cron and t.enabled,
where: t.type == :cron and t.enabled and j.enabled,
preload: [:source_trigger, [target_job: :workflow]]
)
end
Expand Down
33 changes: 19 additions & 14 deletions lib/lightning/workflows/scheduler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ defmodule Lightning.Workflows.Scheduler do
The Scheduler is responsible for finding jobs that are ready to run based on
their cron schedule, and then running them.
"""
alias Lightning.WorkOrders

use Oban.Worker,
queue: :scheduler,
priority: 1,
Expand Down Expand Up @@ -36,17 +38,14 @@ defmodule Lightning.Workflows.Scheduler do
date_time
|> Workflows.get_edges_for_cron_execution()
|> Enum.each(fn edge ->
{:ok, %{attempt_run: attempt_run}} = invoke_cronjob(edge)

Pipeline.new(%{attempt_run_id: attempt_run.id})
|> Oban.insert()
{:ok, _workorder} = invoke_cronjob(edge)
end)

:ok
end

@spec invoke_cronjob(Lightning.Workflows.Edge.t()) :: {:ok | :error, map()}
defp invoke_cronjob(%{target_job: job} = edge) do
defp invoke_cronjob(%{target_job: job, source_trigger: trigger} = _edge) do
case last_state_for_job(job.id) do
nil ->
Logger.debug(fn ->
Expand All @@ -60,16 +59,20 @@ defmodule Lightning.Workflows.Scheduler do
# The implementation would look like:
# default_state_for_job(id)
# %{id: uuid, type: :global, body: %{arbitrary: true}}
WorkOrderService.multi_for(
:cron,
edge,
Dataclip.new(%{

dataclip =
%{
type: :global,
body: %{},
project_id: job.workflow.project_id
})
)
|> Repo.transaction()
}
|> Dataclip.new()
|> Repo.insert!()

WorkOrders.create_for(trigger, %{
dataclip: dataclip,
workflow: job.workflow
})

dataclip ->
Logger.debug(fn ->
Expand All @@ -78,8 +81,10 @@ defmodule Lightning.Workflows.Scheduler do
# coveralls-ignore-stop
end)

WorkOrderService.multi_for(:cron, edge, dataclip)
|> Repo.transaction()
WorkOrders.create_for(trigger, %{
dataclip: dataclip,
workflow: job.workflow
})
end
end

Expand Down
46 changes: 29 additions & 17 deletions test/lightning/jobs_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ defmodule Lightning.JobsTest do
target_job: build(:job, workflow: workflow, enabled: false)
)

assert Jobs.list_active_cron_jobs() == [Jobs.get_job!(enabled_job.id)]
assert [active_job] = Jobs.list_active_cron_jobs()
assert active_job.id == enabled_job.id
end

test "get_job!/1 returns the job with given id" do
Expand Down Expand Up @@ -309,19 +310,12 @@ defmodule Lightning.JobsTest do

Scheduler.enqueue_cronjobs()

run = Repo.one(Lightning.Invocation.Run)
attempt = Repo.one(Lightning.Attempt)

assert run.job_id == job.id
assert attempt.starting_trigger_id == trigger.id

run =
%Workflows.Job{id: job.id}
|> Lightning.Invocation.Query.last_successful_run_for_job()
|> Repo.one()
|> Repo.preload(:input_dataclip)
|> Repo.preload(:output_dataclip)

assert run.input_dataclip.type == :global
assert run.input_dataclip.body == %{}
attempt = Repo.preload(attempt, [:dataclip])
assert attempt.dataclip.type == :global
end
end

Expand All @@ -346,11 +340,29 @@ defmodule Lightning.JobsTest do
target_job: job
})

{:ok, %{attempt_run: attempt_run}} =
Lightning.WorkOrderService.multi_for(:cron, edge, insert(:dataclip))
|> Repo.transaction()

Lightning.Pipeline.process(attempt_run)
dataclip = insert(:dataclip)

attempt =
insert(:attempt,
workorder:
build(:workorder,
workflow: job.workflow,
dataclip: dataclip,
trigger: trigger,
state: :success
),
starting_trigger: trigger,
state: :success,
dataclip: dataclip,
runs: [
build(:run,
exit_code: 0,
job: job,
input_dataclip: dataclip,
output_dataclip: build(:dataclip, body: %{"changed" => true})
)
]
)

old =
%Workflows.Job{id: job.id}
Expand Down

0 comments on commit 9ebb6ca

Please sign in to comment.