Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gracefully shut down workers #271

Merged
merged 2 commits into from
May 30, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ Exq is a job processing library compatible with Resque / Sidekiq for the [Elixir
This means that if the system or worker is restarted while a job is in progress,
the job will be re_enqueued when the node is restarted and not lost.
* Exq provides an optional web UI that you can use to view several stats as well as rate of job processing.
* When shutting down, Exq will attempt to let workers terminate gracefully,
with a configurable timeout.
* There is no time limit to how long a job can run for.

### Do you need Exq?
Expand Down Expand Up @@ -83,6 +85,9 @@ Other options include:
* The `concurrency` setting will let you configure the amount of concurrent workers that will be allowed, or :infinite to disable any throttling.
* The `name` option allows you to customize Exq's registered name, similar to using `Exq.start_link([name: Name])`. The default is Exq.
* If the option `start_on_application` is `false`, Exq won't be started automatically when booting up your application. You can start it with `Exq.start_link/1`.
* The `shutdown_timeout` is the number of milliseconds to wait for workers to
finish processing jobs when the application is shutting down. It defaults to
5000 ms.

```elixir
config :exq,
Expand Down Expand Up @@ -157,7 +162,7 @@ You can add Exq into your OTP application list, and it will start an instance of

When using Exq through OTP, it will register a process under the name ```Elixir.Exq``` - you can use this atom where expecting a process name in the Exq module.

If you would like to control Exq startup, you can configure Exq to not start anything on application start. For example, if you are using Exq along with Phoenix, and your workers are accessing the database or other resources, it is recommended to disable Exq startup and manually add it to the supervision tree.
If you would like to control Exq startup, you can configure Exq to not start anything on application start. For example, if you are using Exq along with Phoenix, and your workers are accessing the database or other resources, it is recommended to disable Exq startup and manually add it to the supervision tree.

This can be done by setting `start_on_application` to false and adding it to your supervision tree:

Expand Down Expand Up @@ -298,6 +303,11 @@ To unsubscribe from a queue:
:ok = Exq.unsubscribe(Exq, "queue_to_unsubscribe")
```

To unsubscribe from all queues:
```elixir
:ok = Exq.unsubscribe_all(Exq)
```

## Middleware Support

If you'd like to customize worker execution and/or create plugins like Sidekiq/Resque have, Exq supports custom middleware. The first step would be to define the middleware in config.exs and add your middleware into the chain:
Expand Down
8 changes: 8 additions & 0 deletions lib/exq.ex
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ defmodule Exq do
GenServer.call(pid, {:unsubscribe, queue})
end

@doc """
Unsubscribe from every queue at once, i.e. stop listening for jobs on all of them.

Issues a synchronous `GenServer.call/2` with the `:unsubscribe_all` request
and returns whatever the target process replies.

  * `pid` - PID for the Exq Manager or Enqueuer that should handle the request
"""
def unsubscribe_all(pid), do: GenServer.call(pid, :unsubscribe_all)


@doc """
Get the job metadata
Expand Down
28 changes: 21 additions & 7 deletions lib/exq/manager/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ defmodule Exq.Manager.Server do
:ok
end

# Resolves the name to use for the manager server: an explicit `name` is
# returned unchanged, while `nil` falls back to the `:name` value read via Config.
def server_name(name) do
  case name do
    nil -> Config.get(:name)
    _ -> name
  end
end


##===========================================================
## gen server callbacks
##===========================================================
Expand Down Expand Up @@ -180,17 +184,22 @@ defmodule Exq.Manager.Server do

def handle_call({:subscribe, queue}, _from, state) do
updated_state = add_queue(state, queue)
{:reply, :ok, updated_state,0}
{:reply, :ok, updated_state, 0}
end

def handle_call({:subscribe, queue, concurrency}, _from, state) do
updated_state = add_queue(state, queue, concurrency)
{:reply, :ok, updated_state,0}
{:reply, :ok, updated_state, 0}
end

def handle_call({:unsubscribe, queue}, _from, state) do
updated_state = remove_queue(state, queue)
{:reply, :ok, updated_state,0}
{:reply, :ok, updated_state, 0}
end

# Drops every subscribed queue and replies `:ok`. The trailing `0` is a
# GenServer timeout, matching the other subscribe/unsubscribe clauses.
def handle_call(:unsubscribe_all, _from, state) do
  {:reply, :ok, remove_all_queues(state), 0}
end

def handle_cast({:re_enqueue_backup, queue}, state) do
Expand Down Expand Up @@ -321,8 +330,16 @@ defmodule Exq.Manager.Server do
%{state | queues: updated_queues}
end

def update_worker_count(work_table, queue, delta) do
# Clears every row from the work table and returns the state with an empty
# queue list. The `true =` match asserts that the ETS call succeeded.
defp remove_all_queues(state) do
  work_table = state.work_table
  true = :ets.delete_all_objects(work_table)

  %{state | queues: []}
end

# Adds `delta` to the counter stored at tuple position 3 of the `queue` row
# in `work_table`. Returns the updated counter, or `:ok` when the update
# raises ArgumentError.
defp update_worker_count(work_table, queue, delta) do
  try do
    :ets.update_counter(work_table, queue, {3, delta})
  rescue
    # The queue has been unsubscribed, so its row no longer exists
    ArgumentError -> :ok
  end
end

@doc """
Expand Down Expand Up @@ -363,7 +380,4 @@ defmodule Exq.Manager.Server do
end
end

defp server_name(nil), do: Config.get(:name)
defp server_name(name), do: name

end
1 change: 1 addition & 0 deletions lib/exq/support/mode.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ defmodule Exq.Support.Mode do
worker(Exq.Stats.Server, [opts]),
supervisor(Exq.Worker.Supervisor, [opts]),
worker(Exq.Manager.Server, [opts]),
worker(Exq.WorkerDrainer.Server, [opts]),
worker(Exq.Enqueuer.Server, [opts]),
worker(Exq.Api.Server, [opts])
]
Expand Down
4 changes: 4 additions & 0 deletions lib/exq/worker/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,8 @@ defmodule Exq.Worker.Supervisor do
def start_child(sup, args) do
Supervisor.start_child(sup, args)
end

