/
farmware_runtime.ex
297 lines (251 loc) · 9.19 KB
/
farmware_runtime.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
defmodule FarmbotCore.FarmwareRuntime do
@moduledoc """
Handles execution of Farmware plugins.
"""
alias FarmbotCeleryScript.AST
alias FarmbotCore.FarmwareRuntime.PipeWorker
alias FarmbotCore.AssetWorker.FarmbotCore.Asset.FarmwareInstallation
alias FarmbotCore.Asset.FarmwareInstallation.Manifest
alias FarmbotCore.BotState.FileSystem
alias FarmbotCore.Project
import FarmwareInstallation, only: [install_dir: 1]
alias FarmbotCore.{Asset, JSON}
import FarmbotCore.Config, only: [get_config_value: 3]
require Logger
# How long (in ms) a decoded RPC may wait in the `:process_request` context
# before the runtime stops with `{:error, :rpc_timeout}` (see `handle_packet/2`).
@error_timeout_ms 5000

# Directory in which the request/response pipes are created.
# NOTE: this is read in the module body, so the value is fixed when the
# module is compiled.
@runtime_dir Application.get_env(:farmbot_core, __MODULE__)[:runtime_dir]

# Abort compilation with a copy-pasteable hint when the directory is not
# configured. The hint interpolates the real module name because that is the
# key the lookup above uses; a bare `FarmwareRuntime` would never be found.
@runtime_dir ||
  Mix.raise("""
  config :farmbot_core, #{inspect(__MODULE__)},
    runtime_dir: "/tmp/farmware_runtime"
  """)

# Optional extra options for `MuonTrap.cmd`; they are merged with the options
# built in `init/1`. Defaults to an empty list when not configured.
@muontrap_opts Application.get_env(:farmbot_core, __MODULE__)[:muontrap_opts]
@muontrap_opts @muontrap_opts || []

# Every packet starts with a 10 byte header:
# a 16 bit token, 32 reserved bits and a 32 bit big-endian payload size.
@packet_header_token 0xFBFB
@packet_header_byte_size 10
alias __MODULE__, as: State

# Server state of a running Farmware:
#   * `cmd`                  - pid of the monitored process running `MuonTrap.cmd`
#   * `mon`                  - value returned by the latest `PipeWorker.read/2`
#   * `context`              - current step of the request/response cycle
#   * `rpc`                  - decoded request waiting to be picked up, or `nil`
#   * `request_pipe`         - path of the pipe requests are read from
#   * `request_pipe_handle`  - `PipeWorker` serving `request_pipe`
#   * `response_pipe`        - path of the pipe responses are written to
#   * `response_pipe_handle` - `PipeWorker` serving `response_pipe`
defstruct [
  :cmd,
  :mon,
  :context,
  :rpc,
  :request_pipe,
  :request_pipe_handle,
  :response_pipe,
  :response_pipe_handle
]

@opaque pipe_handle :: pid()

# `context` lists exactly the atoms the callbacks in this module match on
# (`:process_request`, not `:process_payload`), and `rpc` is `nil` whenever no
# request is buffered.
@type t :: %State{
        request_pipe: Path.t(),
        request_pipe_handle: pipe_handle(),
        response_pipe: Path.t(),
        response_pipe_handle: pipe_handle(),
        cmd: pid(),
        mon: pid() | nil,
        rpc: map() | nil,
        context: :get_header | :get_payload | :process_request | :send_response
      }
@doc """
Asks the Farmware Runtime for an RPC that is waiting to be processed.

Replies `{:ok, rpc}` when a request is buffered. The calling process is then
linked to the runtime, and no further RPC is read from the Farmware until
`rpc_processed/2` has been called. Replies `{:error, :no_rpc}` when nothing
is waiting.
"""
@spec process_rpc(GenServer.server()) :: {:ok, map()} | {:error, :no_rpc}
def process_rpc(pid), do: GenServer.call(pid, :process_rpc)
@doc """
Tells the Farmware Runtime that the RPC handed out by `process_rpc/1`
has been processed.

`response` is framed with a packet header and written to the response pipe;
the reply is whatever that write returned.
"""
def rpc_processed(pid, response), do: GenServer.call(pid, {:rpc_processed, response})
@doc """
Starts a Farmware for the given `manifest`.

`env` is an optional map of extra environment variables for the Farmware.
The server is registered under the manifest's package name (as an atom).
"""
def start_link(%Manifest{package: package} = manifest, env \\ %{}) do
  # NOTE(review): the registered name is an atom created from the package
  # string at runtime; confirm package names come from a bounded set.
  GenServer.start_link(__MODULE__, [manifest, env], name: String.to_atom(package))
