Skip to content

Commit

Permalink
Fix global and/or cluster transactions issue
Browse files Browse the repository at this point in the history
  • Loading branch information
cabol committed Mar 28, 2022
1 parent 21b071c commit e320ff2
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 36 deletions.
27 changes: 13 additions & 14 deletions lib/nebulex/adapter/transaction.ex
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,15 @@ defmodule Nebulex.Adapter.Transaction do
import Nebulex.Helpers

@impl true
def transaction(%{pid: pid} = adapter_meta, opts, fun) do
def transaction(%{cache: cache, pid: pid} = adapter_meta, opts, fun) do
adapter_meta
|> do_in_transaction?()
|> do_transaction(
pid,
adapter_meta[:name] || cache,
Keyword.get(opts, :keys, []),
Keyword.get(opts, :nodes, [node()]),
Keyword.get(opts, :retries, :infinity),
adapter_meta[:name] || adapter_meta[:cache],
fun
)
end
Expand All @@ -109,15 +109,15 @@ defmodule Nebulex.Adapter.Transaction do
## Helpers

defp do_in_transaction?(%{pid: pid}) do
not is_nil(Process.get({pid, self()}))
!!Process.get({pid, self()})
end

defp do_transaction(true, _pid, _keys, _nodes, _retries, _cache, fun) do
defp do_transaction(true, _pid, _name, _keys, _nodes, _retries, fun) do
{:ok, fun.()}
end

defp do_transaction(false, pid, keys, nodes, retries, cache, fun) do
ids = lock_ids(pid, keys)
defp do_transaction(false, pid, name, keys, nodes, retries, fun) do
ids = lock_ids(name, keys)

case set_locks(ids, nodes, retries) do
true ->
Expand All @@ -132,7 +132,7 @@ defmodule Nebulex.Adapter.Transaction do
end

false ->
wrap_error Nebulex.Error, reason: {:transaction_aborted, cache, nodes}
wrap_error Nebulex.Error, reason: {:transaction_aborted, name, nodes}
end
end

Expand All @@ -150,22 +150,21 @@ defmodule Nebulex.Adapter.Transaction do

{:error, locked_ids} ->
:ok = del_locks(locked_ids, nodes)

false
end
end

defp del_locks(ids, nodes) do
Enum.each(ids, fn id ->
true = :global.del_lock(id, nodes)
end)
Enum.each(ids, &:global.del_lock(&1, nodes))
end

defp lock_ids(pid, []) do
[{pid, self()}]
defp lock_ids(name, []) do
[{name, self()}]
end

defp lock_ids(pid, keys) do
for key <- keys, do: {{pid, key}, self()}
defp lock_ids(name, keys) do
Enum.map(keys, &{{name, &1}, self()})
end
end
end
Expand Down
81 changes: 59 additions & 22 deletions lib/nebulex/adapters/replicated.ex
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,8 @@ defmodule Nebulex.Adapters.Replicated do
# Inherit default persistence implementation
use Nebulex.Adapter.Persistence

use Bitwise, only_operators: true

import Nebulex.Adapter
import Nebulex.Helpers

Expand Down Expand Up @@ -523,20 +525,31 @@ defmodule Nebulex.Adapters.Replicated do
other
end

defp with_transaction(
%{pid: pid, name: name} = adapter_meta,
action,
keys,
args,
opts \\ []
) do
nodes = Cluster.get_nodes(name)
defp with_transaction(adapter_meta, action, keys, args, opts \\ []) do
do_with_transaction(adapter_meta, action, keys, args, opts, 1)
end

defp do_with_transaction(%{name: name} = adapter_meta, action, keys, args, opts, times) do
# This is a bit hacky because the `:global_locks` table managed by
# `:global` is accessed directly, breaking its encapsulation.
# So far, this has been the simplest and fastest way to check whether
# the global sync lock `:"$sync_lock"` is set, so that write-like
# operations are blocked until it is released. The other option would be
# trying to lock the same key `:"$sync_lock"` and, once the lock is acquired,
# deleting it before processing the write operation. But that means another
# global lock across the cluster every time there is a write. So for the
# time being, we just read the global table to check it, which is much
# faster: since it is a local read of the global ETS table, there are no
# global locks across the cluster.
case :ets.lookup(:global_locks, :"$sync_lock") do
[_] ->
:ok = random_sleep(times)

do_with_transaction(adapter_meta, action, keys, args, opts, times + 1)

[] ->
nodes = Cluster.get_nodes(name)

# Ensure it waits until ongoing delete_all or sync operations finish,
# if there's any.
:global.trans(
{name, pid},
fn ->
# Write-like operation must be wrapped within a transaction
# to ensure proper replication
with {:ok, res} <-
Expand All @@ -545,9 +558,7 @@ defmodule Nebulex.Adapters.Replicated do
end) do
res
end
end,
nodes
)
end
end

defp multicall(%{name: name} = meta, action, args, opts) do
Expand Down Expand Up @@ -619,6 +630,29 @@ defmodule Nebulex.Adapters.Replicated do
)
end
end

# coveralls-ignore-start

# Sleeps the calling process for a random number of milliseconds, where the
# upper bound grows with the number of attempts made so far (`times`).
defp random_sleep(times) do
  # On every 10th attempt, re-seed the process random generator.
  _ = if rem(times, 10) == 0, do: _ = :rand.seed(:exsplus)

  # Upper bound in milliseconds: 250 on the first attempt, doubling on each
  # following attempt, and fixed at 8000 once `times` exceeds 5.
  max_ms =
    case times do
      n when n > 5 -> 8000
      n -> div((1 <<< n) * 1000, 8)
    end

  # `:rand.uniform/1` picks an integer in 1..max_ms.
  Process.sleep(:rand.uniform(max_ms))
end

# coveralls-ignore-stop
end

defmodule Nebulex.Adapters.Replicated.Bootstrap do
Expand Down Expand Up @@ -696,22 +730,25 @@ defmodule Nebulex.Adapters.Replicated.Bootstrap do
end

@impl true
def terminate(_reason, state) do
def terminate(_reason, %{name: name}) do
# Delete global lock set when the server started
:ok = unlock(name)

# Ensure leaving the cluster when the cache stops
:ok = Cluster.leave(state.name)
:ok = Cluster.leave(name)
end

## Helpers

defp lock(name) do
nodes = Cluster.get_nodes(name)
true = :global.set_lock({name, self()}, nodes)
true = :global.set_lock({:"$sync_lock", self()}, Cluster.get_nodes(name))

:ok
end

defp unlock(name) do
nodes = Cluster.get_nodes(name)
true = :global.del_lock({name, self()}, nodes)
true = :global.del_lock({:"$sync_lock", self()}, Cluster.get_nodes(name))

:ok
end

Expand Down
2 changes: 2 additions & 0 deletions test/nebulex/adapters/partitioned_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ defmodule Nebulex.Adapters.PartitionedTest do

on_exit(fn ->
_ = Partitioned.put_dynamic_cache(default_dynamic_cache)

:ok = Process.sleep(100)

stop_caches(node_pid_list)
end)

Expand Down
2 changes: 2 additions & 0 deletions test/nebulex/adapters/replicated_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ defmodule Nebulex.Adapters.ReplicatedTest do

on_exit(fn ->
_ = Replicated.put_dynamic_cache(default_dynamic_cache)

:ok = Process.sleep(100)

stop_caches(node_pid_list)
end)

Expand Down

0 comments on commit e320ff2

Please sign in to comment.