# Lists the children currently held by the worker supervisor `sup`, in the
# format returned by `Supervisor.which_children/1`.
def workers(sup), do: Supervisor.which_children(sup)
end
94 changes: 94 additions & 0 deletions lib/exq/worker_drainer/server.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
defmodule Exq.WorkerDrainer.Server do
  @moduledoc """
  The WorkerDrainer server is responsible for gracefully draining
  workers when the application is shutting down. When shutdown starts
  it instructs the Manager to stop accepting new jobs and then waits
  for all currently in progress jobs to complete.

  If the jobs do not complete within an allowed timeout the WorkerDrainer
  will shut down, allowing the rest of the supervision tree (including the
  remaining workers) to then shut down.

  The length of the grace period can be configured with the
  `shutdown_timeout` option, which defaults to 5000 ms.
  """

  use GenServer
  alias Exq.{Worker, Manager}

  defstruct name: Exq,
            shutdown_timeout: 5000

  @doc """
  Returns the name the drainer registers under for the Exq instance `name`.

  When `name` is `nil` the instance name is read from the Exq configuration.
  The result is the atom `"<name>.WorkerDrainer"`.
  """
  def server_name(name) do
    name = name || Exq.Support.Config.get(:name)
    String.to_atom("#{name}.WorkerDrainer")
  end

  ## ===========================================================
  ## gen server callbacks
  ## ===========================================================

  @doc """
  Starts the drainer, registered under `server_name(opts[:name])`.

  Recognised options are the struct fields `:name` and `:shutdown_timeout`
  (milliseconds); other keys are ignored.
  """
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: server_name(opts[:name]))
  end

  @doc false
  def init(opts) do
    # Trapping exits makes the supervisor's shutdown signal invoke terminate/2
    # instead of killing the process outright, which is where draining happens.
    Process.flag(:trap_exit, true)
    {:ok, struct(__MODULE__, opts)}
  end

  @doc false
  # Only orderly shutdown reasons drain the workers; any other reason
  # (for example a crash) returns immediately.
  def terminate(:shutdown, state), do: drain_workers(state)
  def terminate({:shutdown, _}, state), do: drain_workers(state)
  def terminate(:normal, state), do: drain_workers(state)
  def terminate(_, _), do: :ok

  ## ===========================================================
  ## functions
  ## ===========================================================

  # Stops the manager from picking up new jobs, then blocks until every
  # running worker has exited or the grace period has elapsed.
  defp drain_workers(state) do
    # Start the clock first so the unsubscribe call counts against the grace period.
    timer_ref =
      :erlang.start_timer(state.shutdown_timeout, self(), :end_of_grace_period)

    :ok =
      state.name
      |> Manager.Server.server_name()
      |> Exq.unsubscribe_all()

    children =
      state.name
      |> Worker.Supervisor.supervisor_name()
      |> Worker.Supervisor.workers()

    # Supervisor.which_children/1 may report :restarting or :undefined in the
    # pid position; only live pids can be waited on.
    worker_refs =
      for {_id, pid, _type, _modules} <- children, is_pid(pid), into: MapSet.new() do
        Process.monitor(pid)
      end

    await_workers(worker_refs, timer_ref)
  end

  # Waits for a :DOWN message per monitored worker. Returns :ok once the set of
  # outstanding monitors is empty or the grace-period timer fires.
  defp await_workers(worker_refs, timer_ref) do
    if MapSet.size(worker_refs) == 0 do
      # All workers finished early; the timer is no longer needed.
      :erlang.cancel_timer(timer_ref)
      :ok
    else
      receive do
        {:DOWN, downed_ref, _, _, _} ->
          worker_refs
          |> MapSet.delete(downed_ref)
          |> await_workers(timer_ref)

        # Not all workers finished within grace period
        {:timeout, ^timer_ref, :end_of_grace_period} ->
          :ok
      end
    end
  end
