Skip to content
This repository has been archived by the owner on May 14, 2018. It is now read-only.

Commit

Permalink
Merge 878d4cf into 6f8f92f
Browse files Browse the repository at this point in the history
  • Loading branch information
maennchen committed Jan 25, 2018
2 parents 6f8f92f + 878d4cf commit aa2661f
Show file tree
Hide file tree
Showing 13 changed files with 108 additions and 82 deletions.
3 changes: 1 addition & 2 deletions .formatter.exs
@@ -1,4 +1,3 @@
[
inputs: ["mix.exs", "{config,lib,test}/**/*.{ex,exs}"]
inputs: [".formatter.exs", "mix.exs", "{config,lib,test}/**/*.{ex,exs}"]
]

40 changes: 27 additions & 13 deletions .travis.yml
Expand Up @@ -3,24 +3,38 @@ sudo: false
env:
- MIX_ENV=test
elixir:
- 1.5.1
- 1.5
- 1.6
otp_release:
- 18.3
- 19.3
- 20.0
before_script:
- mix compile
- travis_wait mix dialyzer --plt
script:
- mix test
- mix credo --strict
- mix dialyzer --halt-exit-status
after_success:
- "mix compile && mix coveralls.travis"
after_script:
- MIX_ENV=docs mix deps.get
- MIX_ENV=docs mix inch.report
- mix coveralls.travis
cache:
directories:
- ~/.mix
- ~/.hex

jobs:
include:
- stage: format
env:
- MIX_ENV=dev
script: mix format --check-formatted
elixir: 1.6
- stage: credo
env:
- MIX_ENV=dev
script: mix credo --strict
elixir: 1.6
- stage: dialyzer
env:
- MIX_ENV=dev
before_script: travis_wait mix dialyzer --plt
script: mix dialyzer --halt-exit-status
elixir: 1.6
- stage: inch
env:
- MIX_ENV=docs
script: mix inch.report
elixir: 1.6
18 changes: 9 additions & 9 deletions lib/quantum.ex
Expand Up @@ -73,16 +73,16 @@ defmodule Quantum do
defp remove_jobs_with_duplicate_names(job_list, quantum) do
job_list
|> Enum.reduce(%{}, fn %Job{name: name} = job, acc ->
if Enum.member?(Map.keys(acc), name) do
Logger.warn(
"Job with name '#{name}' of quantum '#{quantum}' not started due to duplicate job name"
)
if Enum.member?(Map.keys(acc), name) do
Logger.warn(
"Job with name '#{name}' of quantum '#{quantum}' not started due to duplicate job name"
)

acc
else
Map.put_new(acc, name, job)
end
end)
acc
else
Map.put_new(acc, name, job)
end
end)
|> Map.values()
end
end
48 changes: 28 additions & 20 deletions lib/quantum/execution_broadcaster.ex
Expand Up @@ -46,7 +46,9 @@ defmodule Quantum.ExecutionBroadcaster do

for {_, job} <- reboot_add_events do
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Scheduling job for single reboot execution: #{inspect(job.name)}"
"[#{inspect(Node.self())}][#{__MODULE__}] Scheduling job for single reboot execution: #{
inspect(job.name)
}"
end)
end

Expand All @@ -70,13 +72,16 @@ defmodule Quantum.ExecutionBroadcaster do
state =
jobs_to_execute
|> (fn jobs ->
for job <- jobs do
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Schedluling job for execution #{inspect(job.name)}"
end)
end
jobs
end).()
for job <- jobs do
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Schedluling job for execution #{
inspect(job.name)
}"
end)
end

jobs
end).()
|> Enum.reduce(state, &add_job_to_state/2)
|> sort_state
|> reset_timer
Expand All @@ -100,12 +105,12 @@ defmodule Quantum.ExecutionBroadcaster do
jobs =
jobs
|> Enum.map(fn {date, job_list} ->
{date, Enum.reject(job_list, &(&1.name == name))}
end)
{date, Enum.reject(job_list, &(&1.name == name))}
end)
|> Enum.reject(fn
{_, []} -> true
{_, _} -> false
end)
{_, []} -> true
{_, _} -> false
end)

