Skip to content

Commit

Permalink
HydratingCache now properly handles a timeout of the hydrate function (
Browse files Browse the repository at this point in the history
  • Loading branch information
bchamagne committed Sep 1, 2023
1 parent c2081c8 commit 77acb7e
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 17 deletions.
31 changes: 17 additions & 14 deletions lib/archethic/oracle_chain/services/hydrating_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ defmodule Archethic.OracleChain.Services.HydratingCache do
use GenServer
@vsn Mix.Project.config()[:version]

alias Archethic.TaskSupervisor
alias Archethic.Utils

require Logger

defmodule State do
Expand Down Expand Up @@ -78,7 +80,7 @@ defmodule Archethic.OracleChain.Services.HydratingCache do
}
) do
hydrating_task =
Task.async(fn ->
Task.Supervisor.async_nolink(TaskSupervisor, fn ->
try do
{:ok, apply(m, f, a)}
rescue
Expand All @@ -93,8 +95,9 @@ defmodule Archethic.OracleChain.Services.HydratingCache do
{:noreply, %State{state | hydrating_task: hydrating_task}}
end

def handle_info({:kill_hydrating_task, task}, state) do
Task.shutdown(task, :brutal_kill)
def handle_info({:kill_hydrating_task, %Task{pid: pid}}, state) do
# Task.shutdown will not send DOWN msg
Process.exit(pid, :kill)

{:noreply, state}
end
Expand All @@ -103,7 +106,6 @@ defmodule Archethic.OracleChain.Services.HydratingCache do
{ref, result},
state = %State{
mfa: {m, f, a},
refresh_interval: refresh_interval,
ttl_timer: ttl_timer,
ttl: ttl,
hydrating_task: %Task{ref: ref_task}
Expand All @@ -123,15 +125,7 @@ defmodule Archethic.OracleChain.Services.HydratingCache do
nil
end

# start a new hydrate timer
hydrating_timer = Process.send_after(self(), :hydrate, next_tick_in_seconds(refresh_interval))

new_state = %{
state
| ttl_timer: ttl_timer,
hydrating_task: nil,
hydrating_timer: hydrating_timer
}
new_state = %{state | ttl_timer: ttl_timer}

case result do
{:ok, value} ->
Expand All @@ -143,7 +137,16 @@ defmodule Archethic.OracleChain.Services.HydratingCache do
end
end

def handle_info({:DOWN, _ref, :process, _, _}, state), do: {:noreply, state}
def handle_info(
{:DOWN, _ref, :process, _, _},
state = %State{refresh_interval: refresh_interval}
) do
# we always receive a DOWN on success/error/timeout
# so this is the best place to cleanup & start a new timer
hydrating_timer = Process.send_after(self(), :hydrate, next_tick_in_seconds(refresh_interval))

{:noreply, %{state | hydrating_task: nil, hydrating_timer: hydrating_timer}}
end

def handle_info(:discard_value, state) do
{:noreply, %State{state | value: nil, ttl_timer: nil}}
Expand Down
18 changes: 15 additions & 3 deletions test/archethic/oracle_chain/services/hydrating_cache_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -130,19 +130,31 @@ defmodule ArchethicCache.OracleChain.Services.HydratingCacheTest do
test "should kill the task if hydrating function takes longer than timeout" do
{:ok, pid} =
HydratingCache.start_link(
mfa: {Process, :sleep, [100]},
refresh_interval: 100,
hydrating_function_timeout: 10,
mfa: {Process, :sleep, [200]},
refresh_interval: 200,
hydrating_function_timeout: 100,
ttl: :infinity
)

:erlang.trace(pid, true, [:receive])

state_begin = :sys.get_state(pid)

Process.sleep(100)

assert_receive {:trace, ^pid, :receive, :hydrate}
assert_receive {:trace, ^pid, :receive, {:kill_hydrating_task, _}}

# check task has been killed but not genserver
refute Process.alive?(:sys.get_state(pid).hydrating_task.pid)
assert Process.alive?(pid)

# genserver still able to reply
assert :error = HydratingCache.get(pid)

Process.sleep(200)

# make sure a new timer has been started
assert :sys.get_state(pid).hydrating_timer != state_begin.hydrating_timer
end
end

0 comments on commit 77acb7e

Please sign in to comment.