-
Notifications
You must be signed in to change notification settings - Fork 19
/
daemon.ex
243 lines (195 loc) · 6.84 KB
/
daemon.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
# SPDX-FileCopyrightText: 2018 Frank Hunleth
#
# SPDX-License-Identifier: Apache-2.0
defmodule MuonTrap.Daemon do
@moduledoc """
Wrap an OS process in a GenServer so that it can be supervised.
For example, in your children list add MuonTrap.Daemon like this:
```elixir
children = [
{MuonTrap.Daemon, ["my_server", ["--options", "foo"], [cd: "/some_directory"]]}
]
opts = [strategy: :one_for_one, name: MyApplication.Supervisor]
Supervisor.start_link(children, opts)
```
In the `child_spec` tuple, the second element is a list that corresponds to
the `MuonTrap.cmd/3` parameters. I.e., The first item in the list is the
program to run, the second is a list of commandline arguments, and the third
is a list of options. The same options as `MuonTrap.cmd/3` are available with
the following additions:
* `:name` - Name the Daemon GenServer
* `:log_output` - When set, send output from the command to the Logger.
Specify the log level (e.g., `:debug`)
* `:log_prefix` - Prefix each log message with this string (defaults to the
program's path)
* `:stderr_to_stdout` - When set to `true`, redirect stderr to stdout.
Defaults to `false`.
* `:exit_status_to_reason` - Optional function to convert the exit status (a
number) to stop reason for the Daemon GenServer. Use if error exit codes
carry information or aren't errors.
If you want to run multiple `MuonTrap.Daemon`s under one supervisor, they'll
all need unique IDs. Use `Supervisor.child_spec/2` like this:
```elixir
Supervisor.child_spec({MuonTrap.Daemon, ["my_server", []]}, id: :server1)
```
"""
use GenServer
alias MuonTrap.Cgroups
require Logger
# GenServer state for one supervised OS process. Every field is filled in by `init/1`.
defstruct [
# Output received after the last newline (a partial line); starts as "" and is
# prepended to the next chunk of port data before it is split into lines
:buffer,
# The program being run; used in the exit log messages and the default log prefix
:command,
# Port returned by `Port.open/2` for the muontrap executable
:port,
# Value of the `:cgroup_path` option (nil when absent); passed to the Cgroups helpers
:cgroup_path,
# Value of the `:log_output` option; passed as the level to `Logger.log/2`
:log_output,
# Prepended to every logged line; defaults to `command <> ": "`
:log_prefix,
# Function applied to each line before it is logged; defaults to identity
:log_transform,
# Function mapping a non-zero exit status to the GenServer stop reason;
# the default returns :error_exit_status
:exit_status_to_reason,
# Running total of bytes received from the port
:output_byte_count
]
# Maximum number of bytes of a partial (unterminated) line kept in `:buffer`
@max_data_to_buffer 256
@doc """
Return the child specification for running a daemon under a supervisor.

The argument is a positional list, `[command, args]` or
`[command, args, opts]`, mirroring the parameters of `start_link/3`. When
`opts` is omitted it defaults to `[]`.

The returned spec always uses this module as the `:id`, so running several
daemons under one supervisor requires overriding the id (for example with
`Supervisor.child_spec/2`).
"""
# The argument is a positional list, not a keyword list, so the spec describes
# the allowed element types instead of `keyword()`.
@spec child_spec([binary() | [binary()] | keyword()]) :: Supervisor.child_spec()
def child_spec([command, args]) do
  child_spec([command, args, []])
end

def child_spec([command, args, opts]) do
  %{
    id: __MODULE__,
    start: {__MODULE__, :start_link, [command, args, opts]},
    type: :worker,
    restart: :permanent,
    shutdown: 500
  }
end
@doc """
Start and link a daemon GenServer for the specified command.

A `:name` entry in `opts`, when present and not `nil`, is removed from the
options handed to the daemon and used to register the GenServer instead.
"""
@spec start_link(binary(), [binary()], keyword()) :: GenServer.on_start()
def start_link(command, args, opts \\ []) do
  {name, opts_without_name} = Keyword.pop(opts, :name)

  if is_nil(name) do
    # No usable name: start unregistered and pass the options through untouched.
    GenServer.start_link(__MODULE__, [command, args, opts], [])
  else
    GenServer.start_link(__MODULE__, [command, args, opts_without_name], name: name)
  end
end
@doc """
Read a cgroup variable from a running daemon.

Makes a synchronous `{:cgget, controller, variable_name}` call to `server`
and returns the reply.
"""
@spec cgget(GenServer.server(), binary(), binary()) ::
        {:ok, String.t()} | {:error, File.posix()}
def cgget(server, controller, variable_name) do
  request = {:cgget, controller, variable_name}
  GenServer.call(server, request)
end
@doc """
Write a cgroup variable on a running daemon.

Makes a synchronous `{:cgset, controller, variable_name, value}` call to
`server` and returns the reply.
"""
@spec cgset(GenServer.server(), binary(), binary(), binary()) :: :ok | {:error, File.posix()}
def cgset(server, controller, variable_name, value) do
  request = {:cgset, controller, variable_name, value}
  GenServer.call(server, request)
end
@doc """
Return the OS pid of the muontrap executable.

Makes a synchronous `:os_pid` call to `server`. The daemon replies with
`:error` when no information is available for its port.
"""
@spec os_pid(GenServer.server()) :: non_neg_integer() | :error
def os_pid(server) do
  request = :os_pid
  GenServer.call(server, request)
