/
dirty_worker.ex
134 lines (110 loc) · 4.03 KB
/
dirty_worker.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
defmodule FarmbotExt.API.DirtyWorker do
@moduledoc "Handles uploading/downloading of data from the API."
alias FarmbotCore.Asset.{Private, Repo}
alias FarmbotExt.{API, API.DirtyWorker}
import API.View, only: [render: 2]
require Logger
use GenServer
@timeout 10000
@doc false
def child_spec(module) when is_atom(module) do
%{
id: {DirtyWorker, module},
start: {__MODULE__, :start_link, [[module: module, timeout: @timeout]]},
type: :worker,
restart: :permanent,
shutdown: 500
}
end
@doc "Start an instance of a DirtyWorker"
def start_link(args) do
GenServer.start_link(__MODULE__, args)
end
@impl GenServer
def init(args) do
# Logger.disable(self())
module = Keyword.fetch!(args, :module)
timeout = Keyword.get(args, :timeout, @timeout)
{:ok, %{module: module, timeout: timeout}, timeout}
end
@impl GenServer
def handle_info(:timeout, %{module: module} = state) do
dirty = Private.list_dirty(module)
local = Private.list_local(module)
{:noreply, state, {:continue, Enum.uniq(dirty ++ local)}}
end
@impl GenServer
def handle_continue([], state) do
{:noreply, state, state.timeout}
end
def handle_continue([dirty | rest], %{module: module} = state) do
Logger.info("[#{module} #{dirty.local_id} #{inspect(self())}] Handling dirty data")
case http_request(dirty, state) do
# Valid data
{:ok, %{status: s, body: body}} when s > 199 and s < 300 ->
Logger.debug(
"[#{module} #{dirty.local_id} #{inspect(self())}] HTTP request complete: #{s} ok"
)
dirty |> module.changeset(body) |> handle_changeset(rest, state)
# Invalid data
{:ok, %{status: s, body: %{} = body}} when s > 399 and s < 500 ->
Logger.debug(
"[#{module} #{dirty.local_id} #{inspect(self())}] HTTP request complete: #{s} error+body"
)
changeset = module.changeset(dirty)
Enum.reduce(body, changeset, fn {key, val}, changeset ->
Ecto.Changeset.add_error(changeset, key, val)
end)
|> handle_changeset(rest, state)
# Invalid data, but the API didn't say why
{:ok, %{status: s, body: _body}} when s > 399 and s < 500 ->
Logger.debug(
"[#{module} #{dirty.local_id} #{inspect(self())}] HTTP request complete: #{s} error"
)
module.changeset(dirty)
|> Map.put(:valid?, false)
|> handle_changeset(rest, state)
# HTTP Error. (500, network error, timeout etc.)
error ->
Logger.error(
"[#{module} #{dirty.local_id} #{inspect(self())}] HTTP Error: #{state.module} #{
inspect(error)
}"
)
{:noreply, state, @timeout}
end
end
# If the changeset was valid, update the record.
def handle_changeset(%{valid?: true} = changeset, rest, state) do
Logger.info("Successfully synced: #{state.module}")
Repo.update!(changeset)
|> Private.mark_clean!()
{:noreply, state, {:continue, rest}}
end
# If the changeset was invalid, delete the record.
# TODO(Connor) - Update the dirty field here, upload to rollbar?
def handle_changeset(%{valid?: false, data: data} = changeset, rest, state) do
message =
Enum.map(changeset.errors, fn
{key, {msg, _meta}} when is_binary(key) -> "\t#{key}: #{msg}"
{key, msg} when is_binary(key) -> "\t#{key}: #{msg}"
end)
|> Enum.join("\n")
Logger.error("Failed to sync: #{state.module} \n #{message}")
_ = Repo.delete!(data)
{:noreply, state, {:continue, rest}}
end
defp http_request(%{id: nil} = dirty, state) do
Logger.debug("#{state.module} clean request (post)")
path = state.module.path()
data = render(state.module, dirty)
API.post(API.client(), path, data)
end
defp http_request(dirty, state) do
Logger.debug("#{state.module} dirty request (patch)")
# IO.inspect(dirty, label: "PATCH")
path = Path.join(state.module.path(), to_string(dirty.id))
data = render(state.module, dirty)
API.patch(API.client(), path, data)
end
end