end
# Creates the pipes, spawns the Farmware executable and schedules
# the first header read.
def init([manifest, env]) do
  # Each run gets its own uniquely named pipes inside the runtime directory.
  pipe_path = fn suffix ->
    Path.join([@runtime_dir, manifest.package <> "-" <> Ecto.UUID.generate() <> suffix])
  end

  request_pipe = pipe_path.("-farmware-request-pipe")
  response_pipe = pipe_path.("-farmware-response-pipe")
  farmware_env = build_env(manifest, env, request_pipe, response_pipe)

  # Make sure the pipe directory is there; the result is deliberately ignored.
  _ = File.mkdir_p(@runtime_dir)

  # One PipeWorker per pipe.
  {:ok, request_handle} = PipeWorker.start_link(request_pipe)
  {:ok, response_handle} = PipeWorker.start_link(response_pipe)

  executable = System.find_executable(manifest.executable)

  cmd_opts =
    Keyword.merge(@muontrap_opts,
      env: farmware_env,
      cd: install_dir(manifest),
      into: IO.stream(:stdio, :line)
    )

  # Run the plugin in its own monitored process; its `:DOWN` message
  # is what stops this server.
  {cmd, _monitor_ref} = spawn_monitor(MuonTrap, :cmd, [executable, [manifest.args], cmd_opts])

  # Trigger the first header read as soon as init has returned.
  send(self(), :timeout)

  {:ok,
   %State{
     cmd: cmd,
     mon: nil,
     context: :get_header,
     rpc: nil,
     request_pipe: request_pipe,
     request_pipe_handle: request_handle,
     response_pipe: response_pipe,
     response_pipe_handle: response_handle
   }}
end
# Cleans up on shutdown: kills the plugin process if it is still alive
# and closes both pipe workers.
def terminate(_reason, %{cmd: cmd, request_pipe_handle: req, response_pipe_handle: resp}) do
  if cmd && Process.alive?(cmd) do
    Process.exit(cmd, :kill)
  end

  if req, do: PipeWorker.close(req)
  if resp, do: PipeWorker.close(resp)
end
# A request is buffered (`:process_request` context): hand it to the caller
# and move on to `:send_response`, which has _no_ timeout.
def handle_call(:process_rpc, {caller, _tag}, %{context: :process_request} = state) do
  # Link the caller so the Farmware can exit if the rpc never gets processed.
  _ = Process.link(caller)
  {:reply, {:ok, state.rpc}, %{state | rpc: nil, context: :send_response}}
end

# In every other context there is nothing to hand out.
def handle_call(:process_rpc, _from, state), do: {:reply, {:error, :no_rpc}, state}

# The caller finished the RPC: frame the result, write it to the response
# pipe and return to waiting for the next header.
def handle_call({:rpc_processed, result}, {caller, _tag}, %{context: :send_response} = state) do
  # The link made in the `:process_rpc` clause is no longer needed.
  _ = Process.unlink(caller)
  write_result = PipeWorker.write(state.response_pipe_handle, add_header(result))

  # A `:timeout` message in the `:get_header` context starts the next
  # header read, which lets another rpc be processed.
  send(self(), :timeout)
  {:reply, write_result, %{state | rpc: nil, context: :get_header}}
end
# A request is read in two steps: first the fixed-size header,
# then the payload whose size the header announces.
def handle_info(:timeout, %{context: :get_header} = state) do
  {:noreply, async_request_pipe_read(state, @packet_header_byte_size)}
end

# Timeout armed by `handle_packet/2`: the buffered request was not
# picked up within the allowed time.
def handle_info(:timeout, %{context: :process_request} = state) do
  Logger.error("Timeout waiting for #{inspect(state.rpc)} to be processed")
  {:stop, {:error, :rpc_timeout}, state}
end

# The monitored process running the Farmware went down.
def handle_info({:DOWN, _ref, :process, pid, _reason}, %{cmd: pid} = state) do
  Logger.debug("Farmware exit")
  {:stop, :normal, state}
end

