Skip to content

Commit

Permalink
Adding timeout support for ttl plus refinement
Browse files Browse the repository at this point in the history
  • Loading branch information
netboz committed Jan 22, 2023
1 parent 04273c7 commit 75be47d
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 116 deletions.
10 changes: 6 additions & 4 deletions config/dev.exs
Original file line number Diff line number Diff line change
Expand Up @@ -75,17 +75,19 @@ config :archethic,
Archethic.Crypto.NodeKeystore.Origin.TPMImpl
end)

## Format : {service, [{key, module, function, args, refresh_interval_ms, ttl_ms}, ...]}
## If ttl = :infinity, the cache will never expire
config :archethic, Archethic.Utils.HydratingCache.CachesManager,
uco_service: [
{Archethic.OracleChain.Services.UCOPrice.Providers.CoinPaprika,
Archethic.OracleChain.Services.UCOPrice.Providers.CoinPaprika, :fetch, [["usd", "eur"]],
30000},
30000, :infinity},
{Archethic.OracleChain.Services.UCOPrice.Providers.Coingecko,
Archethic.OracleChain.Services.UCOPrice.Providers.Coingecko, :fetch, [["usd", "eur"]],
30000},
Archethic.OracleChain.Services.UCOPrice.Providers.Coingecko, :fetch, [["usd", "eur"]], 30000,
:infinity},
{Archethic.OracleChain.Services.UCOPrice.Providers.CoinMarketCap,
Archethic.OracleChain.Services.UCOPrice.Providers.CoinMarketCap, :fetch, [["usd", "eur"]],
30000}
30000, :infinity}
]

config :archethic, Archethic.Governance.Pools,
Expand Down
43 changes: 31 additions & 12 deletions lib/archethic/utils/hydrating_cache/cache_entry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ defmodule Archethic.Utils.HydratingCache.CacheEntry do
use Task
require Logger

def start_link([fun, key, ttl, refresh_interval]) do
:gen_statem.start_link(__MODULE__, [fun, key, ttl, refresh_interval], [])
def start_link([fun, key, refresh_interval, ttl]) do
:gen_statem.start_link(__MODULE__, [fun, key, refresh_interval, ttl], [])
end

@impl :gen_statem
def init([fun, key, ttl, refresh_interval]) do
def init([fun, key, refresh_interval, ttl]) do
# start hydrating at the needed refresh interval
timer = :timer.send_interval(refresh_interval, self(), :hydrate)

Expand Down Expand Up @@ -113,12 +113,12 @@ defmodule Archethic.Utils.HydratingCache.CacheEntry do
{:next_state, :running, data}
end

def handle_event({:call, from}, {:register, fun, key, ttl, refresh_interval}, :running, data) do
def handle_event({:call, from}, {:register, fun, key, refresh_interval, ttl}, :running, data) do
## Registering a new hydrating function while previous one is running

## We stop the hydrating task if it is already running
case data.running_func_task do
pid when is_pid(pid) -> Process.exit(pid, :brutal_kill)
pid when is_pid(pid) -> Process.exit(pid, :normal)
_ -> :ok
end

Expand All @@ -141,7 +141,7 @@ defmodule Archethic.Utils.HydratingCache.CacheEntry do
}, [{:reply, from, :ok}]}
end

def handle_event({:call, from}, {:register, fun, key, ttl, refresh_interval}, _state, data) do
def handle_event({:call, from}, {:register, fun, key, refresh_interval, ttl}, _state, data) do
## Setting hydrating function in other cases
## Hydrating function not running, we just stop the timers
_ = :timer.cancel(data.timer_func)
Expand All @@ -155,7 +155,15 @@ defmodule Archethic.Utils.HydratingCache.CacheEntry do
:refresh_interval => refresh_interval
})

timer = :timer.send_interval(refresh_interval, self(), :hydrate)
timer =
case ttl do
:infinity ->
nil

value when is_number(value) ->
{:ok, t} = :timer.send_interval(refresh_interval, self(), :hydrate)
t
end

## We trigger the update ( to trigger or not could be set at registering option )
{:next_state, :running,
Expand All @@ -181,6 +189,7 @@ defmodule Archethic.Utils.HydratingCache.CacheEntry do

hydrating_task =
spawn(fn ->
Logger.info("Running hydrating function for key :#{inspect(data.key)}")
value = data.hydrating_func.()
:gen_statem.cast(me, {:new_value, data.key, value})
end)
Expand All @@ -196,7 +205,8 @@ defmodule Archethic.Utils.HydratingCache.CacheEntry do
"Key :#{inspect(data.key)}, Hydrating func #{inspect(data.hydrating_func)} discarded"
)

