Skip to content

Commit

Permalink
Pr 824 integrate hydrating cache oracle chain (#854)
Browse files Browse the repository at this point in the history
* Adding initial keys for hydrating caches

* Adding hydrating cache code

* Adding cache to supervisor

* Using cache in agregator

* Increasing deault ttl

* Fixing querying when no initial value plus bugfixes

* Starting cache as first child

* Adding tests

* Adding test for manager + refinement

* Addressing comments

* Uncreasing oracle service frequency to be closer to real use ( and removing coco service )

* Adding 3 secs timeout when fethcing value from cache

* Managing case when value is requested but couldn't have been initialised

* Adding cast to request hydrating function registration from api

* Adding tests

* Adding timeout support for ttl plus refinement

* Using prior value if no new value for a curreny ( fixes #836 )

* Fixing TTL bug

* Reverting changes on cache manager

* Removing cache manager, better hydrating cache name management

* Addressing MR comments plus bugfixes

* Adding get_all to get all keys from hydrating cache plus tests

* Preventing pipeline race condition

* Move HydratingCache into ArchethicCache folder

* replace erlang timer with elixir's

* Improve task monitoring and tests

The renewal of the hydrating message is done after the task is completed
The timers are persisted in the state to have a better monitoring
The tests are now using :erlang.trace to avoid undeterministic behaviors
based on time.

* Hydrating Cache accepts CRON intervals

* Add a timeout for the hydrating function

---------

Co-authored-by: Bastien CHAMAGNE <bastien@chamagne.fr>
Co-authored-by: Samuel Manzanera <samuelmanzanera@protonmail.com>
  • Loading branch information
3 people committed Mar 24, 2023
1 parent 3434f2f commit 0a37ad0
Show file tree
Hide file tree
Showing 16 changed files with 663 additions and 348 deletions.
12 changes: 7 additions & 5 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,13 @@ config :archethic, Archethic.OracleChain,
]

config :archethic, Archethic.OracleChain.Services.UCOPrice,
providers: [
Archethic.OracleChain.Services.UCOPrice.Providers.Coingecko,
Archethic.OracleChain.Services.UCOPrice.Providers.CoinMarketCap,
Archethic.OracleChain.Services.UCOPrice.Providers.CoinPaprika
]
providers: %{
# Coingecko limits to 10-30 calls, with 30s delay we would be under the limitation
Archethic.OracleChain.Services.UCOPrice.Providers.Coingecko => [refresh_interval: 30_000],
Archethic.OracleChain.Services.UCOPrice.Providers.CoinMarketCap => [refresh_interval: 10_000],
# Coinpaprika limits to 25K req/mo; with 2min delay we can reach ~21K
Archethic.OracleChain.Services.UCOPrice.Providers.CoinPaprika => [refresh_interval: 120_000]
}

config :archethic, ArchethicWeb.FaucetController,
seed:
Expand Down
12 changes: 11 additions & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,18 @@ config :archethic, Archethic.OracleChain.Scheduler,
polling_interval: "0 0 * * * *",
summary_interval: "0 0 0 * * *"

config :archethic, Archethic.OracleChain.ServiceCacheSupervisor, enabled: false

config :archethic, Archethic.OracleChain,
services: [
uco: MockUCOPrice
]

config :archethic, Archethic.OracleChain.Services.UCOPrice,
providers: [MockUCOPriceProvider1, MockUCOPriceProvider2, MockUCOPriceProvider3]
providers: %{
MockUCOProvider1 => [refresh_interval: 1000],
MockUCOProvider2 => [refresh_interval: 1000]
}

# -----Start-of-Networking-tests-configs-----

Expand Down
12 changes: 11 additions & 1 deletion lib/archethic/oracle_chain/services.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ defmodule Archethic.OracleChain.Services do
def fetch_new_data(previous_content \\ %{}) do
Enum.map(services(), fn {service, handler} ->
Logger.debug("Fetching #{service} oracle data...")
{service, apply(handler, :fetch, [])}
{service, handler.fetch()}
end)
|> Enum.filter(fn
{service, {:ok, data}} ->
Expand Down Expand Up @@ -82,4 +82,14 @@ defmodule Archethic.OracleChain.Services do
defp services do
Application.get_env(:archethic, Archethic.OracleChain) |> Keyword.fetch!(:services)
end

@doc """
List all the service cache supervisor specs
"""
@spec cache_service_supervisor_specs() :: list(Supervisor.child_spec())
def cache_service_supervisor_specs do
Enum.map(services(), fn {_service_name, handler} ->
handler.cache_child_spec()
end)
end
end
16 changes: 16 additions & 0 deletions lib/archethic/oracle_chain/services/cache_supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
defmodule Archethic.OracleChain.ServiceCacheSupervisor do
@moduledoc false

use Supervisor

alias Archethic.OracleChain.Services

def start_link(arg \\ []) do
Supervisor.start_link(__MODULE__, arg)
end

def init(_arg) do
children = Services.cache_service_supervisor_specs()
Supervisor.init(children, strategy: :one_for_one)
end
end
158 changes: 158 additions & 0 deletions lib/archethic/oracle_chain/services/hydrating_cache.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
defmodule Archethic.OracleChain.Services.HydratingCache do
@moduledoc """
This module is responsible for :
- Run the hydrating function associated with this key at a given interval
- Discard the value after some time
- Return the value when requested
"""
use GenServer

alias Archethic.Utils
require Logger

defmodule State do
@moduledoc false
defstruct([
:mfa,
:ttl,
:ttl_timer,
# refresh_interval :: Int | CronInterval
:refresh_interval,
:value,
:hydrating_task,
:hydrating_timer,
:hydrating_function_timeout
])
end

@spec start_link(keyword()) ::
{:ok, GenServer.on_start()} | {:error, term()}
def start_link(arg \\ []) do
GenServer.start_link(__MODULE__, arg, Keyword.take(arg, [:name]))
end

@spec get(GenServer.server(), integer()) :: {:ok, any()} | :error
def get(server, timeout \\ 5000) do
try do
GenServer.call(server, :get, timeout)
catch
:exit, {:timeout, _} ->
:error
end
end

def init(options) do
refresh_interval = Keyword.fetch!(options, :refresh_interval)
mfa = Keyword.fetch!(options, :mfa)
ttl = Keyword.get(options, :ttl, :infinity)
hydrating_function_timeout = Keyword.get(options, :hydrating_function_timeout, 5000)

# start hydrating as soon as init is done
hydrating_timer = Process.send_after(self(), :hydrate, 0)

## Hydrate the value
{:ok,
%State{
mfa: mfa,
ttl: ttl,
hydrating_function_timeout: hydrating_function_timeout,
refresh_interval: refresh_interval,
hydrating_timer: hydrating_timer
}}
end

def handle_call(:get, _from, state = %State{value: nil}) do
{:reply, :error, state}
end

def handle_call(:get, _from, state = %State{value: value}) when value != nil do
{:reply, {:ok, value}, state}
end

def handle_info(
:hydrate,
state = %State{
hydrating_function_timeout: hydrating_function_timeout,
mfa: {m, f, a}
}
) do
hydrating_task =
Task.async(fn ->
try do
{:ok, apply(m, f, a)}
rescue
e ->
{:error, e}
end
end)

# we make sure that our hydrating function does not hang
Process.send_after(self(), {:kill_hydrating_task, hydrating_task}, hydrating_function_timeout)

{:noreply, %State{state | hydrating_task: hydrating_task}}
end

def handle_info({:kill_hydrating_task, task}, state) do
Task.shutdown(task, :brutal_kill)

{:noreply, state}
end

def handle_info(
{ref, result},
state = %State{
mfa: {m, f, a},
refresh_interval: refresh_interval,
ttl_timer: ttl_timer,
ttl: ttl,
hydrating_task: %Task{ref: ref_task}
}
)
when ref == ref_task do
# cancel current ttl if any
if is_reference(ttl_timer) do
Process.cancel_timer(ttl_timer)
end

# start new ttl timer
ttl_timer =
if is_integer(ttl) do
Process.send_after(self(), :discard_value, ttl)
else
nil
end

# start a new hydrate timer
hydrating_timer = Process.send_after(self(), :hydrate, next_tick_in_seconds(refresh_interval))

new_state = %{
state
| ttl_timer: ttl_timer,
hydrating_task: nil,
hydrating_timer: hydrating_timer
}

case result do
{:ok, value} ->
{:noreply, %{new_state | value: value}}

{:error, reason} ->
Logger.error("#{m}.#{f}.#{inspect(a)} returns an error: #{inspect(reason)}")
{:noreply, new_state}
end
end

def handle_info({:DOWN, _ref, :process, _, _}, state), do: {:noreply, state}

def handle_info(:discard_value, state) do
{:noreply, %State{state | value: nil, ttl_timer: nil}}
end

defp next_tick_in_seconds(refresh_interval) do
if is_binary(refresh_interval) do
Utils.time_offset(refresh_interval) * 1000
else
refresh_interval
end
end
end
1 change: 1 addition & 0 deletions lib/archethic/oracle_chain/services/impl.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
defmodule Archethic.OracleChain.Services.Impl do
@moduledoc false

@callback cache_child_spec() :: Supervisor.child_spec()
@callback fetch() :: {:ok, %{required(String.t()) => any()}} | {:error, any()}
@callback verify?(%{required(String.t()) => any()}) :: boolean
@callback parse_data(map()) :: {:ok, map()} | :error
Expand Down
56 changes: 56 additions & 0 deletions lib/archethic/oracle_chain/services/provider_cache_supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
defmodule Archethic.OracleChain.Services.ProviderCacheSupervisor do
@moduledoc """
Supervise the several self-hydrating cache for the providers
"""

use Supervisor

alias Archethic.OracleChain.Services.HydratingCache

def start_link(arg) do
Supervisor.start_link(__MODULE__, arg, name: __MODULE__)
end

def init(arg) do
fetch_args = Keyword.fetch!(arg, :fetch_args)
providers = Keyword.fetch!(arg, :providers)

provider_child_specs =
Enum.map(providers, fn {provider, opts} ->
refresh_interval = Keyword.get(opts, :refresh_interval, 60_000)

Supervisor.child_spec(
{HydratingCache,
[
refresh_interval: refresh_interval,
mfa: {provider, :fetch, [fetch_args]},
name: cache_name(provider)
]},
id: cache_name(provider)
)
end)

children = provider_child_specs

Supervisor.init(
children,
strategy: :one_for_one
)
end

defp cache_name(module), do: :"#{module}Cache"

@doc """
Return the values from the several provider caches
"""
@spec get_values(list(module())) :: list(any())
def get_values(providers) do
providers
|> Enum.map(fn {provider, _} -> cache_name(provider) end)
|> Enum.map(&HydratingCache.get/1)
|> Enum.filter(&match?({:ok, {:ok, _}}, &1))
|> Enum.map(fn
{:ok, {:ok, val}} -> val
end)
end
end
Loading

0 comments on commit 0a37ad0

Please sign in to comment.