Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AEWeb cache (files and transactions) #744

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,16 @@ config :archethic, Archethic.Networking.IPLookup.RemoteDiscovery,

# -----End-of-Networking-configs ------

config :archethic_web,
# The tx_cache is stored on RAM
# 750MB should hold a minimum 250 transactions
tx_cache_bytes: 750 * 1024 * 1024,

# The file_cache is stored on DISK
# 5GB should hold 2000 average size pages
# https://httparchive.org/reports/page-weight
file_cache_bytes: 5 * 1024 * 1024 * 1024

config :esbuild,
version: "0.12.18",
default: [
Expand Down
20 changes: 19 additions & 1 deletion lib/archethic/metrics/parser.ex
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,16 @@ defmodule Archethic.Metrics.Parser do
...> name: "vm_memory_atom",
...> type: "gauge"
...> },
...> %{
...> metrics: [%{value: 4}, %{value: 1}],
...> name: "cache_hit",
...> type: "counter"
...> }
...> ] |> Parser.reduce_metrics()
%{
"archethic_contract_parsing_duration" => %{count: 2, sum: 10.0},
"vm_memory_atom" => 1589609
"vm_memory_atom" => 1589609,
"cache_hit" => 5
}
"""
@spec reduce_metrics(list(metric())) ::
Expand All @@ -287,9 +293,21 @@ defmodule Archethic.Metrics.Parser do

metric = %{type: "gauge"}, acc ->
Map.merge(acc, map_gauge(metric))

metric = %{type: "counter"}, acc ->
Map.merge(acc, map_counter(metric))
end)
end

defp map_counter(%{name: name, metrics: metrics}) do
metrics =
Enum.reduce(metrics, 0, fn %{value: value}, acc ->
acc + value
end)

%{name => metrics}
end

defp map_gauge(%{name: name, metrics: [metric | _]}) do
value = Map.get(metric, :value, 0)
%{name => value}
Expand Down
8 changes: 7 additions & 1 deletion lib/archethic/telemetry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,13 @@ defmodule Archethic.Telemetry do
reporter_options: [buckets: [0.01, 0.025, 0.05, 0.1, 0.3, 0.5, 0.8, 1, 1.5, 2, 5, 10]],
measurement: :duration,
tags: [:nb_summaries]
)
),

# Archethic Web
counter("archethic_web.hosting.cache_file.hit.count"),
counter("archethic_web.hosting.cache_file.miss.count"),
counter("archethic_web.hosting.cache_ref_tx.hit.count"),
counter("archethic_web.hosting.cache_ref_tx.miss.count")
]
end
end
159 changes: 159 additions & 0 deletions lib/archethic_cache/lru.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
defmodule ArchethicCache.LRU do
@moduledoc """
A cache that stores the values in an ETS table.
There are hooks available to be able to add effects (ex: write to disk).

It keeps track of the order and bytes in the genserver state.
The `bytes_used` are tracked in here because if we just monitor ETS table size, we would not be able to have a disk cache.
The `keys` are used to determine the Least Recent Used (first is the most recent used, last is the least recent used).