{:next_state, state, %CacheEntry.StateData{data | value: :discarded}}
{:next_state, state,
%CacheEntry.StateData{data | value: {:error, :discarded}, timer_discard: nil}}
end

def handle_event(:cast, {:new_value, _key, {:ok, value}}, :running, data) do
Expand All @@ -210,9 +220,7 @@ defmodule Archethic.Utils.HydratingCache.CacheEntry do
send(pid, {:ok, value})
end)

## We could do error control here, like unregistering the running func.
## atm, we will keep it
_ = :timer.cancel(data.timer_discard)
## Start timer to discard new value

me = self()
{:ok, new_timer} = :timer.send_after(data.ttl, me, :discarded)
Expand All @@ -233,7 +241,18 @@ defmodule Archethic.Utils.HydratingCache.CacheEntry do
"Key :#{inspect(data.key)}, Hydrating func #{inspect(data.hydrating_func)} got error value #{inspect({key, {:error, reason}})}"
)

{:next_state, :idle, %CacheEntry.StateData{data | running_func_task: :undefined}}
## We reprogram the timer to hydrate, even if previous call failled. Error control could occur here
me = self()
{:ok, new_timer} = :timer.send_after(data.ttl, me, :discarded)

{:next_state, :idle,
%CacheEntry.StateData{
data
| running_func_task: :undefined,
getters: [],
timer_discard: nil,
timer_func: new_timer
}}
end

def handle_event(_type, _event, _state, data) do
Expand Down
4 changes: 3 additions & 1 deletion lib/archethic/utils/hydrating_cache/caches_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ defmodule Archethic.Utils.HydratingCache.CachesManager do

@impl true
def handle_call({:new_service_sync, name, initial_keys}, _from, state) do
Logger.info("Starting new service sync : #{name}")