# Header read succeeded and the bytes form a valid header:
# start reading the payload it announces.
def handle_info(
      {PipeWorker, _ref,
       {:ok, <<@packet_header_token::16, _reserved::32, payload_size::big-integer-size(32)>>}},
      %{context: :get_header} = state
    ) do
  reading = async_request_pipe_read(state, payload_size)
  {:noreply, %{reading | context: :get_payload}}
end

# Header read succeeded, but the bytes are not a valid header.
def handle_info({PipeWorker, _ref, {:ok, bad_header}}, %{context: :get_header} = state) do
  Logger.error("Bad header: #{inspect(bad_header, base: :hex, limit: :infinity)}")
  {:stop, {:unhandled_packet, bad_header}, state}
end

# Header read failed.
def handle_info({PipeWorker, _ref, failure}, %{context: :get_header} = state) do
  Logger.error("Bad header: #{inspect(failure)}")
  {:stop, failure, state}
end

# Payload read succeeded.
def handle_info({PipeWorker, _ref, {:ok, payload}}, %{context: :get_payload} = state),
  do: handle_packet(payload, state)

# Payload read failed.
def handle_info({PipeWorker, _ref, failure}, %{context: :get_payload} = state) do
  Logger.error("Bad payload: #{inspect(failure)}")
  {:stop, failure, state}
end
# Requests `size` bytes from the request pipe without blocking this server.
# A blocking read would keep the process from receiving any other message
# and from terminating. That matters because a Farmware may legitimately
# never open the pipe: once it completes, the read would still be waiting
# for data and keep the pipes from closing.
defp async_request_pipe_read(%{request_pipe_handle: handle} = state, size) do
  %{state | mon: PipeWorker.read(handle, size)}
end
# When a packet arrives it is decoded and buffered until the controlling
# process (the CSVM) picks it up. Waiting to be collected is limited by a
# timeout; processing the packet afterwards is not time limited.
# Anything that fails to decode stops the server with that result as reason.
def handle_packet(packet, state) do
  decoded =
    with {:ok, json} <- JSON.decode(packet) do
      decode_ast(json)
    end

  case decoded do
    {:ok, rpc} ->
      {:noreply, %{state | rpc: rpc, context: :process_request}, @error_timeout_ms}

    error ->
      {:stop, error, state}
  end
end
# Decodes `data` into an AST and accepts it only when it is an `:rpc_request`.
# Any other AST is logged and rejected; anything raised while decoding
# is turned into `{:error, :bad_ast}` as well.
defp decode_ast(data) do
  case AST.decode(data) do
    %{kind: :rpc_request} = request ->
      {:ok, request}

    %{} = other ->
      Logger.error("Got bad ast: #{inspect(other)}")
      {:error, :bad_ast}
  end
rescue
  _ -> {:error, :bad_ast}
end
# Builds the environment for the Farmware process from three layers, later
# ones winning: the stored farmware env, the RPC env passed to `start_link`,
# and finally the `base` values, which nothing can override.
defp build_env(manifest, rpc_env, request_pipe, response_pipe) do
  api_token = get_config_value(:string, "authorization", "token")
  python_path = install_dir(manifest)
  state_dir = Application.get_env(:farmbot_core, FileSystem)[:root_dir]

  base = %{
    "FARMWARE_API_V2_REQUEST_PIPE" => request_pipe,
    "FARMWARE_API_V2_RESPONSE_PIPE" => response_pipe,
    "FARMBOT_API_TOKEN" => api_token,
    "FARMBOT_OS_IMAGES_DIR" => "/tmp/images",
    "FARMBOT_OS_VERSION" => Project.version(),
    "FARMBOT_OS_STATE_DIR" => state_dir,
    "PYTHONPATH" => python_path
  }

  stored_env = Map.new(Asset.list_farmware_env(), fn %{key: k, value: v} -> {k, v} end)

  stored_env
  |> Map.merge(rpc_env)
  |> Map.merge(base)
end
# Frames an AST for the response pipe: JSON-encodes it and prepends the
# packet header (16 bit token, 32 zeroed reserved bits, 32 bit big-endian
# payload size in bytes).
defp add_header(%AST{} = rpc) do
  payload = JSON.encode!(Map.from_struct(rpc))

  <<@packet_header_token::size(16), 0::size(32), byte_size(payload)::big-size(32),
    payload::binary>>
end
end