%{state | jobs: jobs}
end
Expand Down Expand Up @@ -144,13 +149,14 @@ defmodule Quantum.ExecutionBroadcaster do
defp add_to_state(%{jobs: jobs} = state, date, job) do
%{
state
| jobs: case Enum.find_index(jobs, fn {run_date, _} -> run_date == date end) do
nil ->
[{date, [job]} | jobs]

index ->
List.update_at(jobs, index, fn {run_date, old} -> {run_date, [job | old]} end)
end
| jobs:
case Enum.find_index(jobs, fn {run_date, _} -> run_date == date end) do
nil ->
[{date, [job]} | jobs]

index ->
List.update_at(jobs, index, fn {run_date, old} -> {run_date, [job | old]} end)
end
}
end

Expand All @@ -172,6 +178,7 @@ defmodule Quantum.ExecutionBroadcaster do
:eq ->
send(self(), :execute)
nil

_ ->
monotonic_time =
run_date
Expand All @@ -191,6 +198,7 @@ defmodule Quantum.ExecutionBroadcaster do
case NaiveDateTime.compare(run_date, old_date) do
:eq ->
state

_ ->
Process.cancel_timer(timer)
reset_timer(Map.put(state, :timer, nil))
Expand Down
22 changes: 13 additions & 9 deletions lib/quantum/executor.ex
Expand Up @@ -59,14 +59,14 @@ defmodule Quantum.Executor do
|> Enum.filter(&check_node(&1, task_supervisor, job))
|> Enum.map(&run(&1, job, task_supervisor))
|> Enum.each(fn {node, %Task{ref: ref}} ->
receive do
{^ref, _} ->
TaskRegistry.mark_finished(task_registry, job.name, node)
receive do
{^ref, _} ->
TaskRegistry.mark_finished(task_registry, job.name, node)

{:DOWN, ^ref, _, _, _} ->
TaskRegistry.mark_finished(task_registry, job.name, node)
end
end)
{:DOWN, ^ref, _, _, _} ->
TaskRegistry.mark_finished(task_registry, job.name, node)
end
end)

:ok
end
Expand All @@ -75,7 +75,9 @@ defmodule Quantum.Executor do
@spec run(Node.t(), Job.t(), GenServer.server()) :: {Node.t(), Task.t()}
defp run(node, job, task_supervisor) do
Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Task for job #{inspect(job.name)} started on node #{inspect(node)}"
"[#{inspect(Node.self())}][#{__MODULE__}] Task for job #{inspect(job.name)} started on node #{
inspect(node)
}"
end)

{
Expand All @@ -88,7 +90,9 @@ defmodule Quantum.Executor do
result = execute_task(job.task)

Logger.debug(fn ->
"[#{inspect(Node.self())}][#{__MODULE__}] Execution ended for job #{inspect(job.name)}, which yielded result: #{inspect(result)}"
"[#{inspect(Node.self())}][#{__MODULE__}] Execution ended for job #{inspect(job.name)}, which yielded result: #{
inspect(result)
}"
end)

:ok
Expand Down
6 changes: 3 additions & 3 deletions lib/quantum/executor_supervisor.ex
Expand Up @@ -12,9 +12,9 @@ defmodule Quantum.ExecutorSupervisor do
def start_link(name, execution_broadcaster, task_supervisor, task_registry) do
__MODULE__
|> ConsumerSupervisor.start_link(
{execution_broadcaster, task_supervisor, task_registry},
name: name
)
{execution_broadcaster, task_supervisor, task_registry},
name: name
)
|> Util.start_or_link()
end

Expand Down
1 change: 1 addition & 0 deletions lib/quantum/job.ex
Expand Up @@ -56,6 +56,7 @@ defmodule Quantum.Job do
{module, option} -> {module, option}
module -> {module, nil}
end

with run_strategy <- run_strategy_name.normalize_config!(options),
name <- make_ref(),
overlap when is_boolean(overlap) <- Keyword.fetch!(config, :overlap),
Expand Down
6 changes: 3 additions & 3 deletions lib/quantum/job_bradcaster.ex
Expand Up @@ -118,9 +118,9 @@ defmodule Quantum.JobBroadcaster do
messages =
jobs
|> Enum.filter(fn
{_name, %Job{state: :active}} -> true
{_name, _job} -> false
end)
{_name, %Job{state: :active}} -> true
{_name, _job} -> false
end)
|> Enum.map(fn {name, _job} -> {:remove, name} end)

{:noreply, messages, %{state | jobs: %{}}}
Expand Down
1 change: 1 addition & 0 deletions lib/quantum/normalizer.ex
Expand Up @@ -111,6 +111,7 @@ defmodule Quantum.Normalizer do
defp normalize_run_strategy(strategy) when is_atom(strategy) do
strategy.normalize_config!(nil)
end

