Skip to content

Commit

Permalink
Add cache for AEWeb (files and transactions) (#744)
Browse files Browse the repository at this point in the history
  • Loading branch information
bchamagne committed Jan 10, 2023
1 parent 5dc532c commit 5b39bb7
Show file tree
Hide file tree
Showing 15 changed files with 811 additions and 142 deletions.
10 changes: 10 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,16 @@ config :archethic, Archethic.Networking.IPLookup.RemoteDiscovery,

# -----End-of-Networking-configs ------

config :archethic_web,
# The tx_cache is stored on RAM
# 750MB should hold a minimum 250 transactions
tx_cache_bytes: 750 * 1024 * 1024,

# The file_cache is stored on DISK
# 5GB should hold 2000 average size pages
# https://httparchive.org/reports/page-weight
file_cache_bytes: 5 * 1024 * 1024 * 1024

config :esbuild,
version: "0.12.18",
default: [
Expand Down
20 changes: 19 additions & 1 deletion lib/archethic/metrics/parser.ex
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,16 @@ defmodule Archethic.Metrics.Parser do
...> name: "vm_memory_atom",
...> type: "gauge"
...> },
...> %{
...> metrics: [%{value: 4}, %{value: 1}],
...> name: "cache_hit",
...> type: "counter"
...> }
...> ] |> Parser.reduce_metrics()
%{
"archethic_contract_parsing_duration" => %{count: 2, sum: 10.0},
"vm_memory_atom" => 1589609
"vm_memory_atom" => 1589609,
"cache_hit" => 5
}
"""
@spec reduce_metrics(list(metric())) ::
Expand All @@ -287,9 +293,21 @@ defmodule Archethic.Metrics.Parser do

metric = %{type: "gauge"}, acc ->
Map.merge(acc, map_gauge(metric))

metric = %{type: "counter"}, acc ->
Map.merge(acc, map_counter(metric))
end)
end

defp map_counter(%{name: name, metrics: metrics}) do
metrics =
Enum.reduce(metrics, 0, fn %{value: value}, acc ->
acc + value
end)

%{name => metrics}
end

defp map_gauge(%{name: name, metrics: [metric | _]}) do
value = Map.get(metric, :value, 0)
%{name => value}
Expand Down
8 changes: 7 additions & 1 deletion lib/archethic/telemetry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,13 @@ defmodule Archethic.Telemetry do
reporter_options: [buckets: [0.01, 0.025, 0.05, 0.1, 0.3, 0.5, 0.8, 1, 1.5, 2, 5, 10]],
measurement: :duration,
tags: [:nb_summaries]
)
),

# Archethic Web
counter("archethic_web.hosting.cache_file.hit.count"),
counter("archethic_web.hosting.cache_file.miss.count"),
counter("archethic_web.hosting.cache_ref_tx.hit.count"),
counter("archethic_web.hosting.cache_ref_tx.miss.count")
]
end
end
159 changes: 159 additions & 0 deletions lib/archethic_cache/lru.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
defmodule ArchethicCache.LRU do
@moduledoc """
A cache that stores the values in an ETS table.
There are hooks available to be able to add effects (ex: write to disk).
It keeps track of the order and bytes in the genserver state.
The `bytes_used` are tracked in here because if we just monitor ETS table size, we would not be able to have a disk cache.
The `keys` are used to determine the Least Recent Used (first is the most recent used, last is the least recent used).
We do not store the values directly in ETS but we insert a pair {size, value} instead.
Because size can be modified with the hooks
(ex: For LRUDisk, we discard the value from the ETS table, but still want to know the size written to disk)
"""

@spec start_link(GenServer.name(), non_neg_integer(), keyword()) ::
{:ok, binary()} | {:error, term()}
def start_link(name, max_bytes, opts \\ []) do
GenServer.start_link(__MODULE__, [name, max_bytes, opts], name: name)
end

@spec put(GenServer.server(), term(), term()) :: :ok
def put(server, key, value) do
GenServer.cast(server, {:put, key, value})
end

@spec get(GenServer.server(), term()) :: nil | term()
def get(server, key) do
GenServer.call(server, {:get, key})
end

@spec purge(GenServer.server()) :: :ok
def purge(server) do
GenServer.call(server, :purge)
end

def init([name, max_bytes, opts]) do
table = :ets.new(:"aecache_#{name}", [:set, {:read_concurrency, true}])

{:ok,
%{
table: table,
bytes_max: max_bytes,
bytes_used: 0,
keys: [],
put_fn: Keyword.get(opts, :put_fn, fn _key, value -> value end),
get_fn: Keyword.get(opts, :get_fn, fn _key, value -> value end),
evict_fn: Keyword.get(opts, :evict_fn, fn _key, _value -> :ok end)
}}
end

def handle_call({:get, key}, _from, state = %{table: table, keys: keys, get_fn: get_fn}) do
{reply, new_state} =
case :ets.lookup(table, key) do
[{^key, {_size, value}}] ->
{
get_fn.(key, value),
%{state | keys: keys |> move_front(key)}
}

[] ->
{nil, state}
end

{:reply, reply, new_state}
end

def handle_call(:purge, _from, state = %{table: table, evict_fn: evict_fn}) do
# we call the evict_fn to be able to clean effects (ex: file written to disk)
:ets.foldr(
fn {key, {_size, value}}, acc ->
evict_fn.(key, value)
acc + 1
end,
0,
table
)

:ets.delete_all_objects(table)
{:reply, :ok, %{state | keys: [], bytes_used: 0}}
end

def handle_cast(
{:put, key, value},
state = %{table: table, bytes_max: bytes_max, put_fn: put_fn, evict_fn: evict_fn}
) do
size = :erlang.external_size(value)

if size > bytes_max do
{:noreply, state}
else
# maybe evict some keys to make space
state =
evict_until(state, fn %{bytes_used: bytes_used, bytes_max: bytes_max} ->
bytes_used + size <= bytes_max
end)

case :ets.lookup(table, key) do
[] ->
value_to_store = put_fn.(key, value)

:ets.insert(table, {key, {size, value_to_store}})

new_state = %{
state
| keys: [key | state.keys],
bytes_used: state.bytes_used + size
}

{:noreply, new_state}

[{^key, {old_size, old_value}}] ->
# this is a replacement, we need to evict to update the bytes_used
evict_fn.(key, old_value)
value_to_store = put_fn.(key, value)

:ets.insert(table, {key, {size, value_to_store}})

new_state = %{
state
| keys: state.keys |> move_front(key),
bytes_used: state.bytes_used + size - old_size
}

{:noreply, new_state}
end
end
end

defp evict_until(
state = %{table: table, keys: keys, evict_fn: evict_fn, bytes_used: bytes_used},
predicate
) do
if predicate.(state) do
state
else
case Enum.reverse(keys) do
[] ->
state

[oldest_key | rest] ->
[{_, {size, oldest_value}}] = :ets.take(table, oldest_key)
evict_fn.(oldest_key, oldest_value)

evict_until(
%{
state
| bytes_used: bytes_used - size,
keys: rest
},
predicate
)
end
end
end

defp move_front(list, item) do
[item | List.delete(list, item)]
end
end
67 changes: 67 additions & 0 deletions lib/archethic_cache/lru_disk.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
defmodule ArchethicCache.LRUDisk do
@moduledoc """
Wraps the LRU genserver and adds hooks to write / read from disk.
The value is always a binary.
"""
alias ArchethicCache.LRU
alias Archethic.Crypto

require Logger

@spec start_link(GenServer.name(), non_neg_integer(), binary()) ::
{:ok, binary()} | {:error, term()}
def start_link(name, max_bytes, cache_dir) do
cache_dir = Path.join(cache_dir, "#{name}")
:ok = reset_directory(cache_dir)

LRU.start_link(name, max_bytes,
put_fn: fn key, value ->
# write to disk
File.write!(key_to_path(cache_dir, key), value, [:exclusive, :binary])

# store a nil value in the LRU's ETS table
nil
end,
get_fn: fn key, nil ->
# called only if the key is already in LRU's ETS table
case File.read(key_to_path(cache_dir, key)) do
{:ok, bin} ->
bin

{:error, _} ->
nil
end
end,
evict_fn: fn key, nil ->
case File.rm(key_to_path(cache_dir, key)) do
:ok ->
:ok

{:error, _} ->
:ok
end
end
)
end

@spec put(GenServer.server(), term(), binary()) :: :ok
defdelegate put(server, key, value), to: LRU, as: :put

@spec get(GenServer.server(), term()) :: nil | binary()
defdelegate get(server, key), to: LRU, as: :get

@spec purge(GenServer.server()) :: :ok
defdelegate purge(server), to: LRU, as: :purge

defp reset_directory(dir) do
File.rm_rf!(dir)
File.mkdir_p!(dir)
end

defp key_to_path(cache_dir, key) do
Path.join(
cache_dir,
Crypto.hash(:erlang.term_to_binary(key)) |> Base.url_encode64(padding: false)
)
end
end
37 changes: 2 additions & 35 deletions lib/archethic_web/controllers/aeweb_root_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,40 +5,7 @@ defmodule ArchethicWeb.AEWebRootController do

use ArchethicWeb, :controller

def index(conn, params = %{"url_path" => url_path}) do
cache_headers = WebHostingController.get_cache_headers(conn)

case WebHostingController.get_website(params, cache_headers) do
{:ok, file_content, encoding, mime_type, cached?, etag} ->
WebHostingController.send_response(conn, file_content, encoding, mime_type, cached?, etag)

{:error, {:is_a_directory, transaction}} ->
{:ok, listing_html, encoding, mime_type, cached?, etag} =
WebHostingController.DirectoryListing.list(
conn.request_path,
params,
transaction,
cache_headers
)

WebHostingController.send_response(conn, listing_html, encoding, mime_type, cached?, etag)

{:error, :file_not_found} ->
# If file is not found, returning default file (url can be handled by index file)
case url_path do
[] ->
send_resp(conn, 404, "Not Found")

["index.html"] ->
send_resp(conn, 400, "Not Found")

_path ->
params = Map.put(params, "url_path", ["index.html"])
index(conn, params)
end

_ ->
send_resp(conn, 404, "Not Found")
end
def index(conn, params) do
WebHostingController.web_hosting(conn, params)
end
end
Loading

0 comments on commit 5b39bb7

Please sign in to comment.