diff --git a/config/dev.exs b/config/dev.exs index 30d5e5f007..9497d3da11 100755 --- a/config/dev.exs +++ b/config/dev.exs @@ -75,17 +75,19 @@ config :archethic, Archethic.Crypto.NodeKeystore.Origin.TPMImpl end) +## Format : {service, [{key, module, function, args, refresh_interval_ms, ttl_ms}, ...]} +## If ttl = :infinity, the cache will never expire config :archethic, Archethic.Utils.HydratingCache.CachesManager, uco_service: [ {Archethic.OracleChain.Services.UCOPrice.Providers.CoinPaprika, Archethic.OracleChain.Services.UCOPrice.Providers.CoinPaprika, :fetch, [["usd", "eur"]], - 30000}, + 30000, :infinity}, {Archethic.OracleChain.Services.UCOPrice.Providers.Coingecko, - Archethic.OracleChain.Services.UCOPrice.Providers.Coingecko, :fetch, [["usd", "eur"]], - 30000}, + Archethic.OracleChain.Services.UCOPrice.Providers.Coingecko, :fetch, [["usd", "eur"]], 30000, + :infinity}, {Archethic.OracleChain.Services.UCOPrice.Providers.CoinMarketCap, Archethic.OracleChain.Services.UCOPrice.Providers.CoinMarketCap, :fetch, [["usd", "eur"]], - 30000} + 30000, :infinity} ] config :archethic, Archethic.Governance.Pools, diff --git a/lib/archethic/utils/hydrating_cache/cache_entry.ex b/lib/archethic/utils/hydrating_cache/cache_entry.ex index 6b82926c56..9c7f144a6f 100644 --- a/lib/archethic/utils/hydrating_cache/cache_entry.ex +++ b/lib/archethic/utils/hydrating_cache/cache_entry.ex @@ -32,12 +32,12 @@ defmodule Archethic.Utils.HydratingCache.CacheEntry do use Task require Logger - def start_link([fun, key, ttl, refresh_interval]) do - :gen_statem.start_link(__MODULE__, [fun, key, ttl, refresh_interval], []) + def start_link([fun, key, refresh_interval, ttl]) do + :gen_statem.start_link(__MODULE__, [fun, key, refresh_interval, ttl], []) end @impl :gen_statem - def init([fun, key, ttl, refresh_interval]) do + def init([fun, key, refresh_interval, ttl]) do # start hydrating at the needed refresh interval timer = :timer.send_interval(refresh_interval, self(), :hydrate) @@ -113,12 +113,12 @@ defmodule Archethic.Utils.HydratingCache.CacheEntry do {:next_state, :running, data} end - def handle_event({:call, from}, {:register, fun, key, ttl, refresh_interval}, :running, data) do + def handle_event({:call, from}, {:register, fun, key, refresh_interval, ttl}, :running, data) do ## Registering a new hydrating function while previous one is running ## We stop the hydrating task if it is already running case data.running_func_task do - pid when is_pid(pid) -> Process.exit(pid, :brutal_kill) + pid when is_pid(pid) -> Process.exit(pid, :normal) _ -> :ok end @@ -141,7 +141,7 @@ defmodule Archethic.Utils.HydratingCache.CacheEntry do }, [{:reply, from, :ok}]} end - def handle_event({:call, from}, {:register, fun, key, ttl, refresh_interval}, _state, data) do + def handle_event({:call, from}, {:register, fun, key, refresh_interval, ttl}, _state, data) do ## Setting hydrating function in other cases ## Hydrating function not running, we just stop the timers _ = :timer.cancel(data.timer_func) @@ -155,7 +155,15 @@ defmodule Archethic.Utils.HydratingCache.CacheEntry do :refresh_interval => refresh_interval }) - timer = :timer.send_interval(refresh_interval, self(), :hydrate) + timer = + case ttl do + :infinity -> + nil + + value when is_number(value) -> + {:ok, t} = :timer.send_interval(refresh_interval, self(), :hydrate) + t + end ## We trigger the update ( to trigger or not could be set at registering option ) {:next_state, :running, @@ -181,6 +189,7 @@ defmodule Archethic.Utils.HydratingCache.CacheEntry do hydrating_task = spawn(fn -> + Logger.info("Running hydrating function for key :#{inspect(data.key)}") value = data.hydrating_func.() :gen_statem.cast(me, {:new_value, data.key, value}) end) @@ -196,7 +205,8 @@ defmodule Archethic.Utils.HydratingCache.CacheEntry do "Key :#{inspect(data.key)}, Hydrating func #{inspect(data.hydrating_func)} discarded" ) - {:next_state, state, %CacheEntry.StateData{data | value: :discarded}} + {:next_state, state, + %CacheEntry.StateData{data | value: {:error, :discarded}, timer_discard: nil}} end def handle_event(:cast, {:new_value, _key, {:ok, value}}, :running, data) do @@ -210,9 +220,7 @@ defmodule Archethic.Utils.HydratingCache.CacheEntry do send(pid, {:ok, value}) end) - ## We could do error control here, like unregistering the running func. - ## atm, we will keep it - _ = :timer.cancel(data.timer_discard) + ## Start timer to discard new value me = self() {:ok, new_timer} = :timer.send_after(data.ttl, me, :discarded) @@ -233,7 +241,18 @@ defmodule Archethic.Utils.HydratingCache.CacheEntry do "Key :#{inspect(data.key)}, Hydrating func #{inspect(data.hydrating_func)} got error value #{inspect({key, {:error, reason}})}" ) - {:next_state, :idle, %CacheEntry.StateData{data | running_func_task: :undefined}} + ## We reprogram the timer to hydrate, even if previous call failled. Error control could occur here + me = self() + {:ok, new_timer} = :timer.send_after(data.ttl, me, :discarded) + + {:next_state, :idle, + %CacheEntry.StateData{ + data + | running_func_task: :undefined, + getters: [], + timer_discard: nil, + timer_func: new_timer + }} end def handle_event(_type, _event, _state, data) do diff --git a/lib/archethic/utils/hydrating_cache/caches_manager.ex b/lib/archethic/utils/hydrating_cache/caches_manager.ex index d2cd76d0a7..0c2ba3d010 100644 --- a/lib/archethic/utils/hydrating_cache/caches_manager.ex +++ b/lib/archethic/utils/hydrating_cache/caches_manager.ex @@ -65,6 +65,8 @@ defmodule Archethic.Utils.HydratingCache.CachesManager do @impl true def handle_call({:new_service_sync, name, initial_keys}, _from, state) do + Logger.info("Starting new service sync : #{name}") + {:ok, pid} = DynamicSupervisor.start_child( state.caches_sup, @@ -76,7 +78,7 @@ defmodule Archethic.Utils.HydratingCache.CachesManager do @impl true def handle_cast({:new_service_async, name, keys, _requester}, state) do - IO.inspect("Starting new service genserver #{name}") + Logger.info("Starting new service async : #{name}") DynamicSupervisor.start_child(state.caches_sup, %{ id: name, diff --git a/lib/archethic/utils/hydrating_cache/hydrating_cache.ex b/lib/archethic/utils/hydrating_cache/hydrating_cache.ex index 444afc431b..63a29ca793 100644 --- a/lib/archethic/utils/hydrating_cache/hydrating_cache.ex +++ b/lib/archethic/utils/hydrating_cache/hydrating_cache.ex @@ -23,16 +23,18 @@ defmodule Archethic.Utils.HydratingCache do Registers a function that will be computed periodically to update the cache. Arguments: + - `hydrating_cache`: the pid of the hydrating cache. - `fun`: a 0-arity function that computes the value and returns either `{:ok, value}` or `{:error, reason}`. - `key`: associated with the function and is used to retrieve the stored value. - - `ttl` ("time to live"): how long (in milliseconds) the value is stored - before it is discarded if the value is not refreshed. - `refresh_interval`: how often (in milliseconds) the function is recomputed and the new value stored. `refresh_interval` must be strictly smaller than `ttl`. After the value is refreshed, the `ttl` counter is restarted. + - `ttl` ("time to live"): how long (in milliseconds) the value is stored + before it is discarded if the value is not refreshed. + The value is stored only if `{:ok, value}` is returned by `fun`. If `{:error, reason}` is returned, the value is not stored and `fun` must be retried on @@ -42,14 +44,14 @@ defmodule Archethic.Utils.HydratingCache do hydrating_cache :: pid(), fun :: (() -> {:ok, any()} | {:error, any()}), key :: any, - ttl :: non_neg_integer(), - refresh_interval :: non_neg_integer() + refresh_interval :: non_neg_integer(), + ttl :: non_neg_integer() ) :: :ok - def register_function(hydrating_cache, fun, key, ttl, refresh_interval) + def register_function(hydrating_cache, fun, key, refresh_interval, ttl) when is_function(fun, 0) and is_integer(ttl) and ttl > 0 and is_integer(refresh_interval) and refresh_interval < ttl do - GenServer.call(hydrating_cache, {:register, fun, key, ttl, refresh_interval}) + GenServer.call(hydrating_cache, {:register, fun, key, refresh_interval, ttl}) end @doc ~s""" @@ -69,25 +71,25 @@ defmodule Archethic.Utils.HydratingCache do :not_registered}` """ @spec get(atom(), any(), non_neg_integer(), Keyword.t()) :: result - def get(cache, key, timeout \\ 1_000, _opts \\ []) + def get(hydrating_cache, key, timeout \\ 1_000, _opts \\ []) when is_integer(timeout) and timeout > 0 do Logger.debug( - "Getting key #{inspect(key)} from hydrating cache #{inspect(cache)} for #{inspect(self())}" + "Getting key #{inspect(key)} from hydrating cache #{inspect(hydrating_cache)} for #{inspect(self())}" ) - case GenServer.call(cache, {:get, key}, timeout) do + case GenServer.call(hydrating_cache, {:get, key}, timeout) do {:ok, :answer_delayed} -> receive do {:ok, value} -> {:ok, value} other -> - Logger.info("Unexpected return value #{inspect(other)}") + Logger.warning("Unexpected return value #{inspect(other)}") {:error, :unexpected_value} after timeout -> - Logger.warn( - "Timeout waiting for delayed value for key #{inspect(key)} from hydrating cache #{inspect(cache)}" + Logger.warning( + "Timeout waiting for delayed value for key #{inspect(key)} from hydrating cache #{inspect(hydrating_cache)}" ) {:error, :timeout} @@ -120,10 +122,10 @@ defmodule Archethic.Utils.HydratingCache do initial_keys_worker_sup, keys, fn - {provider, mod, func, params, refresh_rate} -> + {provider, mod, func, params, refresh_rate, ttl} -> GenServer.cast( me, - {:register, fn -> apply(mod, func, params) end, provider, 75_000, refresh_rate} + {:register, fn -> apply(mod, func, params) end, provider, refresh_rate, ttl} ) other -> @@ -153,7 +155,7 @@ defmodule Archethic.Utils.HydratingCache do end end - def handle_call({:register, fun, key, ttl, refresh_interval}, _from, state) do + def handle_call({:register, fun, key, refresh_interval, ttl}, _from, state) do Logger.debug("Registering hydrating function for #{inspect(key)}") ## Called when asked to register a function case Map.get(state, key) do @@ -162,14 +164,14 @@ defmodule Archethic.Utils.HydratingCache do {:ok, pid} = DynamicSupervisor.start_child( state.keys_sup, - {CacheEntry, [fun, key, ttl, refresh_interval]} + {CacheEntry, [fun, key, refresh_interval, ttl]} ) {:reply, :ok, Map.put(state, key, pid)} pid -> ## Key already exists, no need to start fsm - case :gen_statem.call(pid, {:register, fun, key, ttl, refresh_interval}) do + case :gen_statem.call(pid, {:register, fun, key, refresh_interval, ttl}) do :ok -> {:reply, :ok, Map.put(state, key, pid)} @@ -197,7 +199,7 @@ defmodule Archethic.Utils.HydratingCache do end end - def handle_cast({:register, fun, key, ttl, refresh_interval}, state) do + def handle_cast({:register, fun, key, refresh_interval, ttl}, state) do case Map.get(state, key) do nil -> ## New key, we start a cache entry fsm @@ -205,14 +207,14 @@ defmodule Archethic.Utils.HydratingCache do {:ok, pid} = DynamicSupervisor.start_child( state.keys_sup, - {CacheEntry, [fun, key, ttl, refresh_interval]} + {CacheEntry, [fun, key, refresh_interval, ttl]} ) {:noreply, Map.put(state, key, pid)} pid -> ## Key already exists, no need to start fsm - _ = :gen_statem.call(pid, {:register, fun, key, ttl, refresh_interval}) + _ = :gen_statem.call(pid, {:register, fun, key, refresh_interval, ttl}) {:noreply, Map.put(state, key, pid)} end end diff --git a/test/archethic/utils/hydrating_cache/caches_manager_test.exs b/test/archethic/utils/hydrating_cache/caches_manager_test.exs index 4c0e850947..60fcc8a143 100644 --- a/test/archethic/utils/hydrating_cache/caches_manager_test.exs +++ b/test/archethic/utils/hydrating_cache/caches_manager_test.exs @@ -6,9 +6,9 @@ defmodule HydratingCacheTest do test "starting service from manager returns value once first hydrating have been done" do CachesManager.new_service_async("test_services", [ - {:key1, __MODULE__, :waiting_function, [2000], 6000}, - {:key2, __MODULE__, :waiting_function, [1000], 6000}, - {:key3, __MODULE__, :waiting_function, [2000], 6000} + {:key1, __MODULE__, :waiting_function, [2000], 6000, 8000}, + {:key2, __MODULE__, :waiting_function, [1000], 6000, 8000}, + {:key3, __MODULE__, :waiting_function, [2000], 6000, 8000} ]) ## wait a little so at least keys are registered diff --git a/test/archethic/utils/hydrating_cache/hydrating_cache_test.exs b/test/archethic/utils/hydrating_cache/hydrating_cache_test.exs index 1d4e591888..a080963ef9 100644 --- a/test/archethic/utils/hydrating_cache/hydrating_cache_test.exs +++ b/test/archethic/utils/hydrating_cache/hydrating_cache_test.exs @@ -18,8 +18,8 @@ defmodule HydratingCacheTest do {:ok, 1} end, "simple_func", - 50_000, - 10_000 + 10_000, + 15_000 ) assert result == :ok @@ -29,55 +29,10 @@ defmodule HydratingCacheTest do assert r == {:ok, 1} end - test "Getting value for key while function is running first time make process wait and return value" do - {:ok, pid} = HydratingCache.start_link(:test_service) - - result = - HydratingCache.register_function( - pid, - fn -> - Logger.info("Hydrating function Sleeping 3 secs") - :timer.sleep(3000) - {:ok, 1} - end, - "test_long_function", - 50_000, - 9000 - ) - - assert result == :ok - - r = HydratingCache.get(pid, "test_long_function", 10_000) - assert r == {:ok, 1} - end - - test "Getting value for key while function is running first time returns timeout after ttl" do - {:ok, pid} = HydratingCache.start_link(:test_service) - - result = - HydratingCache.register_function( - pid, - fn -> - Logger.info("Hydrating function Sleeping 3 secs") - :timer.sleep(3000) - {:ok, 1} - end, - "test_get_ttl", - 50_000, - 9000 - ) - - assert result == :ok - - ## get and wait up to 1 second - r = HydratingCache.get(pid, "test_get_ttl", 1000) - assert r == {:error, :timeout} - end - test "Hydrating function runs periodically" do {:ok, pid} = HydratingCache.start_link(:test_service) - :persistent_term.put("test", 0) + :persistent_term.put("test", 1) result = HydratingCache.register_function( @@ -90,16 +45,16 @@ defmodule HydratingCacheTest do {:ok, value} end, "test_inc", - 50000, - 1000 + 1_000, + 50_000 ) assert result == :ok - :timer.sleep(5000) + :timer.sleep(3000) {:ok, value} = HydratingCache.get(pid, "test_inc", 3000) - assert value >= 5 + assert value >= 3 end test "Update hydrating function while another one is running returns new hydrating value from new function" do @@ -113,8 +68,8 @@ defmodule HydratingCacheTest do {:ok, 1} end, "test_reregister", - 50000, - 10000 + 10_000, + 50_000 ) assert result == :ok @@ -126,8 +81,8 @@ defmodule HydratingCacheTest do {:ok, 2} end, "test_reregister", - 50000, - 10000 + 10_000, + 50_000 ) :timer.sleep(5000) @@ -146,8 +101,8 @@ defmodule HydratingCacheTest do {:ok, 1} end, "test_reregister", - 50000, - 10000 + 10_000, + 50_000 ) _ = @@ -158,8 +113,8 @@ defmodule HydratingCacheTest do {:ok, 2} end, "test_reregister", - 50000, - 10000 + 10_000, + 50_000 ) {:ok, value} = HydratingCache.get(pid, "test_reregister", 4000) @@ -178,8 +133,8 @@ defmodule HydratingCacheTest do {:ok, :result_timed} end, "timed_value", - 80000, - 70000 + 70_000, + 80_000 ) _ = @@ -189,8 +144,8 @@ defmodule HydratingCacheTest do {:ok, :result} end, "direct_value", - 80000, - 70000 + 70_000, + 80_000 ) ## We query the value with timeout smaller than timed function @@ -208,8 +163,8 @@ defmodule HydratingCacheTest do {:ok, :valid_result} end, "delayed_result", - 80000, - 70000 + 70_000, + 80_000 ) ## We query the value with timeout smaller than timed function @@ -227,13 +182,12 @@ defmodule HydratingCacheTest do {:ok, :valid_result} end, "delayed_result", - 80000, - 70000 + 70_000, + 80_000 ) ## We query the value with timeout smaller than timed function - result = HydratingCache.get(pid, "delayed_result", 1000) - assert result = {:error, :timeout} + assert {:error, :timeout} = HydratingCache.get(pid, "delayed_result", 1000) end test "Multiple process can wait for a delayed value" do @@ -249,8 +203,8 @@ defmodule HydratingCacheTest do {:ok, :valid_result} end, "delayed_result", - 80000, - 70000 + 70_000, + 80_000 ) ## We query the value with timeout smaller than timed function @@ -276,8 +230,8 @@ defmodule HydratingCacheTest do {:ok, :badmatch} end, :key, - 80000, - 70000 + 70_000, + 80_000 ) :timer.sleep(1000) @@ -290,11 +244,40 @@ defmodule HydratingCacheTest do {:ok, :value} end, :key, - 80000, - 70000 + 70_000, + 80_000 ) result = HydratingCache.get(pid, :key, 3000) assert result == {:ok, :value} end + + ## This could occur if hydrating function takes time to answer. + ## In this case, getting the value would return the old value, unless too + ## much time occur where it would be discarded because of ttl + test "value gets discarded after some time" do + {:ok, pid} = HydratingCache.start_link(:test_service) + + _ = + HydratingCache.register_function( + pid, + fn -> + case :persistent_term.get("flag", nil) do + 1 -> + :timer.sleep(3_000) + + nil -> + :persistent_term.put("flag", 1) + {:ok, :value} + end + end, + :key, + 500, + 1_000 + ) + + :timer.sleep(1_100) + result = HydratingCache.get(pid, :key, 3000) + assert result == {:error, :discarded} + end end