diff --git a/.formatter.exs b/.formatter.exs index 650bb88..b11432d 100644 --- a/.formatter.exs +++ b/.formatter.exs @@ -1,4 +1,3 @@ [ - inputs: ["mix.exs", "{config,lib,test}/**/*.{ex,exs}"] + inputs: [".formatter.exs", "mix.exs", "{config,lib,test}/**/*.{ex,exs}"] ] - diff --git a/.travis.yml b/.travis.yml index 173cfdd..0e3be8e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,24 +3,38 @@ sudo: false env: - MIX_ENV=test elixir: - - 1.5.1 + - 1.5 + - 1.6 otp_release: - - 18.3 - 19.3 - 20.0 before_script: - - mix compile - - travis_wait mix dialyzer --plt -script: - - mix test - - mix credo --strict - - mix dialyzer --halt-exit-status -after_success: - - "mix compile && mix coveralls.travis" -after_script: - - MIX_ENV=docs mix deps.get - - MIX_ENV=docs mix inch.report + - mix coveralls.travis cache: directories: - ~/.mix - ~/.hex + +jobs: + include: + - stage: format + env: + - MIX_ENV=dev + script: mix format --check-formatted + elixir: 1.6 + - stage: credo + env: + - MIX_ENV=dev + script: mix credo --strict + elixir: 1.6 + - stage: dialyzer + env: + - MIX_ENV=dev + before_script: travis_wait mix dialyzer --plt + script: mix dialyzer --halt-exit-status + elixir: 1.6 + - stage: inch + env: + - MIX_ENV=docs + script: mix inch.report + elixir: 1.6 diff --git a/lib/quantum.ex b/lib/quantum.ex index f98ebc8..d31fc69 100644 --- a/lib/quantum.ex +++ b/lib/quantum.ex @@ -73,16 +73,16 @@ defmodule Quantum do defp remove_jobs_with_duplicate_names(job_list, quantum) do job_list |> Enum.reduce(%{}, fn %Job{name: name} = job, acc -> - if Enum.member?(Map.keys(acc), name) do - Logger.warn( - "Job with name '#{name}' of quantum '#{quantum}' not started due to duplicate job name" - ) + if Enum.member?(Map.keys(acc), name) do + Logger.warn( + "Job with name '#{name}' of quantum '#{quantum}' not started due to duplicate job name" + ) - acc - else - Map.put_new(acc, name, job) - end - end) + acc + else + Map.put_new(acc, name, job) + end + end) |> Map.values() end end diff --git a/lib/quantum/execution_broadcaster.ex b/lib/quantum/execution_broadcaster.ex index 799fc3e..d56a7df 100644 --- a/lib/quantum/execution_broadcaster.ex +++ b/lib/quantum/execution_broadcaster.ex @@ -46,7 +46,9 @@ defmodule Quantum.ExecutionBroadcaster do for {_, job} <- reboot_add_events do Logger.debug(fn -> - "[#{inspect(Node.self())}][#{__MODULE__}] Scheduling job for single reboot execution: #{inspect(job.name)}" + "[#{inspect(Node.self())}][#{__MODULE__}] Scheduling job for single reboot execution: #{ + inspect(job.name) + }" end) end @@ -70,13 +72,16 @@ defmodule Quantum.ExecutionBroadcaster do state = jobs_to_execute |> (fn jobs -> - for job <- jobs do - Logger.debug(fn -> - "[#{inspect(Node.self())}][#{__MODULE__}] Schedluling job for execution #{inspect(job.name)}" - end) - end - jobs - end).() + for job <- jobs do + Logger.debug(fn -> + "[#{inspect(Node.self())}][#{__MODULE__}] Schedluling job for execution #{ + inspect(job.name) + }" + end) + end + + jobs + end).() |> Enum.reduce(state, &add_job_to_state/2) |> sort_state |> reset_timer @@ -100,12 +105,12 @@ defmodule Quantum.ExecutionBroadcaster do jobs = jobs |> Enum.map(fn {date, job_list} -> - {date, Enum.reject(job_list, &(&1.name == name))} - end) + {date, Enum.reject(job_list, &(&1.name == name))} + end) |> Enum.reject(fn - {_, []} -> true - {_, _} -> false - end) + {_, []} -> true + {_, _} -> false + end) %{state | jobs: jobs} end @@ -144,13 +149,14 @@ defmodule Quantum.ExecutionBroadcaster do defp add_to_state(%{jobs: jobs} = state, date, job) do %{ state - | jobs: case Enum.find_index(jobs, fn {run_date, _} -> run_date == date end) do - nil -> - [{date, [job]} | jobs] - - index -> - List.update_at(jobs, index, fn {run_date, old} -> {run_date, [job | old]} end) - end + | jobs: + case Enum.find_index(jobs, fn {run_date, _} -> run_date == date end) do + nil -> + [{date, [job]} | jobs] + + index -> + List.update_at(jobs, index, fn {run_date, old} -> {run_date, [job | old]} end) + end } end @@ -172,6 +178,7 @@ defmodule Quantum.ExecutionBroadcaster do :eq -> send(self(), :execute) nil + _ -> monotonic_time = run_date @@ -191,6 +198,7 @@ defmodule Quantum.ExecutionBroadcaster do case NaiveDateTime.compare(run_date, old_date) do :eq -> state + _ -> Process.cancel_timer(timer) reset_timer(Map.put(state, :timer, nil)) diff --git a/lib/quantum/executor.ex b/lib/quantum/executor.ex index 503a76a..c22680b 100644 --- a/lib/quantum/executor.ex +++ b/lib/quantum/executor.ex @@ -59,14 +59,14 @@ defmodule Quantum.Executor do |> Enum.filter(&check_node(&1, task_supervisor, job)) |> Enum.map(&run(&1, job, task_supervisor)) |> Enum.each(fn {node, %Task{ref: ref}} -> - receive do - {^ref, _} -> - TaskRegistry.mark_finished(task_registry, job.name, node) + receive do + {^ref, _} -> + TaskRegistry.mark_finished(task_registry, job.name, node) - {:DOWN, ^ref, _, _, _} -> - TaskRegistry.mark_finished(task_registry, job.name, node) - end - end) + {:DOWN, ^ref, _, _, _} -> + TaskRegistry.mark_finished(task_registry, job.name, node) + end + end) :ok end @@ -75,7 +75,9 @@ defmodule Quantum.Executor do @spec run(Node.t(), Job.t(), GenServer.server()) :: {Node.t(), Task.t()} defp run(node, job, task_supervisor) do Logger.debug(fn -> - "[#{inspect(Node.self())}][#{__MODULE__}] Task for job #{inspect(job.name)} started on node #{inspect(node)}" + "[#{inspect(Node.self())}][#{__MODULE__}] Task for job #{inspect(job.name)} started on node #{ + inspect(node) + }" end) { @@ -88,7 +90,9 @@ defmodule Quantum.Executor do result = execute_task(job.task) Logger.debug(fn -> - "[#{inspect(Node.self())}][#{__MODULE__}] Execution ended for job #{inspect(job.name)}, which yielded result: #{inspect(result)}" + "[#{inspect(Node.self())}][#{__MODULE__}] Execution ended for job #{inspect(job.name)}, which yielded result: #{ + inspect(result) + }" end) :ok diff --git a/lib/quantum/executor_supervisor.ex b/lib/quantum/executor_supervisor.ex index 0bed7f2..b2d1352 100644 --- a/lib/quantum/executor_supervisor.ex +++ b/lib/quantum/executor_supervisor.ex @@ -12,9 +12,9 @@ defmodule Quantum.ExecutorSupervisor do def start_link(name, execution_broadcaster, task_supervisor, task_registry) do __MODULE__ |> ConsumerSupervisor.start_link( - {execution_broadcaster, task_supervisor, task_registry}, - name: name - ) + {execution_broadcaster, task_supervisor, task_registry}, + name: name + ) |> Util.start_or_link() end diff --git a/lib/quantum/job.ex b/lib/quantum/job.ex index d0fb2a3..4c26a22 100644 --- a/lib/quantum/job.ex +++ b/lib/quantum/job.ex @@ -56,6 +56,7 @@ defmodule Quantum.Job do {module, option} -> {module, option} module -> {module, nil} end + with run_strategy <- run_strategy_name.normalize_config!(options), name <- make_ref(), overlap when is_boolean(overlap) <- Keyword.fetch!(config, :overlap), diff --git a/lib/quantum/job_bradcaster.ex b/lib/quantum/job_bradcaster.ex index 928233d..ca2e2aa 100644 --- a/lib/quantum/job_bradcaster.ex +++ b/lib/quantum/job_bradcaster.ex @@ -118,9 +118,9 @@ defmodule Quantum.JobBroadcaster do messages = jobs |> Enum.filter(fn - {_name, %Job{state: :active}} -> true - {_name, _job} -> false - end) + {_name, %Job{state: :active}} -> true + {_name, _job} -> false + end) |> Enum.map(fn {name, _job} -> {:remove, name} end) {:noreply, messages, %{state | jobs: %{}}} diff --git a/lib/quantum/normalizer.ex b/lib/quantum/normalizer.ex index 769f0a3..d865f78 100644 --- a/lib/quantum/normalizer.ex +++ b/lib/quantum/normalizer.ex @@ -111,6 +111,7 @@ defmodule Quantum.Normalizer do defp normalize_run_strategy(strategy) when is_atom(strategy) do strategy.normalize_config!(nil) end + defp normalize_run_strategy({strategy, options}) when is_atom(strategy) do strategy.normalize_config!(options) end diff --git a/lib/quantum/task_stages_supervisor.ex b/lib/quantum/task_stages_supervisor.ex index eb93f0a..e49a522 100644 --- a/lib/quantum/task_stages_supervisor.ex +++ b/lib/quantum/task_stages_supervisor.ex @@ -11,7 +11,7 @@ defmodule Quantum.TaskStagesSupervisor do @spec start_link(Keyword.t()) :: GenServer.on_start() def start_link(opts) do __MODULE__ - |> Supervisor.start_link(opts, [name: Keyword.fetch!(opts, :task_stages_supervisor)]) + |> Supervisor.start_link(opts, name: Keyword.fetch!(opts, :task_stages_supervisor)) |> Util.start_or_link() end @@ -50,7 +50,7 @@ defmodule Quantum.TaskStagesSupervisor do Keyword.fetch!(opts, :task_supervisor), Keyword.fetch!(opts, :task_registry) } - }, + } ], strategy: :rest_for_one ) diff --git a/test/quantum/executor_test.exs b/test/quantum/executor_test.exs index e22c57c..21d6ac9 100644 --- a/test/quantum/executor_test.exs +++ b/test/quantum/executor_test.exs @@ -93,9 +93,9 @@ defmodule Quantum.ExecutorTest do job = TestScheduler.new_job() |> Job.set_task(fn -> - Process.sleep(50) - send(caller, :executed) - end) + Process.sleep(50) + send(caller, :executed) + end) |> Job.set_overlap(false) capture_log(fn -> @@ -116,16 +116,16 @@ defmodule Quantum.ExecutorTest do job = TestScheduler.new_job() |> Job.set_task(fn -> - Process.sleep(50) - send(caller, :executed) - end) + send(caller, :starting) + Process.sleep(50) + send(caller, :executed) + end) |> Job.set_overlap(false) capture_log(fn -> Executor.start_link({task_supervisor, task_registry}, {:execute, job}) - # Wait until running - Process.sleep(25) + assert_receive :starting assert :already_running = TaskRegistry.mark_running(task_registry, job.name, Node.self()) @@ -147,10 +147,10 @@ defmodule Quantum.ExecutorTest do # Mute Error capture_log(fn -> - Executor.start_link({task_supervisor, task_registry}, {:execute, job}) + Executor.start_link({task_supervisor, task_registry}, {:execute, job}) - Process.sleep(50) - end) + Process.sleep(50) + end) assert :marked_running = TaskRegistry.mark_running(task_registry, job.name, Node.self()) end diff --git a/test/quantum/scheduler_test.exs b/test/quantum/scheduler_test.exs index abcac39..3da756b 100644 --- a/test/quantum/scheduler_test.exs +++ b/test/quantum/scheduler_test.exs @@ -90,7 +90,7 @@ defmodule Quantum.SchedulerTest do assert Enum.any?(Scheduler.jobs(), fn {_, %Job{schedule: schedule, task: task}} -> schedule == spec && task == fun end) - end) + end) end @tag schedulers: [Scheduler] @@ -111,7 +111,7 @@ defmodule Quantum.SchedulerTest do :test_job, %{job | run_strategy: %Random{nodes: :cluster}} }) - end) + end) end @tag schedulers: [Scheduler] @@ -132,7 +132,7 @@ defmodule Quantum.SchedulerTest do :ticker, %{job | run_strategy: %Random{nodes: :cluster}} }) - end) + end) end @tag schedulers: [Scheduler] diff --git a/test/quantum_startup_test.exs b/test/quantum_startup_test.exs index c080e80..ab9da1c 100644 --- a/test/quantum_startup_test.exs +++ b/test/quantum_startup_test.exs @@ -16,13 +16,12 @@ defmodule QuantumStartupTest do @tag :startup test "prevent duplicate job names on startup" do capture_log(fn -> - test_jobs = - [ - {:test_job, [schedule: ~e[1 * * * *], task: fn -> :ok end]}, - {:test_job, [schedule: ~e[2 * * * *], task: fn -> :ok end]}, - {"3 * * * *", fn -> :ok end}, - {"4 * * * *", fn -> :ok end} - ] + test_jobs = [ + {:test_job, [schedule: ~e[1 * * * *], task: fn -> :ok end]}, + {:test_job, [schedule: ~e[2 * * * *], task: fn -> :ok end]}, + {"3 * * * *", fn -> :ok end}, + {"4 * * * *", fn -> :ok end} + ] Application.put_env(:quantum_startup_test, QuantumStartupTest.Scheduler, jobs: test_jobs)