-
Notifications
You must be signed in to change notification settings - Fork 180
/
mock.ex
154 lines (125 loc) · 3.55 KB
/
mock.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
defmodule Exq.Mock do
alias Exq.Support.Config
alias Exq.Adapters.Queue.Redis
alias Exq.Support.Job
use GenServer
@timeout 30000
defmodule State do
defstruct default_mode: :redis, jobs: %{}, modes: %{}
end
### Public api
def start_link(options \\ []) do
queue_adapter = Config.get(:queue_adapter)
if queue_adapter != Exq.Adapters.Queue.Mock do
raise RuntimeError, """
Exq.Mock can only work if queue_adapter is set to Exq.Adapters.Queue.Mock
Add the following to your test config
config :exq, queue_adapter: Exq.Adapters.Queue.Mock
"""
end
GenServer.start_link(__MODULE__, options, name: __MODULE__)
end
def set_mode(mode) when mode in [:redis, :inline, :fake] do
GenServer.call(__MODULE__, {:mode, self(), mode}, @timeout)
end
def jobs do
GenServer.call(__MODULE__, {:jobs, self()}, @timeout)
end
### Private
@impl true
def init(options) do
{:ok, %State{default_mode: Keyword.get(options, :mode, :redis)}}
end
def enqueue(pid, queue, worker, args, options) do
{:ok, runnable} =
GenServer.call(
__MODULE__,
{:enqueue, self(), :enqueue, [pid, queue, worker, args, options]},
@timeout
)
runnable.()
end
def enqueue_at(pid, queue, time, worker, args, options) do
{:ok, runnable} =
GenServer.call(
__MODULE__,
{:enqueue, self(), :enqueue_at, [pid, queue, time, worker, args, options]},
@timeout
)
runnable.()
end
def enqueue_in(pid, queue, offset, worker, args, options) do
{:ok, runnable} =
GenServer.call(
__MODULE__,
{:enqueue, self(), :enqueue_in, [pid, queue, offset, worker, args, options]},
@timeout
)
runnable.()
end
@impl true
def handle_call({:enqueue, owner_pid, type, args}, _from, state) do
state = maybe_add_and_monitor_pid(state, owner_pid, state.default_mode)
case state.modes[owner_pid] do
:redis ->
runnable = fn -> apply(Redis, type, args) end
{:reply, {:ok, runnable}, state}
:inline ->
runnable = fn ->
job = to_job(args)
apply(job.class, :perform, job.args)
{:ok, job.jid}
end
{:reply, {:ok, runnable}, state}
:fake ->
job = to_job(args)
state = update_in(state.jobs[owner_pid], &((&1 || []) ++ [job]))
runnable = fn ->
{:ok, job.jid}
end
{:reply, {:ok, runnable}, state}
end
end
def handle_call({:mode, owner_pid, mode}, _from, state) do
state = maybe_add_and_monitor_pid(state, owner_pid, mode)
{:reply, :ok, state}
end
def handle_call({:jobs, owner_pid}, _from, state) do
jobs = state.jobs[owner_pid] || []
{:reply, jobs, state}
end
@impl true
def handle_info({:DOWN, _, _, pid, _}, state) do
{_, state} = pop_in(state.modes[pid])
{_, state} = pop_in(state.jobs[pid])
{:noreply, state}
end
defp to_job([_pid, queue, worker, args, _options]) do
%Job{
jid: UUID.uuid4(),
queue: queue,
class: worker,
args: args,
enqueued_at: DateTime.utc_now()
}
end
defp to_job([_pid, queue, _time_or_offset, worker, args, _options]) do
%Job{
jid: UUID.uuid4(),
queue: queue,
class: worker,
args: args,
enqueued_at: DateTime.utc_now()
}
end
defp maybe_add_and_monitor_pid(state, pid, mode) do
case state.modes do
%{^pid => _mode} ->
state
_ ->
Process.monitor(pid)
state = put_in(state.modes[pid], mode)
state
end
end
end