end
26 changes: 24 additions & 2 deletions test/exq_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,17 @@ defmodule ExqTest do
stop_process(sup)
end

# Verifies that after Exq.unsubscribe_all/1 jobs enqueued on previously
# subscribed queues are not performed.
test "unregister all queues and run jobs" do
# Register the test process as :exqtest; presumably the worker reports back
# to this name -- confirm against ExqTest.PerformArgWorker.
Process.register(self(), :exqtest)
{:ok, sup} = Exq.start_link(queues: ["q1", "q2"])
# Drop both subscriptions before anything is enqueued.
:ok = Exq.unsubscribe_all(Exq)
{:ok, _} = Exq.enqueue(Exq, "q1", ExqTest.PerformArgWorker, [1])
{:ok, _} = Exq.enqueue(Exq, "q2", ExqTest.PerformArgWorker, [2])
# refute_receive waits for its default timeout on each line, so a job that
# ran late within that window would still fail the test.
refute_receive {:worked, 1}
refute_receive {:worked, 2}
stop_process(sup)
end

test "throttle workers per queue" do
Process.register(self(), :exqtest)
{:ok, sup} = Exq.start_link(concurrency: 1, queues: ["q1", "q2"])
Expand Down Expand Up @@ -310,7 +321,19 @@ defmodule ExqTest do
stop_process(sup)
end

@tag :skip
# Verifies that stopping Exq lets jobs that are already running finish.
test "waiting for workers to finish" do
Process.register(self(), :exqtest)
{:ok, sup} = Exq.start_link([])
# NOTE(review): SleepWorker presumably sleeps for the first argument (ms) and
# then sends {"one"} / {"two"} to :exqtest -- confirm against ExqTest.SleepWorker.
{:ok, _} = Exq.enqueue(Exq, "default", ExqTest.SleepWorker, [100, :one])
{:ok, _} = Exq.enqueue(Exq, "default", ExqTest.SleepWorker, [100, :two])

# wait/0 is a helper defined elsewhere in this file; presumably it gives the
# jobs time to be picked up before shutdown starts -- confirm.
wait()
stop_process(sup)

# assert_received checks the mailbox without waiting, so both messages must
# already have arrived by the time stop_process/1 returns.
assert_received {"one"}
assert_received {"two"}
end

test "configure worker shutdown time" do
Process.register(self(), :exqtest)
{:ok, sup} = Exq.start_link([shutdown_timeout: 200])
Expand All @@ -324,7 +347,6 @@ defmodule ExqTest do
assert_received {"short"}
end

@tag :skip
test "handle supervisor tree shutdown properly with stats cleanup" do
Process.register(self(), :exqtest)

Expand Down