{:ok, pid} =
DynamicSupervisor.start_child(
state.caches_sup,
Expand All @@ -76,7 +78,7 @@ defmodule Archethic.Utils.HydratingCache.CachesManager do

@impl true
def handle_cast({:new_service_async, name, keys, _requester}, state) do
IO.inspect("Starting new service genserver #{name}")
Logger.info("Starting new service async : #{name}")

DynamicSupervisor.start_child(state.caches_sup, %{
id: name,
Expand Down
42 changes: 22 additions & 20 deletions lib/archethic/utils/hydrating_cache/hydrating_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,18 @@ defmodule Archethic.Utils.HydratingCache do
Registers a function that will be computed periodically to update the cache.
Arguments:
- `hydrating_cache`: the pid of the hydrating cache.
- `fun`: a 0-arity function that computes the value and returns either
`{:ok, value}` or `{:error, reason}`.
- `key`: associated with the function and is used to retrieve the stored
value.
- `ttl` ("time to live"): how long (in milliseconds) the value is stored
before it is discarded if the value is not refreshed.
- `refresh_interval`: how often (in milliseconds) the function is
recomputed and the new value stored. `refresh_interval` must be strictly
smaller than `ttl`. After the value is refreshed, the `ttl` counter is
restarted.
- `ttl` ("time to live"): how long (in milliseconds) the value is stored
before it is discarded if the value is not refreshed.
The value is stored only if `{:ok, value}` is returned by `fun`. If `{:error,
reason}` is returned, the value is not stored and `fun` must be retried on
Expand All @@ -42,14 +44,14 @@ defmodule Archethic.Utils.HydratingCache do
hydrating_cache :: pid(),
fun :: (() -> {:ok, any()} | {:error, any()}),
key :: any,
ttl :: non_neg_integer(),
refresh_interval :: non_neg_integer()
refresh_interval :: non_neg_integer(),
ttl :: non_neg_integer()
) :: :ok
def register_function(hydrating_cache, fun, key, ttl, refresh_interval)
def register_function(hydrating_cache, fun, key, refresh_interval, ttl)
when is_function(fun, 0) and is_integer(ttl) and ttl > 0 and
is_integer(refresh_interval) and
refresh_interval < ttl do
GenServer.call(hydrating_cache, {:register, fun, key, ttl, refresh_interval})
GenServer.call(hydrating_cache, {:register, fun, key, refresh_interval, ttl})
end

@doc ~s"""
Expand All @@ -69,25 +71,25 @@ defmodule Archethic.Utils.HydratingCache do
:not_registered}`
"""
@spec get(atom(), any(), non_neg_integer(), Keyword.t()) :: result
def get(cache, key, timeout \\ 1_000, _opts \\ [])
def get(hydrating_cache, key, timeout \\ 1_000, _opts \\ [])
when is_integer(timeout) and timeout > 0 do
Logger.debug(
"Getting key #{inspect(key)} from hydrating cache #{inspect(cache)} for #{inspect(self())}"
"Getting key #{inspect(key)} from hydrating cache #{inspect(hydrating_cache)} for #{inspect(self())}"
)

case GenServer.call(cache, {:get, key}, timeout) do
case GenServer.call(hydrating_cache, {:get, key}, timeout) do
{:ok, :answer_delayed} ->
receive do
{:ok, value} ->
{:ok, value}

other ->
Logger.info("Unexpected return value #{inspect(other)}")
Logger.warning("Unexpected return value #{inspect(other)}")
{:error, :unexpected_value}
after
timeout ->
Logger.warn(
"Timeout waiting for delayed value for key #{inspect(key)} from hydrating cache #{inspect(cache)}"
Logger.warning(
"Timeout waiting for delayed value for key #{inspect(key)} from hydrating cache #{inspect(hydrating_cache)}"
)

{:error, :timeout}
Expand Down Expand Up @@ -120,10 +122,10 @@ defmodule Archethic.Utils.HydratingCache do
initial_keys_worker_sup,
keys,
fn
{provider, mod, func, params, refresh_rate} ->
{provider, mod, func, params, refresh_rate, ttl} ->
GenServer.cast(
me,
{:register, fn -> apply(mod, func, params) end, provider, 75_000, refresh_rate}
{:register, fn -> apply(mod, func, params) end, provider, refresh_rate, ttl}
)

other ->
Expand Down Expand Up @@ -153,7 +155,7 @@ defmodule Archethic.Utils.HydratingCache do
end
end

def handle_call({:register, fun, key, ttl, refresh_interval}, _from, state) do
def handle_call({:register, fun, key, refresh_interval, ttl}, _from, state) do
Logger.debug("Registering hydrating function for #{inspect(key)}")
## Called when asked to register a function
case Map.get(state, key) do
Expand All @@ -162,14 +164,14 @@ defmodule Archethic.Utils.HydratingCache do
{:ok, pid} =
DynamicSupervisor.start_child(
state.keys_sup,
{CacheEntry, [fun, key, ttl, refresh_interval]}
{CacheEntry, [fun, key, refresh_interval, ttl]}
)

{:reply, :ok, Map.put(state, key, pid)}

pid ->
## Key already exists, no need to start fsm
case :gen_statem.call(pid, {:register, fun, key, ttl, refresh_interval}) do
case :gen_statem.call(pid, {:register, fun, key, refresh_interval, ttl}) do
:ok ->
{:reply, :ok, Map.put(state, key, pid)}

Expand Down Expand Up @@ -197,22 +199,22 @@ defmodule Archethic.Utils.HydratingCache do
end
end

def handle_cast({:register, fun, key, ttl, refresh_interval}, state) do
def handle_cast({:register, fun, key, refresh_interval, ttl}, state) do
case Map.get(state, key) do
nil ->
## New key, we start a cache entry fsm

{:ok, pid} =
DynamicSupervisor.start_child(
state.keys_sup,
{CacheEntry, [fun, key, ttl, refresh_interval]}
{CacheEntry, [fun, key, refresh_interval, ttl]}
)

{:noreply, Map.put(state, key, pid)}

pid ->
## Key already exists, no need to start fsm
_ = :gen_statem.call(pid, {:register, fun, key, ttl, refresh_interval})
_ = :gen_statem.call(pid, {:register, fun, key, refresh_interval, ttl})
{:noreply, Map.put(state, key, pid)}
end
end
Expand Down
6 changes: 3 additions & 3 deletions test/archethic/utils/hydrating_cache/caches_manager_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ defmodule HydratingCacheTest do

test "starting service from manager returns value once first hydrating have been done" do
CachesManager.new_service_async("test_services", [
{:key1, __MODULE__, :waiting_function, [2000], 6000},
{:key2, __MODULE__, :waiting_function, [1000], 6000},
{:key3, __MODULE__, :waiting_function, [2000], 6000}
{:key1, __MODULE__, :waiting_function, [2000], 6000, 8000},
{:key2, __MODULE__, :waiting_function, [1000], 6000, 8000},
{:key3, __MODULE__, :waiting_function, [2000], 6000, 8000}
])

## wait a little so at least keys are registered
Expand Down
Loading

0 comments on commit 75be47d

Please sign in to comment.