Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pr 824 integrate hydrating cache oracle chain #854

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
a7232d7
Adding initial keys for hydrating caches
netboz Jan 18, 2023
d0f69c5
Adding hydrating cache code
netboz Jan 18, 2023
d0042ca
Adding cache to supervisor
netboz Jan 18, 2023
c9f70d1
Using cache in agregator
netboz Jan 18, 2023
fd20839
Increasing deault ttl
netboz Jan 18, 2023
121acc1
Fixing querying when no initial value plus bugfixes
netboz Jan 19, 2023
dfda3ac
Fixing specs
netboz Jan 19, 2023
1e7c4a9
Starting cache as first child
netboz Jan 19, 2023
05ec2ce
Bug fixes and refinement
netboz Jan 19, 2023
1525faf
Adding tests
netboz Jan 19, 2023
fc89bdb
Adding test for manager + refinement
netboz Jan 19, 2023
1de4d8f
Addressing comments
netboz Jan 20, 2023
45c2406
Uncreasing oracle service frequency to be closer to real use ( and re…
netboz Jan 21, 2023
aa0f0cb
Adding 3 secs timeout when fethcing value from cache
netboz Jan 21, 2023
76bf4f5
Managing case when value is requested but couldn't have been initialised
netboz Jan 21, 2023
fc63a3b
Adding Trace
netboz Jan 21, 2023
992276f
Adding cast to request hydrating function registration from api
netboz Jan 21, 2023
0b43778
Adding tests
netboz Jan 21, 2023
b58c91d
Cleaning code
netboz Jan 21, 2023
f9c3d50
Adding timeout support for ttl plus refinement
netboz Jan 22, 2023
4df5ba5
Using prior value if no new value for a curreny ( fixes #836 )
netboz Jan 23, 2023
bfe85bd
Fixing TTL bug
netboz Jan 23, 2023
2a8508f
Fixing legacy tests + bugfixes
netboz Jan 23, 2023
6f31aee
Fixing credo warnings
netboz Jan 23, 2023
c1271cf
Fixing Credo second round
netboz Jan 23, 2023
fb07570
Fixing formatting
netboz Jan 23, 2023
8527cce
Fixing @impl syntax
netboz Jan 23, 2023
52bc9ff
Fixing unref variable...
netboz Jan 23, 2023
f109561
Formatting
netboz Jan 23, 2023
448917c
Fixing incorrect module name
netboz Jan 23, 2023
e3a2084
Fixing legacy tests
netboz Jan 23, 2023
6f33f1d
Reverting changes on cache manager
netboz Jan 23, 2023
62a86e8
Removing cache manager, better hydrating cache name management
netboz Jan 31, 2023
b184d84
Adressing Credo remarks
netboz Jan 31, 2023
2a1cc08
Addressing MR comments plus bugfixes
netboz Feb 6, 2023
e3a4270
Adding get_all to get all keys from hydrating cache plus tests
netboz Feb 6, 2023
cc167f8
Removing trace
netboz Feb 6, 2023
21fe8e8
Fixing unused variable
netboz Feb 6, 2023
12099ff
Preventing pipeline race condition
netboz Feb 6, 2023
deba1f5
Move HydratingCache into ArchethicCache folder
bchamagne Mar 1, 2023
cf8282e
Refactor hydrating cache configuration
bchamagne Mar 1, 2023
5b4a6c2
replace erlang timer with elixir's
bchamagne Mar 2, 2023
bc7fed6
remove obsolete mocks
bchamagne Mar 2, 2023
f1965ea
300ms tests instead of 21s
bchamagne Mar 2, 2023
3c895d8
Refactor
samuelmanzanera Mar 10, 2023
ef4047c
Improve task monitoring and tests
samuelmanzanera Mar 17, 2023
04706b3
Hydrating Cache accepts CRON intervals
bchamagne Mar 22, 2023
ef646d7
Add a timeout for the hydrating function
bchamagne Mar 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,13 @@ config :archethic, Archethic.OracleChain,
]

config :archethic, Archethic.OracleChain.Services.UCOPrice,
providers: [
Archethic.OracleChain.Services.UCOPrice.Providers.Coingecko,
Archethic.OracleChain.Services.UCOPrice.Providers.CoinMarketCap,
Archethic.OracleChain.Services.UCOPrice.Providers.CoinPaprika
]
providers: %{
# Coingecko limits to 10-30 calls, with 30s delay we would be under the limitation
Archethic.OracleChain.Services.UCOPrice.Providers.Coingecko => [refresh_interval: 30_000],
Archethic.OracleChain.Services.UCOPrice.Providers.CoinMarketCap => [refresh_interval: 10_000],
# Coinpaprika limits to 25K req/mo; with 2min delay we can reach ~21K
Archethic.OracleChain.Services.UCOPrice.Providers.CoinPaprika => [refresh_interval: 120_000]
}

config :archethic, ArchethicWeb.FaucetController,
seed:
Expand Down
12 changes: 11 additions & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,18 @@ config :archethic, Archethic.OracleChain.Scheduler,
polling_interval: "0 0 * * * *",
summary_interval: "0 0 0 * * *"

config :archethic, Archethic.OracleChain.ServiceCacheSupervisor, enabled: false

config :archethic, Archethic.OracleChain,
services: [
uco: MockUCOPrice
]

config :archethic, Archethic.OracleChain.Services.UCOPrice,
providers: [MockUCOPriceProvider1, MockUCOPriceProvider2, MockUCOPriceProvider3]
providers: %{
MockUCOProvider1 => [refresh_interval: 1000],
MockUCOProvider2 => [refresh_interval: 1000]
}

# -----Start-of-Networking-tests-configs-----

Expand Down
12 changes: 11 additions & 1 deletion lib/archethic/oracle_chain/services.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ defmodule Archethic.OracleChain.Services do
def fetch_new_data(previous_content \\ %{}) do
Enum.map(services(), fn {service, handler} ->
Logger.debug("Fetching #{service} oracle data...")
{service, apply(handler, :fetch, [])}
{service, handler.fetch()}
end)
|> Enum.filter(fn
{service, {:ok, data}} ->
Expand Down Expand Up @@ -82,4 +82,14 @@ defmodule Archethic.OracleChain.Services do
defp services do
Application.get_env(:archethic, Archethic.OracleChain) |> Keyword.fetch!(:services)
end

@doc """
List all the service cache supervisor specs
"""
@spec cache_service_supervisor_specs() :: list(Supervisor.child_spec())
def cache_service_supervisor_specs do
Enum.map(services(), fn {_service_name, handler} ->
handler.cache_child_spec()
end)
end
end
16 changes: 16 additions & 0 deletions lib/archethic/oracle_chain/services/cache_supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
defmodule Archethic.OracleChain.ServiceCacheSupervisor do
@moduledoc false

use Supervisor

alias Archethic.OracleChain.Services

def start_link(arg \\ []) do
Supervisor.start_link(__MODULE__, arg)
end

def init(_arg) do
children = Services.cache_service_supervisor_specs()
Supervisor.init(children, strategy: :one_for_one)
end
end
158 changes: 158 additions & 0 deletions lib/archethic/oracle_chain/services/hydrating_cache.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
defmodule Archethic.OracleChain.Services.HydratingCache do
@moduledoc """
This module is responsible for :
- Run the hydrating function associated with this key at a given interval
- Discard the value after some time
- Return the value when requested
"""
use GenServer

alias Archethic.Utils
require Logger

defmodule State do
@moduledoc false
defstruct([
:mfa,
:ttl,
:ttl_timer,
# refresh_interval :: Int | CronInterval
:refresh_interval,
:value,
:hydrating_task,
:hydrating_timer,
:hydrating_function_timeout
])
end

@spec start_link(keyword()) ::
{:ok, GenServer.on_start()} | {:error, term()}
def start_link(arg \\ []) do
GenServer.start_link(__MODULE__, arg, Keyword.take(arg, [:name]))
end

@spec get(GenServer.server(), integer()) :: {:ok, any()} | :error
def get(server, timeout \\ 5000) do
try do
GenServer.call(server, :get, timeout)
catch
:exit, {:timeout, _} ->
:error
end
end

def init(options) do
refresh_interval = Keyword.fetch!(options, :refresh_interval)
mfa = Keyword.fetch!(options, :mfa)
ttl = Keyword.get(options, :ttl, :infinity)
hydrating_function_timeout = Keyword.get(options, :hydrating_function_timeout, 5000)

# start hydrating as soon as init is done
hydrating_timer = Process.send_after(self(), :hydrate, 0)

## Hydrate the value
{:ok,
%State{
mfa: mfa,
ttl: ttl,
hydrating_function_timeout: hydrating_function_timeout,
refresh_interval: refresh_interval,
hydrating_timer: hydrating_timer
}}
end

def handle_call(:get, _from, state = %State{value: nil}) do
{:reply, :error, state}
end

def handle_call(:get, _from, state = %State{value: value}) when value != nil do
{:reply, {:ok, value}, state}
end

def handle_info(
:hydrate,
state = %State{
hydrating_function_timeout: hydrating_function_timeout,
mfa: {m, f, a}
}
) do
hydrating_task =
Task.async(fn ->
try do
{:ok, apply(m, f, a)}
rescue
e ->
{:error, e}
end
end)

# we make sure that our hydrating function does not hang
Process.send_after(self(), {:kill_hydrating_task, hydrating_task}, hydrating_function_timeout)

{:noreply, %State{state | hydrating_task: hydrating_task}}
end

def handle_info({:kill_hydrating_task, task}, state) do
Task.shutdown(task, :brutal_kill)

{:noreply, state}
end

def handle_info(
{ref, result},
state = %State{
mfa: {m, f, a},
refresh_interval: refresh_interval,
ttl_timer: ttl_timer,
ttl: ttl,
hydrating_task: %Task{ref: ref_task}
}
)
when ref == ref_task do
# cancel current ttl if any
if is_reference(ttl_timer) do
Process.cancel_timer(ttl_timer)
end

# start new ttl timer
ttl_timer =
if is_integer(ttl) do
Process.send_after(self(), :discard_value, ttl)
else
nil
end

# start a new hydrate timer
hydrating_timer = Process.send_after(self(), :hydrate, next_tick_in_seconds(refresh_interval))

new_state = %{
state
| ttl_timer: ttl_timer,
hydrating_task: nil,
hydrating_timer: hydrating_timer
}

case result do
{:ok, value} ->
{:noreply, %{new_state | value: value}}

{:error, reason} ->
Logger.error("#{m}.#{f}.#{inspect(a)} returns an error: #{inspect(reason)}")
{:noreply, new_state}
end
end

def handle_info({:DOWN, _ref, :process, _, _}, state), do: {:noreply, state}

def handle_info(:discard_value, state) do
{:noreply, %State{state | value: nil, ttl_timer: nil}}
end

defp next_tick_in_seconds(refresh_interval) do
if is_binary(refresh_interval) do
samuelmanzanera marked this conversation as resolved.
Show resolved Hide resolved
Utils.time_offset(refresh_interval) * 1000
else
refresh_interval
end
end
end
1 change: 1 addition & 0 deletions lib/archethic/oracle_chain/services/impl.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
defmodule Archethic.OracleChain.Services.Impl do
@moduledoc false

@callback cache_child_spec() :: Supervisor.child_spec()
@callback fetch() :: {:ok, %{required(String.t()) => any()}} | {:error, any()}
@callback verify?(%{required(String.t()) => any()}) :: boolean
@callback parse_data(map()) :: {:ok, map()} | :error
Expand Down
56 changes: 56 additions & 0 deletions lib/archethic/oracle_chain/services/provider_cache_supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
defmodule Archethic.OracleChain.Services.ProviderCacheSupervisor do
@moduledoc """
Supervise the several self-hydrating cache for the providers
"""

use Supervisor

alias Archethic.OracleChain.Services.HydratingCache

def start_link(arg) do
Supervisor.start_link(__MODULE__, arg, name: __MODULE__)
end

def init(arg) do
fetch_args = Keyword.fetch!(arg, :fetch_args)
providers = Keyword.fetch!(arg, :providers)

provider_child_specs =
Enum.map(providers, fn {provider, opts} ->
refresh_interval = Keyword.get(opts, :refresh_interval, 60_000)

Supervisor.child_spec(
{HydratingCache,
[
refresh_interval: refresh_interval,
mfa: {provider, :fetch, [fetch_args]},
name: cache_name(provider)
]},
id: cache_name(provider)
)
end)

children = provider_child_specs

Supervisor.init(
children,
strategy: :one_for_one
)
end

defp cache_name(module), do: :"#{module}Cache"

@doc """
Return the values from the several provider caches
"""
@spec get_values(list(module())) :: list(any())
def get_values(providers) do
providers
|> Enum.map(fn {provider, _} -> cache_name(provider) end)
|> Enum.map(&HydratingCache.get/1)
|> Enum.filter(&match?({:ok, {:ok, _}}, &1))
|> Enum.map(fn
{:ok, {:ok, val}} -> val
end)
end
end
Loading