Use a decentralized mode computation for Logger #8567
Conversation
Co-Authored-By: josevalim <jose.valim@gmail.com>
It's nice to never put the message in the message queue in the first place, so this feels like a good step forward. However, I think we need some more work to get this reliable enough to ship.
I wonder if we could solve this in a simpler way using the max heap size + on-heap queue. We should be able to reliably know if the logger got killed and log a warning. It would mean quite different behavior though. Conversely, if going with the PR's approach, should we use an off-heap queue by default to reduce GC?
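For reference, a minimal sketch of that alternative, with illustrative names (not a proposal from the PR itself): with an on-heap message queue, the queue counts toward max_heap_size, so the VM kills the handler process when it is overwhelmed and a monitor can reliably log a warning:

# Sketch (not the PR's approach): kill the handler on overload instead of
# letting its queue grow, then detect the kill and warn.
defmodule KillOnOverload do
  def start do
    pid =
      :erlang.spawn_opt(&loop/0,
        # With an on-heap queue, pending messages count against the heap,
        # so they are covered by max_heap_size.
        message_queue_data: :on_heap,
        # Kill the process if its total heap exceeds ~10M words.
        max_heap_size: %{size: 10_000_000, kill: true, error_logger: true}
      )

    ref = Process.monitor(pid)
    {pid, ref}
  end

  defp loop do
    receive do
      {:log, chardata} -> IO.puts(chardata)
    end

    loop()
  end
end

# A monitor can then observe the kill and log a warning:
{pid, ref} = KillOnOverload.start()

receive do
  {:DOWN, ^ref, :process, ^pid, :killed} ->
    IO.puts(:stderr, "logger handler killed due to overload; messages were dropped")
end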
Thanks @fishcakez! I will reply to all of your comments at once.

Regarding logging a message when we start to discard: the tricky part is to guarantee that it only happens from time to time. If for some reason our log rate keeps our counter close to the discard threshold, we would log that we started discarding multiple times. So my idea is to have a separate process that receives a message every time there is discarding and emits the discard log message only periodically, for example every 30 seconds. If we have this timer process, we can also use it to solve the "discard deadlock" you mentioned, where we could stay in the discard state forever. This discard timer process would:

1. receive a message on every discard;
2. log whenever it receives the first message for a time period and enter discard mode;
3. ignore all other messages for the next 30 seconds;
4. emit a log event with the current status (no longer discarding, still discarding, etc.) after the 30 seconds.

However, we still need to discuss which implementation we want to use for the write table. Thoughts?
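A minimal sketch of that timer process, assuming a GenServer with hypothetical names (not the PR's actual code):

defmodule DiscardTimer do
  use GenServer

  @period :timer.seconds(30)

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # Called by clients whenever a message is discarded.
  def notify_discard(pid), do: GenServer.cast(pid, :discarded)

  @impl true
  def init(:ok), do: {:ok, :idle}

  @impl true
  def handle_cast(:discarded, :idle) do
    # First discard of this period: log once and arm the timer.
    IO.warn("Logger has started discarding messages")
    Process.send_after(self(), :check_status, @period)
    {:noreply, :discarding}
  end

  # Further notifications within the period are ignored.
  def handle_cast(:discarded, state), do: {:noreply, state}

  @impl true
  def handle_info(:check_status, _state) do
    # After the period, report the current status (a real implementation
    # would check whether discards are still happening) and reset, so the
    # next discard logs again.
    IO.warn("Logger discard period ended")
    {:noreply, :idle}
  end
end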
Doesn't the current implementation require locking all processes? It acts on an …
Counters are implemented using hardware atomic operations, so there's no locking involved.
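For reference, the :counters API in question (available since Erlang/OTP 21.2):

ref = :counters.new(1, [:atomics])  # one 64-bit slot, atomics-based
:counters.add(ref, 1, 1)            # lock-free atomic increment
:counters.sub(ref, 1, 1)            # lock-free atomic decrement
:counters.get(ref, 1)               # plain atomic read
#=> 0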
I have pushed some changes so we no longer write to the table on every read. Instead, we keep the read table only for reads and the write table (or counter) for writes (i.e. both the counter and the message queue length). I will run some benchmarks next.
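Roughly, the split looks like this (a sketch with hypothetical module and function names, not the PR's actual code):

defmodule ModeTables do
  @read_table __MODULE__
  @counter_pos 1

  def new do
    :ets.new(@read_table, [:named_table, :public, read_concurrency: true])
    :ets.insert(@read_table, {:mode, :async})
    :counters.new(1, [:atomics])
  end

  # Client hot path: one atomic increment plus one ETS read, no locks.
  def client_log(counter) do
    :counters.add(counter, @counter_pos, 1)
    :ets.lookup_element(@read_table, :mode, 2)
  end

  # Handler path: total pending = client counter + its own message queue.
  def handler_check(counter, sync_threshold, discard_threshold) do
    {:message_queue_len, len} = Process.info(self(), :message_queue_len)
    total = :counters.get(counter, @counter_pos) + len

    mode =
      cond do
        total >= discard_threshold -> :discard
        total >= sync_threshold -> :sync
        true -> :async
      end

    # Only the handler writes to the read table.
    :ets.insert(@read_table, {:mode, mode})
    mode
  end
end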
Here are the values for bumping the counter vs. updating counters in ETS. The first parameter is the number of processes, followed by how far we want to count. It prints the time in microseconds followed by the value of the counter at the end.
This is a machine with four cores. Contention increases considerably with ETS. Here is the code:

[instances, limit] = System.argv() |> Enum.map(&String.to_integer/1)
wait = 0
defmodule BumpCounter do
start =
for _ <- 1..instances do
quote do
bumper(self(), var!(counter), unquote(limit), unquote(wait))
end
end
stop =
for _ <- 1..instances do
quote do
receive do
:done -> :ok
end
end
end
def run(modes) do
counter = :counters.new(1, modes)
fn ->
unquote_splicing(start)
unquote_splicing(stop)
:counters.get(counter, 1)
end
|> :timer.tc()
|> IO.inspect()
end
defp bumper(parent, counter, limit, wait) do
spawn_link(fn -> bumper_loop(parent, counter, limit, wait) end)
end
defp bumper_loop(parent, counter, limit, wait) do
:counters.add(counter, 1, 1)
if :counters.get(counter, 1) >= limit do
send(parent, :done)
else
Process.sleep(wait)
bumper_loop(parent, counter, limit, wait)
end
end
end
defmodule BumpETS do
start =
for _ <- 1..instances do
quote do
bumper(self(), var!(counter), unquote(limit), unquote(wait))
end
end
stop =
for _ <- 1..instances do
quote do
receive do
:done -> :ok
end
end
end
def run(modes) do
counter = :ets.new(:counter, [:set, :public] ++ modes)
:ets.insert(counter, {:counter, 0})
fn ->
unquote_splicing(start)
unquote_splicing(stop)
:ets.lookup_element(counter, :counter, 2)
end
|> :timer.tc()
|> IO.inspect()
end
defp bumper(parent, counter, limit, wait) do
spawn_link(fn -> bumper_loop(parent, counter, limit, wait) end)
end
defp bumper_loop(parent, counter, limit, wait) do
if :ets.update_counter(counter, :counter, {2, 1}) >= limit do
send(parent, :done)
else
Process.sleep(wait)
bumper_loop(parent, counter, limit, wait)
end
end
end
IO.puts "BumpCounter"
Task.async(fn -> BumpCounter.run([:atomics]) end) |> Task.await(:infinity)
IO.puts "BumpCounter (write concurrency)"
Task.async(fn -> BumpCounter.run([:write_concurrency]) end) |> Task.await(:infinity)
IO.puts "BumpETS (read concurrency)"
Task.async(fn -> BumpETS.run([read_concurrency: true]) end) |> Task.await(:infinity)
IO.puts "BumpETS (write concurrency)"
Task.async(fn -> BumpETS.run([write_concurrency: true]) end) |> Task.await(:infinity)
IO.puts "BumpETS (none)"
Task.async(fn -> BumpETS.run([]) end) |> Task.await(:infinity)

I will now benchmark one process writing and multiple processes reading.
Here are the results for reads and writes with a fixed number of processes:
The ETS table with read/write concurrency is more efficient, but the difference is not large because the writer process is the bottleneck. Here is the code:

[instances, limit] = System.argv() |> Enum.map(&String.to_integer/1)
defmodule ReadWriteCounter do
start =
for _ <- 1..instances do
quote do
reader(self(), var!(incrementer), var!(counter), unquote(limit))
end
end
stop =
for _ <- 1..instances do
quote do
receive do
:done -> :ok
end
end
end
def run do
counter = :counters.new(1, [:atomics])
incrementer = incrementer(counter)
fn ->
unquote_splicing(start)
unquote_splicing(stop)
:counters.get(counter, 1)
end
|> :timer.tc()
|> IO.inspect()
end
defp incrementer(counter) do
spawn_link(fn -> increment_loop(counter, 0) end)
end
defp increment_loop(counter, value) do
receive do
{:bump, pid} ->
:counters.put(counter, 1, value)
send(pid, :bumped)
increment_loop(counter, value + 1)
end
end
defp reader(parent, incrementer, counter, limit) do
spawn_link(fn -> read_loop(parent, incrementer, counter, limit) end)
end
defp read_loop(parent, incrementer, counter, limit) do
if :counters.get(counter, 1) >= limit do
send(parent, :done)
else
send(incrementer, {:bump, self()})
receive do
:bumped -> read_loop(parent, incrementer, counter, limit)
end
end
end
end
defmodule ReadWriteETS do
start =
for _ <- 1..instances do
quote do
reader(self(), var!(incrementer), var!(counter), unquote(limit))
end
end
stop =
for _ <- 1..instances do
quote do
receive do
:done -> :ok
end
end
end
def run(modes) do
counter = :ets.new(:counter, [:public] ++ modes)
:ets.insert(counter, {:counter, 0})
incrementer = incrementer(counter)
fn ->
unquote_splicing(start)
unquote_splicing(stop)
:ets.lookup_element(counter, :counter, 2)
end
|> :timer.tc()
|> IO.inspect()
end
defp incrementer(counter) do
spawn_link(fn -> increment_loop(counter, 0) end)
end
defp increment_loop(counter, value) do
receive do
{:bump, pid} ->
:ets.update_element(counter, :counter, {2, value})
send(pid, :bumped)
increment_loop(counter, value + 1)
end
end
defp reader(parent, incrementer, counter, limit) do
spawn_link(fn -> read_loop(parent, incrementer, counter, limit) end)
end
defp read_loop(parent, incrementer, counter, limit) do
if :ets.lookup_element(counter, :counter, 2) >= limit do
send(parent, :done)
else
send(incrementer, {:bump, self()})
receive do
:bumped -> read_loop(parent, incrementer, counter, limit)
end
end
end
end
IO.puts("ReadWriteCounter")
Task.async(fn -> ReadWriteCounter.run() end) |> Task.await(:infinity)
IO.puts "ReadWriteETS (read concurrency)"
Task.async(fn -> ReadWriteETS.run([read_concurrency: true]) end) |> Task.await(:infinity)
IO.puts "ReadWriteETS (write concurrency)"
Task.async(fn -> ReadWriteETS.run([write_concurrency: true]) end) |> Task.await(:infinity)
IO.puts "ReadWriteETS (none)"
Task.async(fn -> ReadWriteETS.run([]) end) |> Task.await(:infinity)

I will try the burst approach next.
Here are the burst results (spawning multiple processes where each does only one operation):

~/OSS/elixir[jv-better-logger %]$ elixir foo.exs 16 10000
BurstCounter
{92745, 0}
BurstETS (read concurrency)
{694400, 0}
BurstETS (write concurrency)
{731023, 0}
BurstETS (none)
{742579, 0}
~/OSS/elixir[jv-better-logger %]$ elixir foo.exs 4 40000
BurstCounter
{107563, 0}
BurstETS (read concurrency)
{589730, 0}
BurstETS (write concurrency)
{94679, 0}
BurstETS (none)
{572116, 0}
~/OSS/elixir[jv-better-logger %]$ elixir foo.exs 2 80000
BurstCounter
{160183, 0}
BurstETS (read concurrency)
{149105, 0}
BurstETS (write concurrency)
{142469, 0}
BurstETS (none)
{139383, 0}
~/OSS/elixir[jv-better-logger %]$ elixir foo.exs 1 160000
BurstCounter
{265164, 0}
BurstETS (read concurrency)
{245035, 0}
BurstETS (write concurrency)
{234930, 0}
BurstETS (none)
{230173, 0}

Here is the code:

[instances, limit] = System.argv() |> Enum.map(&String.to_integer/1)
start =
for _ <- 1..instances do
quote do
spawn(fn -> burst_loop(var!(incrementer), var!(counter), unquote(limit)) end)
end
end
defmodule BurstCounter do
def run do
counter = :counters.new(1, [:atomics])
incrementer = incrementer(self(), counter, unquote(instances * limit))
fn ->
unquote_splicing(start)
receive do
:done -> :counters.get(counter, 1)
end
end
|> :timer.tc()
|> IO.inspect()
end
defp incrementer(parent, counter, limit) do
spawn_link(fn -> increment_loop(parent, counter, limit) end)
end
defp increment_loop(parent, counter, 0) do
:counters.put(counter, 1, 0)
send(parent, :done)
end
defp increment_loop(parent, counter, value) do
receive do
:bump ->
:counters.put(counter, 1, value)
increment_loop(parent, counter, value - 1)
end
end
defp burst_loop(_incrementer, _counter, 0) do
:ok
end
defp burst_loop(incrementer, counter, limit) do
spawn(fn ->
:counters.get(counter, 1)
send(incrementer, :bump)
end)
burst_loop(incrementer, counter, limit - 1)
end
end
defmodule BurstETS do
def run(modes) do
counter = :ets.new(:counter, [:public] ++ modes)
:ets.insert(counter, {:counter, 0})
incrementer = incrementer(self(), counter, unquote(instances * limit))
fn ->
unquote_splicing(start)
receive do
:done -> :ets.lookup_element(counter, :counter, 2)
end
end
|> :timer.tc()
|> IO.inspect()
end
defp incrementer(parent, counter, limit) do
spawn_link(fn -> increment_loop(parent, counter, limit) end)
end
defp increment_loop(parent, counter, 0) do
:ets.update_element(counter, :counter, {2, 0})
send(parent, :done)
end
defp increment_loop(parent, counter, value) do
receive do
:bump ->
:ets.update_element(counter, :counter, {2, value})
increment_loop(parent, counter, value - 1)
end
end
defp burst_loop(_incrementer, _counter, 0) do
:ok
end
defp burst_loop(incrementer, counter, limit) do
spawn(fn ->
:ets.lookup_element(counter, :counter, 2)
send(incrementer, :bump)
end)
burst_loop(incrementer, counter, limit - 1)
end
end
IO.puts("BurstCounter")
Task.async(fn -> BurstCounter.run() end) |> Task.await(:infinity)
IO.puts("BurstETS (read concurrency)")
Task.async(fn -> BurstETS.run(read_concurrency: true) end) |> Task.await(:infinity)
IO.puts("BurstETS (write concurrency)")
Task.async(fn -> BurstETS.run(write_concurrency: true) end) |> Task.await(:infinity)
IO.puts("BurstETS (none)")
Task.async(fn -> BurstETS.run([]) end) |> Task.await(:infinity)

Counters behave consistently better under load.
TL;DR: counters perform better.
@fishcakez the last commit adds the logging back and has a periodic check, every 30 seconds, that reports the status of the system and heals the counters. Once we enter discard mode, the next status update happens only on the next periodic check, which guarantees we won't be checking all the time.
Can we move to a single counter?

{:message_queue_len, length} = Process.info(self(), :message_queue_len)
total = :counters.get(counter, @counter_pos)
:counters.sub(counter, @counter_pos, total - length)

(Subtracting total - length resets the counter to the handler's current message queue length, removing the messages the handler has already seen from the pending count.)
The commit looks good.
@fishcakez I think we can do a single counter after all. Good call.
Actually, we can only do a single counter if we do this:

{:message_queue_len, length} = Process.info(self(), :message_queue_len)
total = :counters.get(counter, @counter_pos)
:counters.sub(counter, @counter_pos, max(total - length, 0))

This is to avoid the case where the message queue is big because of system messages or something else being weird in the system.

EDIT: Please ignore this.
I pushed a single counter.
# TODO: Use counters exclusively when we require Erlang/OTP 22+.
defp new_counter() do
  if Code.ensure_loaded?(:counters) do
    {:counters, :counters.new(1, [:atomics])}
Have you explored using the write_concurrency mode? As far as I understand, the counters in Logger are not perfectly consistent anyway and provide more of a heuristic value rather than a hard limit.
I did in the benchmarks. Because it is per row and we have a single row, write_concurrency actually makes things worse.
From the published benchmarks, I can see benchmarks for ETS with write_concurrency but not for counters with that option.
For write_concurrency with counters, when we do a "fast" write (add) we always do a "slow" read (get), so it may actually be slower? There isn't a benchmark though?
We benchmarked it here: #8567 (comment)

EDIT: Oh, you meant the counter with write_concurrency. I will investigate.
@fishcakez there is no relevant difference in the benchmarks linked above with the write_concurrency option compared to atomics.
Should we have some way of detecting overflows in the counters? I'm not 100% sure how that would work, or if we are likely to hit any limitations given it uses 64-bit integers underneath.
Yeah... we are fine.
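For a rough sense of why: counters are signed 64-bit integers, so even at an assumed (and very generous) rate of one million increments per second, exhausting the range would take hundreds of thousands of years:

import Bitwise

# Signed 64-bit counters wrap at 2^63 - 1.
max = (1 <<< 63) - 1
rate = 1_000_000                      # increments per second (assumed)
seconds_per_year = 365 * 24 * 60 * 60

IO.inspect(div(max, rate * seconds_per_year))
#=> 292471 (years)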
{counter, :discard, discard_threshold, discard_period}

log == :discard and periodic_check? ->
  warn("Attempted to log #{total} messages, which is below :discard_threshold")
There isn't a test for this feature.
I tried but it is really tricky to test it without adding more complexity and/or adding long timers to the suite.
I think I can test it by manually sending a handle_info. Not nice but it should do it. 🤷♂️
Sounds good
According to the Compatibility and Deprecations page, Elixir v1.9 will support Erlang/OTP 20. I believe …
We are falling back to …
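For context, a sketch of what such a fallback could look like, assuming an ETS-based counter behind the same interface (hypothetical module, not necessarily the PR's exact code):

# Hypothetical sketch of a counter abstraction that falls back to ETS when
# :counters (OTP 21.2+) is unavailable.
defmodule CounterCompat do
  def new do
    if Code.ensure_loaded?(:counters) do
      {:counters, :counters.new(1, [:atomics])}
    else
      table = :ets.new(__MODULE__, [:public, write_concurrency: true])
      :ets.insert(table, {:counter, 0})
      {:ets, table}
    end
  end

  # Callers dispatch on the tag, so the rest of the code is agnostic.
  def add({:counters, ref}, value), do: :counters.add(ref, 1, value)
  def add({:ets, table}, value), do: :ets.update_counter(table, :counter, {2, value})

  def get({:counters, ref}), do: :counters.get(ref, 1)
  def get({:ets, table}), do: :ets.lookup_element(table, :counter, 2)
end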
Test added. I think this is ready to go!
true = :ets.insert_new(@table, [{:data, nil}, {:deleted_handlers, []}])
tab

def new do
  {new_data(), new_counter()}
Why are we passing the "data" here to always pattern match against @table later? (taking into account this is a private module)
Yeah, we are just asserting that the correct data is being passed around. We could also just use it without checking. Either way is fine.
On a 16 (physical) core machine with concurrency at or above 16: the counter with atomics wins for bump and read-write, but not for burst, where the counter with write concurrency is slightly faster. Counters are faster than ETS, sometimes 100x so.
Previous Logger versions would always compute the mode on the Logger handler, which meant that, if any of the Logger handlers blocked, whether because it was waiting on IO, because the system was overloaded, or due to long computations, a burst of messages could make the Logger message queue grow quite large before it noticed it should change its mode.
This commit changes the computation to use two counters, one kept by the Logger and another updated by the client. The mode is computed by adding those two counters together. On every message, the Logger handler resets its counter and updates the message queue one. This makes the following script go from tens of thousands of messages in the Logger inbox to only hundreds (assuming a sync_threshold of 100) when writing to stdio is rate limited with mix run foo.exs | pv -p -L 1k 1>/dev/null:
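The script itself is not included in this excerpt; a hypothetical foo.exs along those lines would simply flood the Logger from many concurrent processes:

# foo.exs -- hypothetical reproduction (the original script is not included
# in this excerpt). Run with: mix run foo.exs | pv -p -L 1k 1>/dev/null
require Logger

for i <- 1..100 do
  spawn(fn ->
    for j <- 1..10_000 do
      Logger.info("process #{i} message #{j}")
    end
  end)
end

# Keep the script alive so the spawned processes can finish logging.
Process.sleep(:infinity)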