-
-
Notifications
You must be signed in to change notification settings - Fork 181
/
async_limiter.ex
114 lines (97 loc) · 2.54 KB
/
async_limiter.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
defmodule Ash.Actions.Read.AsyncLimiter do
  @moduledoc """
  A utility for limiting the number of concurrent async operations.

  Because this is an optimization, we opt to run something synchronously
  if there is no async task available in the slot. The idea here is that
  the *vast* majority of things we do async will be fast enough not to
  warrant always waiting for an async slot to be free. We may add in some
  smarter heuristics later (i.e. choosing to wait for a task instead of
  doing the work sync), but for now this is a good start.

  The limiter is an `Agent` whose state is `{count, limit}`. A slot is
  claimed by incrementing `count` and is held until the async work finishes,
  at which point `count` is decremented again.
  """
  use Agent

  @doc """
  Starts a limiter agent with the state `{1, limit}`.

  The count starts at `1` rather than `0` (presumably to account for the
  calling process itself), so at most `limit - 1` async slots can be claimed
  at any one time.
  """
  @spec start_link(limit :: term()) :: Agent.on_start()
  def start_link(limit) do
    Agent.start_link(fn -> {1, limit} end)
  end

  @doc """
  Runs `func` asynchronously if a slot is available, otherwise runs it inline.

  `func` is started through `Ash.ProcessHelpers.async/2` (with `opts`) only when
  all of the following hold:

    * the query carries a non-nil limiter in `context.private.async_limiter`
    * the resource's data layer supports `:async_engine`
    * none of the resources involved are currently in a transaction
    * a slot could be claimed from the limiter

  In every other case `func` is invoked directly in the calling process and its
  result is returned. In the async case the return value is whatever
  `Ash.ProcessHelpers.async/2` returns (presumably a `%Task{}`, which
  `await_all/1` and `await_at_least_one/1` know how to resolve).
  """
  @spec async_or_inline(query :: map(), opts :: term(), func :: (-> term())) :: term()
  def async_or_inline(
        %{resource: resource, context: %{private: %{async_limiter: async_limiter}}} = query,
        opts,
        func
      )
      when not is_nil(async_limiter) do
    can_async? =
      Ash.DataLayer.data_layer_can?(resource, :async_engine) && !in_transaction?(query)

    # The slot is only claimed once we know async execution is allowed, so
    # the inline paths never touch the limiter.
    if can_async? && claim(async_limiter) do
      start_limited_task(async_limiter, opts, func)
    else
      func.()
    end
  end

  def async_or_inline(_, _opts, func) do
    func.()
  end

  @doc """
  Resolves every `%Task{}` in `list`, waiting as long as necessary.

  Non-task elements are passed through untouched and order is preserved.
  A task result shaped like `{:__exception__, exception, stacktrace}` is
  re-raised in the caller, matching `await_at_least_one/1`.
  """
  @spec await_all(list :: [term()]) :: [term()]
  def await_all(list) do
    Enum.map(list, fn
      %Task{} = task ->
        case Task.await(task, :infinity) do
          {:__exception__, e, stacktrace} ->
            reraise e, stacktrace

          result ->
            result
        end

      other ->
        other
    end)
  end

  @doc """
  Waits until at least one element of `list` is resolved.

  Returns `{complete, remaining}` where `complete` holds the resolved values
  (non-task elements count as already resolved) and `remaining` holds the
  tasks that have not replied yet. An empty list yields `{[], []}`.

  A task result shaped like `{:__exception__, exception, stacktrace}` is
  re-raised in the caller; a task that exited is reported as `{:error, reason}`.

  Note that this polls each task with a zero timeout and retries immediately
  until something has completed.
  """
  @spec await_at_least_one(list :: [term()]) :: {[term()], [term()]}
  def await_at_least_one([]), do: {[], []}

  def await_at_least_one(list) do
    list
    |> Enum.map(fn
      %Task{} = task ->
        case Task.yield(task, 0) do
          {:ok, {:__exception__, e, stacktrace}} ->
            reraise e, stacktrace

          {:ok, term} ->
            term

          {:exit, term} ->
            {:error, term}

          # No reply yet: keep the task so the caller can await it later.
          nil ->
            task
        end

      other ->
        other
    end)
    |> Enum.split_with(&(!match?(%Task{}, &1)))
    |> case do
      {[], remaining} ->
        await_at_least_one(remaining)

      {complete, remaining} ->
        {complete, remaining}
    end
  end

  # Attempts to take one slot. Returns `true` and increments the count when a
  # slot is free, `false` (leaving the state untouched) otherwise.
  # `>=` rather than an equality match keeps the limiter closed even if the
  # count somehow exceeds the limit (e.g. a limit below the initial count).
  defp claim(async_limiter) do
    Agent.get_and_update(async_limiter, fn
      {count, limit} when count >= limit ->
        {false, {count, limit}}

      {count, limit} ->
        {true, {count + 1, limit}}
    end)
  end

  # Starts `func` asynchronously while holding an already-claimed slot.
  # The slot is released when `func` finishes (normally or by raising), not
  # when the task is spawned; otherwise the limit would never be reached.
  defp start_limited_task(async_limiter, opts, func) do
    Ash.ProcessHelpers.async(
      fn ->
        try do
          func.()
        after
          release_from_task(async_limiter)
        end
      end,
      opts
    )
  catch
    kind, reason ->
      # Starting the task failed, so the work will never run and never
      # release the slot itself. Give it back and propagate the failure.
      release(async_limiter)
      :erlang.raise(kind, reason, __STACKTRACE__)
  end

  # Releases a slot from inside the async task. The limiter may already have
  # been stopped by the time the work finishes; there is nothing left to
  # account for in that case, so the exit is deliberately ignored.
  defp release_from_task(async_limiter) do
    release(async_limiter)
  catch
    :exit, _reason -> :ok
  end

  # True when the query's resource, or any resource the action declares it
  # touches, is currently inside a data layer transaction.
  defp in_transaction?(query) do
    Enum.any?(
      List.wrap(query.resource) ++ List.wrap(query.action.touches_resources),
      &Ash.DataLayer.in_transaction?(&1)
    )
  end

  # Gives one slot back to the limiter.
  defp release(async_limiter) do
    Agent.update(async_limiter, fn
      {count, limit} ->
        {count - 1, limit}
    end)
  end
end