-
Notifications
You must be signed in to change notification settings - Fork 181
/
server.ex
204 lines (171 loc) · 5.31 KB
/
server.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
defmodule Exq.Worker.Server do
  @moduledoc """
  Worker process is responsible for the parsing and execution of a Job.

  It then broadcasts results to Stats / Manager.

  Job success/failure is tracked by running the job in a linked, monitored
  task: a `{:done, result}` cast from the task marks success, while a `:DOWN`
  message for the task is treated as a failure. Either outcome runs the
  matching after-work middleware pipeline and then stops the worker.

  ## Initialization:

    * `job_serialized` - Full JSON payload of the Job.
    * `manager` - Manager process pid.
    * `queue` - The queue the job came from.
    * `stats` - Stats process pid.
    * `namespace` - Redis namespace
    * `host` - Host name
    * `redis` - Redis connection identifier (stored in the worker state).
    * `middleware` - Middleware server; its chain is read with
      `Exq.Middleware.Server.all/1` when work starts.
    * `metadata` - Metadata server handed to `Exq.Worker.Metadata.associate/3`
      from inside the job task.

  Expects :work message after initialization to kickoff work.
  """

  use GenServer

  alias Exq.Middleware.Server, as: Middleware
  alias Exq.Middleware.Pipeline
  alias Exq.Worker.Metadata

  defmodule State do
    @moduledoc """
    State held by a worker process.

    The first nine fields are the values given to
    `Exq.Worker.Server.start_link/9`. `middleware_state` and `pipeline` start
    as `nil` and are filled in when the `:work` cast is handled.
    """

    defstruct job_serialized: nil,
              manager: nil,
              queue: nil,
              namespace: nil,
              stats: nil,
              host: nil,
              redis: nil,
              middleware: nil,
              pipeline: nil,
              metadata: nil,
              middleware_state: nil
  end

  @doc """
  Starts a worker process linked to the caller.

  The arguments are stored as-is in the worker state (see the module
  documentation for the meaning of each one). Nothing is executed until
  `work/1` is called on the returned pid.
  """
  def start_link(
        job_serialized,
        manager,
        queue,
        stats,
        namespace,
        host,
        redis,
        middleware,
        metadata
      ) do
    GenServer.start_link(
      __MODULE__,
      {job_serialized, manager, queue, stats, namespace, host, redis, middleware, metadata},
      []
    )
  end

  @doc """
  Kickoff work associated with worker.

  This is an asynchronous cast and returns `:ok` immediately.
  """
  @spec work(GenServer.server()) :: :ok
  def work(pid) do
    GenServer.cast(pid, :work)
  end

  ## ===========================================================
  ## GenServer callbacks
  ## ===========================================================

  @impl true
  def init({job_serialized, manager, queue, stats, namespace, host, redis, middleware, metadata}) do
    {
      :ok,
      %State{
        job_serialized: job_serialized,
        manager: manager,
        queue: queue,
        stats: stats,
        namespace: namespace,
        host: host,
        redis: redis,
        middleware: middleware,
        metadata: metadata
      }
    }
  end

  @doc """
  Kickoff work associated with worker.

  This step runs the `:before_work` middleware chain, which handles:

    * Parsing of JSON object
    * Preparation of target module

  Calls :dispatch to then call target module, unless the resulting pipeline
  is marked as terminated.
  """
  @impl true
  def handle_cast(:work, state) do
    state = %{state | middleware_state: Middleware.all(state.middleware)}
    state = %{state | pipeline: before_work(state)}

    # Only dispatch when the before_work pipeline was not marked as terminated;
    # a terminated pipeline leaves the worker idle with its state updated.
    if Map.get(state.pipeline, :terminated, false) != true do
      GenServer.cast(self(), :dispatch)
    end

    {:noreply, state}
  end

  # Dispatch work to the target module (call :perform method of target).
  def handle_cast(:dispatch, state) do
    dispatch_work(
      state.pipeline.assigns.worker_module,
      state.pipeline.assigns.job,
      state.metadata
    )

    {:noreply, state}
  end

  # Worker done with normal termination message.
  def handle_cast({:done, result}, state) do
    state = run_after_work_once(state, &pipeline_after_processed_work(&1, result))
    {:stop, :normal, state}
  end

  # The monitored task went down with reason :normal before a {:done, result}
  # cast stopped this worker; this is reported as a failed job.
  @impl true
  def handle_info({:DOWN, _, _, _, :normal}, state) do
    error = "Worker shutdown"
    state = run_after_work_once(state, &pipeline_after_failed_work(&1, error, error))
    {:stop, :normal, state}
  end

  # The monitored task went down with a non-normal reason: report the failure
  # with both a printable message and the raw exit reason.
  def handle_info({:DOWN, _, :process, _, error}, state) do
    error_message = format_exit_reason(error)
    state = run_after_work_once(state, &pipeline_after_failed_work(&1, error_message, error))
    {:stop, :normal, state}
  end

  # Everything else is ignored. This includes the {:EXIT, pid, reason} messages
  # produced because dispatch_work/3 links to the task while trapping exits.
  def handle_info(_info, state) do
    {:noreply, state}
  end

  ## ===========================================================
  ## Internal Functions
  ## ===========================================================

  @doc """
  Runs `worker_module.perform` with `job.args` in a linked and monitored task.

  Uses `self()` as the process to notify, so it is meant to be called from the
  worker process itself. On success the task casts `{:done, result}` back to
  that process; when the task exits, the monitor delivers a `:DOWN` message.

  Returns the monitor reference.
  """
  def dispatch_work(worker_module, job, metadata) do
    # trap exit so that link can still track dispatch without crashing
    Process.flag(:trap_exit, true)
    worker = self()

    {:ok, pid} =
      Task.start_link(fn ->
        # Inside the task, self() is the task pid, not the worker pid.
        :ok = Metadata.associate(metadata, self(), job)
        result = apply(worker_module, :perform, job.args)
        GenServer.cast(worker, {:done, result})
      end)

    Process.monitor(pid)
  end

  # Builds the :before_work pipeline from the worker state and runs it through
  # the middleware chain.
  defp before_work(state) do
    %Pipeline{event: :before_work, worker_pid: self()}
    |> Pipeline.assign_worker_state(state)
    |> Pipeline.chain(state.middleware_state)
  end

  # Runs the :after_processed_work chain, carrying over the assigns of the
  # current pipeline and adding the job result.
  defp pipeline_after_processed_work(state, result) do
    %Pipeline{event: :after_processed_work, worker_pid: self(), assigns: state.pipeline.assigns}
    |> Pipeline.assign(:result, result)
    |> Pipeline.chain(state.middleware_state)
  end

  # Runs the :after_failed_work chain, carrying over the assigns of the
  # current pipeline and adding the printable message and the raw error.
  defp pipeline_after_failed_work(state, error_message, error) do
    %Pipeline{event: :after_failed_work, worker_pid: self(), assigns: state.pipeline.assigns}
    |> Pipeline.assign(:error_message, error_message)
    |> Pipeline.assign(:error, error)
    |> Pipeline.chain(state.middleware_state)
  end

  # Replaces state.pipeline with the result of `after_work_fun.(state)` unless
  # an after-work pipeline is already recorded. The function is only invoked
  # when needed, so the middleware chain is not run a second time.
  defp run_after_work_once(state, after_work_fun) do
    if has_pipeline_after_work_ran?(state.pipeline) do
      state
    else
      %{state | pipeline: after_work_fun.(state)}
    end
  end

  # Renders an exit reason as a string, formatted at the default
  # `Inspect.Opts` width.
  defp format_exit_reason(reason) do
    reason
    |> Inspect.Algebra.to_doc(%Inspect.Opts{})
    |> Inspect.Algebra.format(%Inspect.Opts{}.width)
    |> to_string()
  end

  # Guards against running the after-work hooks more than once.
  #
  # NOTE(review): this looks at the top-level keys of the pipeline, while this
  # module records `:result` / `:error` through `Pipeline.assign/3` and reads
  # assigned values from `pipeline.assigns`. If assigned values live only under
  # `assigns`, this check can never be true - confirm against
  # `Exq.Middleware.Pipeline`. Behaviour is intentionally left unchanged.
  defp has_pipeline_after_work_ran?(pipeline) do
    Map.has_key?(pipeline, :result) || Map.has_key?(pipeline, :error)
  end
end