defp normalize_run_strategy({strategy, options}) when is_atom(strategy) do
strategy.normalize_config!(options)
end
Expand Down
4 changes: 2 additions & 2 deletions lib/quantum/task_stages_supervisor.ex
Expand Up @@ -11,7 +11,7 @@ defmodule Quantum.TaskStagesSupervisor do
@spec start_link(Keyword.t()) :: GenServer.on_start()
def start_link(opts) do
__MODULE__
|> Supervisor.start_link(opts, [name: Keyword.fetch!(opts, :task_stages_supervisor)])
|> Supervisor.start_link(opts, name: Keyword.fetch!(opts, :task_stages_supervisor))
|> Util.start_or_link()
end

Expand Down Expand Up @@ -50,7 +50,7 @@ defmodule Quantum.TaskStagesSupervisor do
Keyword.fetch!(opts, :task_supervisor),
Keyword.fetch!(opts, :task_registry)
}
},
}
],
strategy: :rest_for_one
)
Expand Down
22 changes: 11 additions & 11 deletions test/quantum/executor_test.exs
Expand Up @@ -93,9 +93,9 @@ defmodule Quantum.ExecutorTest do
job =
TestScheduler.new_job()
|> Job.set_task(fn ->
Process.sleep(50)
send(caller, :executed)
end)
Process.sleep(50)
send(caller, :executed)
end)
|> Job.set_overlap(false)

capture_log(fn ->
Expand All @@ -116,16 +116,16 @@ defmodule Quantum.ExecutorTest do
job =
TestScheduler.new_job()
|> Job.set_task(fn ->
Process.sleep(50)
send(caller, :executed)
end)
send(caller, :starting)
Process.sleep(50)
send(caller, :executed)
end)
|> Job.set_overlap(false)

capture_log(fn ->
Executor.start_link({task_supervisor, task_registry}, {:execute, job})

# Wait until running
Process.sleep(25)
assert_receive :starting

assert :already_running = TaskRegistry.mark_running(task_registry, job.name, Node.self())

Expand All @@ -147,10 +147,10 @@ defmodule Quantum.ExecutorTest do

# Mute Error
capture_log(fn ->
Executor.start_link({task_supervisor, task_registry}, {:execute, job})
Executor.start_link({task_supervisor, task_registry}, {:execute, job})

Process.sleep(50)
end)
Process.sleep(50)
end)

assert :marked_running = TaskRegistry.mark_running(task_registry, job.name, Node.self())
end
Expand Down
6 changes: 3 additions & 3 deletions test/quantum/scheduler_test.exs
Expand Up @@ -90,7 +90,7 @@ defmodule Quantum.SchedulerTest do
assert Enum.any?(Scheduler.jobs(), fn {_, %Job{schedule: schedule, task: task}} ->
schedule == spec && task == fun
end)
end)
end)
end

@tag schedulers: [Scheduler]
Expand All @@ -111,7 +111,7 @@ defmodule Quantum.SchedulerTest do
:test_job,
%{job | run_strategy: %Random{nodes: :cluster}}
})
end)
end)
end

@tag schedulers: [Scheduler]
Expand All @@ -132,7 +132,7 @@ defmodule Quantum.SchedulerTest do
:ticker,
%{job | run_strategy: %Random{nodes: :cluster}}
})
end)
end)
end

@tag schedulers: [Scheduler]
Expand Down
13 changes: 6 additions & 7 deletions test/quantum_startup_test.exs
Expand Up @@ -16,13 +16,12 @@ defmodule QuantumStartupTest do
@tag :startup
test "prevent duplicate job names on startup" do
capture_log(fn ->
test_jobs =
[
{:test_job, [schedule: ~e[1 * * * *], task: fn -> :ok end]},
{:test_job, [schedule: ~e[2 * * * *], task: fn -> :ok end]},
{"3 * * * *", fn -> :ok end},
{"4 * * * *", fn -> :ok end}
]
test_jobs = [
{:test_job, [schedule: ~e[1 * * * *], task: fn -> :ok end]},
{:test_job, [schedule: ~e[2 * * * *], task: fn -> :ok end]},
{"3 * * * *", fn -> :ok end},
{"4 * * * *", fn -> :ok end}
]

Application.put_env(:quantum_startup_test, QuantumStartupTest.Scheduler, jobs: test_jobs)

Expand Down

0 comments on commit aa2661f

Please sign in to comment.