Skip to content

Commit

Permalink
Cleaning code
Browse files Browse the repository at this point in the history
  • Loading branch information
netboz committed Jan 21, 2023
1 parent b525ecc commit 04273c7
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 61 deletions.
23 changes: 4 additions & 19 deletions lib/archethic/utils/hydrating_cache/cache_entry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ defmodule Archethic.Utils.HydratingCache.CacheEntry do

@impl :gen_statem
def init([fun, key, ttl, refresh_interval]) do
# start hydrating at the needed refresh interval
timer = :timer.send_interval(refresh_interval, self(), :hydrate)
Logger.debug("Cache entry started for key #{inspect(key)}")

## Hydrate the value
{:ok, :running,
%CacheEntry.StateData{
Expand All @@ -62,20 +63,17 @@ defmodule Archethic.Utils.HydratingCache.CacheEntry do
:idle,
data = %CacheEntry.StateData{:value => :"$$undefined"}
) do
IO.puts("Cache entry Call: value for key #{inspect(data.key)} when idle and no value")
## Value is requested while fsm is iddle, return the value

{:keep_state, data, [{:reply, from, {:error, :not_initialized}}]}
end

def handle_event({:call, from}, {:get, _requester}, :idle, data) do
IO.puts("Cache entry Call: value for key #{inspect(data.key)} is requested while idle")
## Value is requested while fsm is iddle, return the value
{:next_state, :idle, data, [{:reply, from, data.value}]}
end

def handle_event(:cast, {:get, requester}, :idle, data) do
IO.puts("Cache entry Cast: value for key #{inspect(data.key)} is requested while idle")
## Value is requested while fsm is iddle, return the value
send(requester, {:ok, data.value})
{:next_state, :idle, data}
Expand All @@ -89,10 +87,6 @@ defmodule Archethic.Utils.HydratingCache.CacheEntry do
:running,
data = %CacheEntry.StateData{value: :"$$undefined"}
) do
IO.puts(
"Cache entry Call: value for key #{inspect(data.key)} is requested while running and no value #{inspect(requester)} self:#{inspect(self())}"
)

previous_getters = data.getters