We do not store the values directly in ETS but we insert a pair {size, value} instead.
Because size can be modified with the hooks
(ex: For LRUDisk, we discard the value from the ETS table, but still want to know the size written to disk)
"""

@spec start_link(GenServer.name(), non_neg_integer(), keyword()) ::
{:ok, binary()} | {:error, term()}
def start_link(name, max_bytes, opts \\ []) do
GenServer.start_link(__MODULE__, [name, max_bytes, opts], name: name)
end

@spec put(GenServer.server(), term(), term()) :: :ok
def put(server, key, value) do
GenServer.cast(server, {:put, key, value})
end

@spec get(GenServer.server(), term()) :: nil | term()
def get(server, key) do
GenServer.call(server, {:get, key})
end

@spec purge(GenServer.server()) :: :ok
def purge(server) do
GenServer.call(server, :purge)
end

def init([name, max_bytes, opts]) do
table = :ets.new(:"aecache_#{name}", [:set, {:read_concurrency, true}])

{:ok,
%{
table: table,
bytes_max: max_bytes,
bytes_used: 0,
keys: [],
put_fn: Keyword.get(opts, :put_fn, fn _key, value -> value end),
get_fn: Keyword.get(opts, :get_fn, fn _key, value -> value end),
evict_fn: Keyword.get(opts, :evict_fn, fn _key, _value -> :ok end)
}}
end

def handle_call({:get, key}, _from, state = %{table: table, keys: keys, get_fn: get_fn}) do
{reply, new_state} =
case :ets.lookup(table, key) do
[{^key, {_size, value}}] ->
{
get_fn.(key, value),
%{state | keys: keys |> move_front(key)}
}

[] ->
{nil, state}
end

{:reply, reply, new_state}
end

def handle_call(:purge, _from, state = %{table: table, evict_fn: evict_fn}) do
# we call the evict_fn to be able to clean effects (ex: file written to disk)
:ets.foldr(
fn {key, {_size, value}}, acc ->
evict_fn.(key, value)
acc + 1
end,
0,
table
)

:ets.delete_all_objects(table)
{:reply, :ok, %{state | keys: [], bytes_used: 0}}
end

def handle_cast(
{:put, key, value},
state = %{table: table, bytes_max: bytes_max, put_fn: put_fn, evict_fn: evict_fn}
) do
size = :erlang.external_size(value)

if size > bytes_max do
{:noreply, state}
else
# maybe evict some keys to make space
state =
evict_until(state, fn %{bytes_used: bytes_used, bytes_max: bytes_max} ->
bytes_used + size <= bytes_max
end)

case :ets.lookup(table, key) do
[] ->
value_to_store = put_fn.(key, value)

:ets.insert(table, {key, {size, value_to_store}})

new_state = %{
state
| keys: [key | state.keys],
bytes_used: state.bytes_used + size
}

{:noreply, new_state}

[{^key, {old_size, old_value}}] ->
# this is a replacement, we need to evict to update the bytes_used
evict_fn.(key, old_value)
value_to_store = put_fn.(key, value)

:ets.insert(table, {key, {size, value_to_store}})

new_state = %{
state
| keys: state.keys |> move_front(key),
bytes_used: state.bytes_used + size - old_size
}

{:noreply, new_state}
end
end
end

defp evict_until(
state = %{table: table, keys: keys, evict_fn: evict_fn, bytes_used: bytes_used},
predicate
) do
if predicate.(state) do
state
else
case Enum.reverse(keys) do
[] ->
state

[oldest_key | rest] ->
[{_, {size, oldest_value}}] = :ets.take(table, oldest_key)
evict_fn.(oldest_key, oldest_value)

evict_until(
%{
state
| bytes_used: bytes_used - size,
keys: rest
},
predicate
)
end
end
end

defp move_front(list, item) do
[item | List.delete(list, item)]
end
end
67 changes: 67 additions & 0 deletions lib/archethic_cache/lru_disk.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
defmodule ArchethicCache.LRUDisk do
@moduledoc """
Wraps the LRU genserver and adds hooks to write / read from disk.
The value is always a binary.
"""
alias ArchethicCache.LRU
alias Archethic.Crypto

require Logger

@spec start_link(GenServer.name(), non_neg_integer(), binary()) ::
{:ok, binary()} | {:error, term()}
def start_link(name, max_bytes, cache_dir) do
cache_dir = Path.join(cache_dir, "#{name}")
:ok = reset_directory(cache_dir)

LRU.start_link(name, max_bytes,
put_fn: fn key, value ->
# write to disk
File.write!(key_to_path(cache_dir, key), value, [:exclusive, :binary])

# store a nil value in the LRU's ETS table
nil
end,
get_fn: fn key, nil ->
# called only if the key is already in LRU's ETS table
case File.read(key_to_path(cache_dir, key)) do
{:ok, bin} ->
bin

{:error, _} ->
nil
end
end,
evict_fn: fn key, nil ->
case File.rm(key_to_path(cache_dir, key)) do
:ok ->
:ok

{:error, _} ->
:ok
end
end
)
end

@spec put(GenServer.server(), term(), binary()) :: :ok
defdelegate put(server, key, value), to: LRU, as: :put

@spec get(GenServer.server(), term()) :: nil | binary()
defdelegate get(server, key), to: LRU, as: :get

@spec purge(GenServer.server()) :: :ok
defdelegate purge(server), to: LRU, as: :purge

defp reset_directory(dir) do
File.rm_rf!(dir)
File.mkdir_p!(dir)
end

defp key_to_path(cache_dir, key) do
Path.join(
cache_dir,
Crypto.hash(:erlang.term_to_binary(key)) |> Base.url_encode64(padding: false)
)
end
end
37 changes: 2 additions & 35 deletions lib/archethic_web/controllers/aeweb_root_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,40 +5,7 @@ defmodule ArchethicWeb.AEWebRootController do

use ArchethicWeb, :controller

def index(conn, params = %{"url_path" => url_path}) do
cache_headers = WebHostingController.get_cache_headers(conn)

case WebHostingController.get_website(params, cache_headers) do
{:ok, file_content, encoding, mime_type, cached?, etag} ->
WebHostingController.send_response(conn, file_content, encoding, mime_type, cached?, etag)

{:error, {:is_a_directory, transaction}} ->
{:ok, listing_html, encoding, mime_type, cached?, etag} =
WebHostingController.DirectoryListing.list(
conn.request_path,
params,
transaction,
cache_headers
)

WebHostingController.send_response(conn, listing_html, encoding, mime_type, cached?, etag)

{:error, :file_not_found} ->
# If file is not found, returning default file (url can be handled by index file)
case url_path do
[] ->
send_resp(conn, 404, "Not Found")

["index.html"] ->
send_resp(conn, 400, "Not Found")

_path ->
params = Map.put(params, "url_path", ["index.html"])
index(conn, params)
end

_ ->
send_resp(conn, 404, "Not Found")
end
def index(conn, params) do
WebHostingController.web_hosting(conn, params)
end
end
Loading