end
@doc """
Return statistics about the daemon.

Statistics:

* `:output_byte_count` - bytes output by the process being run
"""
@spec statistics(GenServer.server()) :: %{output_byte_count: non_neg_integer()}
def statistics(server) do
  request = :statistics
  GenServer.call(server, request)
end
# Validates the options, opens a port to the muontrap executable and builds
# the initial state from the validated options.
@impl GenServer
def init([command, args, opts]) do
  validated = MuonTrap.Options.validate(:daemon, command, args, opts)

  # :stream is appended to whatever port options MuonTrap.Port derives.
  port_settings = MuonTrap.Port.port_options(validated) ++ [:stream]
  executable = to_charlist(MuonTrap.muontrap_path())
  port = Port.open({:spawn_executable, executable}, port_settings)

  # Fallbacks used when the corresponding option is absent.
  default_prefix = command <> ": "
  default_transform = &Function.identity/1
  default_reason = fn _ -> :error_exit_status end

  initial_state = %__MODULE__{
    buffer: "",
    command: command,
    port: port,
    cgroup_path: Map.get(validated, :cgroup_path),
    log_output: Map.get(validated, :log_output),
    log_prefix: Map.get(validated, :log_prefix, default_prefix),
    log_transform: Map.get(validated, :log_transform, default_transform),
    exit_status_to_reason: Map.get(validated, :exit_status_to_reason, default_reason),
    output_byte_count: 0
  }

  {:ok, initial_state}
end
# Reads a cgroup variable under this daemon's cgroup path and replies with
# whatever Cgroups.cgget/3 returns.
@impl GenServer
def handle_call({:cgget, controller, variable_name}, _from, %{cgroup_path: path} = state) do
  {:reply, Cgroups.cgget(controller, path, variable_name), state}
end
# Writes a cgroup variable under this daemon's cgroup path and replies with
# whatever Cgroups.cgset/4 returns.
def handle_call({:cgset, controller, variable_name, value}, _from, %{cgroup_path: path} = state) do
  {:reply, Cgroups.cgset(controller, path, variable_name, value), state}
end
# Replies with the OS pid reported for the port, or :error when Port.info/2
# has nothing to report for it.
def handle_call(:os_pid, _from, state) do
  port_info = Port.info(state.port, :os_pid)

  reply =
    case port_info do
      nil -> :error
      {:os_pid, pid} -> pid
    end

  {:reply, reply, state}
end
# Replies with a map holding the number of bytes received from the port so far.
def handle_call(:statistics, _from, state) do
  {:reply, %{output_byte_count: state.output_byte_count}, state}
end
# Output from the port: log the complete lines, report the bytes as handled
# to MuonTrap.Port, and add them to the running byte count.
@impl GenServer
def handle_info({port, {:data, message}}, %__MODULE__{port: port} = state) do
  received = byte_size(message)
  logged_state = split_and_log(message, state)

  MuonTrap.Port.report_bytes_handled(logged_state.port, received)

  total = logged_state.output_byte_count + received
  {:noreply, %{logged_state | output_byte_count: total}}
end
# The OS process exited: flush any unterminated output, log the exit and stop
# the GenServer. Status 0 stops with :normal; any other status is mapped to a
# stop reason by the `exit_status_to_reason` function held in the state.
def handle_info({port, {:exit_status, status}}, %__MODULE__{port: port} = state) do
  # Output after the last newline is only buffered, never logged, by the data
  # handler. Emit it here so the final partial line is not lost on exit.
  pending = state.buffer

  if is_binary(pending) and pending != "" and state.log_output != nil do
    log_line(pending, state)
  end

  reason =
    case status do
      0 ->
        Logger.info("#{state.command}: Process exited successfully")
        :normal

      _failure ->
        Logger.error("#{state.command}: Process exited with status #{status}")
        state.exit_status_to_reason.(status)
    end

  # The buffer has been flushed, so hand an empty one to the stop path.
  {:stop, reason, %{state | buffer: ""}}
end
# Catch-all: any other message is ignored and the state is left unchanged.
def handle_info(_message, state), do: {:noreply, state}
# Prepends the buffered partial line to the new data, logs every complete
# line, and returns the state with the new leftover stored in the buffer.
defp split_and_log(data, state) do
  pending = state.buffer <> data
  {complete_lines, leftover} = process_data(pending)

  Enum.each(complete_lines, fn line -> log_line(line, state) end)

  %{state | buffer: leftover}
end
# Logs one line at the configured level, prefixed with the log prefix and
# passed through the log transform first.
defp log_line(line, state) do
  level = state.log_output
  prefix = state.log_prefix
  transformed = state.log_transform.(line)

  Logger.log(level, [prefix, transformed])
end
# Splits `data` on "\n" into the complete lines and the text after the last
# newline. The trailing text is returned as the new buffer after being
# passed through trim_buffer/1.
@doc false
@spec process_data(binary()) :: {[String.t()], binary()}
def process_data(data) do
  # String.split/2 always yields at least one element, so exactly one
  # element (the text after the final newline) can be split off the end.
  {complete_lines, [leftovers]} =
    data
    |> String.split("\n")
    |> Enum.split(-1)

  {complete_lines, trim_buffer(leftovers)}
end
# Caps the buffered partial line at @max_data_to_buffer bytes, keeping the
# leading bytes; shorter data is returned unchanged.
defp trim_buffer(data) when byte_size(data) > @max_data_to_buffer do
  :binary.part(data, 0, @max_data_to_buffer)
end

defp trim_buffer(data) do
  data
end
end