{:keep_state, %CacheEntry.StateData{data | getters: previous_getters ++ [requester]},
Expand All @@ -102,10 +96,6 @@ defmodule Archethic.Utils.HydratingCache.CacheEntry do
## Call for value while hydrating function is running and we have a previous value
## We return the value to caller
def handle_event({:call, from}, {:get, _requester}, :running, data) do
IO.puts(
"Cache entry Call: value for key #{inspect(data.key)} is requested while running and value"
)

{:next_state, :running, data, [{:reply, from, data.value}]}
end

Expand Down Expand Up @@ -152,7 +142,6 @@ defmodule Archethic.Utils.HydratingCache.CacheEntry do
end

def handle_event({:call, from}, {:register, fun, key, ttl, refresh_interval}, _state, data) do
Logger.info("Registering new hydrating function #{inspect(fun)} for key #{inspect(key)}")
## Setting hydrating function in other cases
## Hydrating function not running, we just stop the timers
_ = :timer.cancel(data.timer_func)
Expand Down Expand Up @@ -210,18 +199,14 @@ defmodule Archethic.Utils.HydratingCache.CacheEntry do
{:next_state, state, %CacheEntry.StateData{data | value: :discarded}}
end

def handle_event(:cast, {:new_value, key, {:ok, value}}, :running, data) do
def handle_event(:cast, {:new_value, _key, {:ok, value}}, :running, data) do
## Stop timer on value ttl
_ = :timer.cancel(data.timer_discard)

## We got result from hydrating function
Logger.debug(
"Key :#{inspect(data.key)}, Hydrating func #{inspect(data.hydrating_func)} got value #{inspect({key, value})}"
)

## notify waiiting getters
## notify waiting getters
Enum.each(data.getters, fn {pid, _ref} ->
IO.puts("Sending value to #{inspect(pid)}")
send(pid, {:ok, value})
end)

Expand Down
8 changes: 5 additions & 3 deletions lib/archethic/utils/hydrating_cache/caches_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@ defmodule Archethic.Utils.HydratingCache.CachesManager do
end

@doc """
Start a new hydrating cache process to hold the values from a service.
This is a synchronous call, it will block until the cache is ready ( all keys are hydrated )
Start a new hydrating cache process to hold the values from a service.
This is a synchronous call, it will block until the cache is started and
hydrating process for initial keys is started.
"""
@spec new_service_sync(
name :: String.t(),
initial_keys :: list()
) :: {:error, any} | {:ok, pid}
def new_service_sync(name, initial_keys) do
Logger.info("Starting new service sync #{name}")
GenServer.call(__MODULE__, {:new_service_sync, name, initial_keys})
end

Expand All @@ -28,7 +30,7 @@ defmodule Archethic.Utils.HydratingCache.CachesManager do
This is an asynchronous call, it will return immediately.
"""
def new_service_async(name, keys) do
IO.inspect("Starting new service #{name}")
Logger.info("Starting new service async #{name}")
GenServer.cast(__MODULE__, {:new_service_async, name, keys, self()})
end

Expand Down
44 changes: 5 additions & 39 deletions lib/archethic/utils/hydrating_cache/hydrating_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -77,21 +77,13 @@ defmodule Archethic.Utils.HydratingCache do

case GenServer.call(cache, {:get, key}, timeout) do
{:ok, :answer_delayed} ->
Logger.debug(
"waiting for delayed value for key #{inspect(key)} from hydrating cache #{inspect(cache)} #{inspect(self())}"
)

receive do
{:ok, value} ->
Logger.debug(
"Got delayed value #{inspect(value)} for key #{inspect(key)} from hydrating cache #{inspect(cache)}"
)

{:ok, value}

other ->
Logger.info("Unexpected return value #{inspect(other)}")
# code
{:error, :unexpected_value}
after
timeout ->
Logger.warn(
Expand All @@ -102,10 +94,6 @@ defmodule Archethic.Utils.HydratingCache do
end

other_result ->
Logger.debug(
"Got value #{inspect(other_result)} for key #{inspect(key)} from hydrating cache #{inspect(cache)}"
)

other_result
end
end
Expand All @@ -127,28 +115,17 @@ defmodule Archethic.Utils.HydratingCache do
{:ok, initial_keys_worker_sup} = Task.Supervisor.start_link()

## Registering initial keys
Logger.info("Init Registering initial keys for #{inspect(name)}")

_ =
Task.Supervisor.async_stream_nolink(
initial_keys_worker_sup,
keys,
fn
{provider, mod, func, params, refresh_rate} ->
Logger.debug(
"Init asking Registering hydrating function. Provider: #{inspect(provider)} Hydrating function:
#{inspect(mod)}.#{inspect(func)}(#{inspect(params)}) Refresh rate: #{inspect(refresh_rate)}"
GenServer.cast(
me,
{:register, fn -> apply(mod, func, params) end, provider, 75_000, refresh_rate}
)

g =
GenServer.cast(
me,
{:register, fn -> apply(mod, func, params) end, provider, 75_000, refresh_rate}
)

Logger.debug("Finished requesting registration for #{inspect(provider)}")
g

other ->
Logger.error("Hydrating cache: Invalid configuration entry: #{inspect(other)}")
end,
Expand All @@ -157,10 +134,8 @@ defmodule Archethic.Utils.HydratingCache do
|> Stream.filter(&match?({:ok, {:ok, _}}, &1))
|> Enum.to_list()

Logger.info("Hydrating cache: Init Finished registering initial keys")
## stop the initial keys worker supervisor
Supervisor.stop(initial_keys_worker_sup)
Logger.info("Hydrating cache: Init Finished stopping initial keys worker supervisor")
{:ok, %{:keys => keys, keys_sup: keys_sup}}
end

Expand All @@ -169,15 +144,11 @@ defmodule Archethic.Utils.HydratingCache do
def handle_call({:get, key}, from, state) do
case Map.get(state, key, :undefined) do
:undefined ->
Logger.debug("HydratingCache no entry for #{inspect(state)}")
Logger.warning("HydratingCache no entry for #{inspect(state)}")
{:reply, {:error, :not_registered}, state}

pid ->
Logger.debug("HydratingCache found entry #{inspect(pid)} for #{inspect(self())}")
value = :gen_statem.call(pid, {:get, from})

Logger.debug("Cache entry returned #{inspect(value)}")

{:reply, value, state}
end
end
Expand All @@ -188,15 +159,12 @@ defmodule Archethic.Utils.HydratingCache do
case Map.get(state, key) do
nil ->
## New key, we start a cache entry fsm
Logger.debug("Starting cache entry for #{inspect(key)}")

{:ok, pid} =
DynamicSupervisor.start_child(
state.keys_sup,
{CacheEntry, [fun, key, ttl, refresh_interval]}
)

Logger.debug("Started cache entry for #{inspect({key, pid})}")
{:reply, :ok, Map.put(state, key, pid)}

pid ->
Expand Down Expand Up @@ -233,15 +201,13 @@ defmodule Archethic.Utils.HydratingCache do
case Map.get(state, key) do
nil ->
## New key, we start a cache entry fsm
Logger.debug("CAST Starting cache entry for #{inspect(key)}")

{:ok, pid} =
DynamicSupervisor.start_child(
state.keys_sup,
{CacheEntry, [fun, key, ttl, refresh_interval]}
)

Logger.debug("CAST Started cache entry for #{inspect({key, pid})}")
{:noreply, Map.put(state, key, pid)}

pid ->
Expand Down

0 comments on commit 04273c7

Please sign in to comment.