Skip to content

Commit

Permalink
Improve task monitoring and tests
Browse files Browse the repository at this point in the history
The renewal of the hydrating message is done after the task is completed
The timers are persisted in the state to have a better monitoring
The tests are now using :erlang.trace to avoid undeterministic behaviors
based on time.
  • Loading branch information
samuelmanzanera authored and Neylix committed Mar 24, 2023
1 parent 3c895d8 commit ef4047c
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 28 deletions.
49 changes: 32 additions & 17 deletions lib/archethic/oracle_chain/services/hydrating_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,18 @@ defmodule Archethic.OracleChain.Services.HydratingCache do
"""
use GenServer

require Logger

defmodule State do
@moduledoc false
defstruct([
:mfa,
:ttl,
:ttl_ref,
:ttl_timer,
:refresh_interval,
:value,
:hydrating_task
:hydrating_task,
:hydrating_timer
])
end

Expand All @@ -41,14 +44,15 @@ defmodule Archethic.OracleChain.Services.HydratingCache do
ttl = Keyword.get(options, :ttl, :infinity)

# start hydrating as soon as init is done
Process.send_after(self(), :hydrate, 0)
hydrating_timer = Process.send_after(self(), :hydrate, 0)

## Hydrate the value
{:ok,
%State{
mfa: mfa,
ttl: ttl,
refresh_interval: refresh_interval
refresh_interval: refresh_interval,
hydrating_timer: hydrating_timer
}}
end

Expand All @@ -63,7 +67,6 @@ defmodule Archethic.OracleChain.Services.HydratingCache do
def handle_info(
:hydrate,
state = %State{
refresh_interval: refresh_interval,
mfa: {m, f, a}
}
) do
Expand All @@ -72,49 +75,61 @@ defmodule Archethic.OracleChain.Services.HydratingCache do
try do
{:ok, apply(m, f, a)}
rescue
_ ->
:error
e ->
{:error, e}
end
end)

# start a new hydrate timer
Process.send_after(self(), :hydrate, refresh_interval)

{:noreply, %State{state | hydrating_task: hydrating_task}}
end

def handle_info(
{ref, result},
state = %State{ttl_ref: ttl_ref, ttl: ttl, hydrating_task: %Task{ref: ref_task}}
state = %State{
mfa: {m, f, a},
refresh_interval: refresh_interval,
ttl_timer: ttl_timer,
ttl: ttl,
hydrating_task: %Task{ref: ref_task}
}
)
when ref == ref_task do
# cancel current ttl if any
if is_reference(ttl_ref) do
Process.cancel_timer(ttl_ref)
if is_reference(ttl_timer) do
Process.cancel_timer(ttl_timer)
end

# start new ttl timer
ttl_ref =
ttl_timer =
if is_integer(ttl) do
Process.send_after(self(), :discard_value, ttl)
else
nil
end

new_state = %{state | ttl_ref: ttl_ref, hydrating_task: nil}
# start a new hydrate timer
hydrating_timer = Process.send_after(self(), :hydrate, refresh_interval)

new_state = %{
state
| ttl_timer: ttl_timer,
hydrating_task: nil,
hydrating_timer: hydrating_timer
}

case result do
{:ok, value} ->
{:noreply, %{new_state | value: value}}

_ ->
{:error, reason} ->
Logger.error("#{m}.#{f}.#{inspect(a)} returns an error: #{inspect(reason)}")
{:noreply, new_state}
end
end

def handle_info({:DOWN, _ref, :process, _, _}, state), do: {:noreply, state}

def handle_info(:discard_value, state) do
{:noreply, %State{state | value: nil, ttl_ref: nil}}
{:noreply, %State{state | value: nil, ttl_timer: nil}}
end
end
28 changes: 17 additions & 11 deletions test/archethic/oracle_chain/services/hydrating_cache_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ defmodule ArchethicCache.OracleChain.Services.HydratingCacheTest do
alias Archethic.OracleChain.Services.HydratingCache

use ExUnit.Case
@moduletag capture_log: true

test "should receive the same value until next refresh" do
{:ok, pid} =
Expand All @@ -11,7 +12,8 @@ defmodule ArchethicCache.OracleChain.Services.HydratingCacheTest do
ttl: :infinity
)

# 1ms required just so it has the time to receive the :hydrate msg
:erlang.trace(pid, true, [:receive])
assert_receive {:trace, ^pid, :receive, :hydrate}
Process.sleep(1)
{:ok, date} = HydratingCache.get(pid)

Expand All @@ -27,7 +29,9 @@ defmodule ArchethicCache.OracleChain.Services.HydratingCacheTest do
Process.sleep(10)
assert {:ok, ^date} = HydratingCache.get(pid)

Process.sleep(100)
assert_receive {:trace, ^pid, :receive, {:DOWN, _, _, _, _}}
assert_receive {:trace, ^pid, :receive, :hydrate}
Process.sleep(1)
{:ok, date2} = HydratingCache.get(pid)
assert date != date2
end
Expand All @@ -40,9 +44,11 @@ defmodule ArchethicCache.OracleChain.Services.HydratingCacheTest do
ttl: 50
)

# 1ms required just so it has the time to receive the :hydrate msg
:erlang.trace(pid, true, [:receive])
assert_receive {:trace, ^pid, :receive, :hydrate}
Process.sleep(1)
{:ok, _date} = HydratingCache.get(pid)

assert {:ok, _date} = HydratingCache.get(pid)

Process.sleep(50)
assert :error = HydratingCache.get(pid)
Expand All @@ -56,7 +62,8 @@ defmodule ArchethicCache.OracleChain.Services.HydratingCacheTest do
ttl: 50
)

# 1ms required just so it has the time to receive the :hydrate msg
:erlang.trace(pid, true, [:receive])
assert_receive {:trace, ^pid, :receive, :hydrate}
Process.sleep(1)
{:ok, date} = HydratingCache.get(pid)

Expand All @@ -77,11 +84,9 @@ defmodule ArchethicCache.OracleChain.Services.HydratingCacheTest do
ttl: 50
)

# 1ms required just so it has the time to receive the :hydrate msg
Process.sleep(1)
assert :error = HydratingCache.get(pid)

Process.sleep(50)
:erlang.trace(pid, true, [:receive])
assert_receive {:trace, ^pid, :receive, :hydrate}
assert_receive {:trace, ^pid, :receive, {_ref, {:error, %UndefinedFunctionError{}}}}
assert :error = HydratingCache.get(pid)

Process.sleep(100)
Expand All @@ -96,7 +101,8 @@ defmodule ArchethicCache.OracleChain.Services.HydratingCacheTest do
ttl: :infinity
)

# 1ms required just so it has the time to receive the :hydrate msg
:erlang.trace(pid, true, [:receive])
assert_receive {:trace, ^pid, :receive, :hydrate}
Process.sleep(1)

assert :error = HydratingCache.get(pid, 1)
Expand Down

0 comments on commit ef4047c

Please sign in to comment.