Check child pid when disowning processes #174

Closed pull request with 3 commits.
26 changes: 18 additions & 8 deletions lib/horde/dynamic_supervisor_impl.ex
@@ -252,16 +252,26 @@ defmodule Horde.DynamicSupervisorImpl do

# TODO think of a better name than "disown_child_process"
def handle_cast({:disown_child_process, child_id}, state) do
-    {{_, _, child_pid}, new_processes_by_id} = Map.pop(state.processes_by_id, child_id)
+    {child_info, new_processes_by_id} = Map.pop(state.processes_by_id, child_id)

-    new_state = %{
-      state
-      | processes_by_id: new_processes_by_id,
-        process_pid_to_id: Map.delete(state.process_pid_to_id, child_pid),
-        local_process_count: state.local_process_count - 1
-    }
+    new_state =
+      case child_info do
+        nil ->
+          Logger.warn("Disowning child with id #{child_id} but no pid was found")
+          state
+
+        {_, _, child_pid} ->
+          :ok =
+            DeltaCrdt.mutate(crdt_name(state.name), :remove, [{:process, child_id}], :infinity)
+
+          %{
+            state
+            | processes_by_id: new_processes_by_id,
+              process_pid_to_id: Map.delete(state.process_pid_to_id, child_pid),
+              local_process_count: state.local_process_count - 1
+          }
+      end

-    :ok = DeltaCrdt.mutate(crdt_name(state.name), :remove, [{:process, child_id}], :infinity)
{:noreply, new_state}
end
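Background for the change above: Map.pop/2 returns {nil, unchanged_map} when the
key is absent, so the old single-clause destructuring crashed the supervisor
whenever a disown was cast for an unknown child id. A minimal illustration of
the failure mode (not from this PR):

    # The pre-change pattern raised when the child id was missing:
    {{_, _, child_pid}, _rest} = Map.pop(%{}, "unknown-child")
    #=> ** (MatchError) no match of right hand side value: {nil, %{}}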

100 changes: 99 additions & 1 deletion test/netsplit_test.exs
@@ -41,6 +41,98 @@ defmodule NetsplitTest do
refute_receive {^n, :hello_echo_server}, 1100
end

test "process redistribution after netsplit" do
cluster_name = "node"
sleep_millis = 2000

[node1, node2, node3] = nodes = LocalCluster.start_nodes(cluster_name, 3)

Enum.each(nodes, fn node ->
assert :pong = Node.ping(node)
end)

# Start a test supervision tree on all three nodes
Enum.each(nodes, fn node ->
Node.spawn(node, LocalClusterHelper, :start, [
MySupervisionTree,
:start_link,
[
[
cluster: cluster_name,
distribution: Horde.UniformDistribution,
sync_interval: 100
]
]
])
end)

# Wait for the supervisor and registry on all nodes
Process.sleep(sleep_millis)

Enum.each(nodes, fn node ->
assert MySupervisor.alive?(node)
assert MyRegistry.alive?(node)
end)

# Start 10 servers. Processes should be distributed across all three
# nodes in the cluster
servers = 10

1..servers
|> Enum.each(fn n ->
{:ok, _} = MyCluster.start_server(node1, server_name(n))
end)

Process.sleep(sleep_millis)

# Ensure all servers are running in one of the 3 nodes of the
# cluster.
ensure_servers_in_nodes(servers, nodes)

Process.sleep(sleep_millis)

# Create a network partition. Node3 is now isolated from the other
# two nodes
Schism.partition([node3])

Process.sleep(sleep_millis)

Enum.each(nodes, fn node ->
assert MySupervisor.alive?(node)
assert MyRegistry.alive?(node)
end)

# Verify that all 10 servers are now living in the partition
# formed by node1 and node2. For each of the ten servers started,
# we verify that the view of that process is consistent from both
# nodes of the partition, i.e. both nodes see the same pid and
# that pid lives on one of those nodes
ensure_servers_in_nodes(servers, [node1, node2])
end
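# Sketch (not part of this diff): Schism can also heal the partition;
# a follow-up assertion could rejoin node3 and check that the cluster
# converges again:
#
#   Schism.heal([node1, node2, node3])
#   Process.sleep(sleep_millis)
#   ensure_servers_in_nodes(servers, nodes)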

defp server_name(n), do: "server#{n}"

# Convenience function that inspects each server and fails the test
# if any server is not running where expected
defp ensure_servers_in_nodes(count, nodes) do
1..count
|> Enum.each(fn n ->
name = server_name(n)

case MyCluster.server_in_nodes?(nodes, name) do
false ->
flunk(
"Server #{name} not running in one of: #{inspect(nodes)}. Debug: #{
inspect(MyCluster.debug(nodes))
}"
)

true ->
:ok
end
end)
end

test "name conflict after healing netsplit" do
cluster_name = "cluster"
server_name = "server"
@@ -57,7 +149,13 @@
Node.spawn(node, LocalClusterHelper, :start, [
MySupervisionTree,
:start_link,
-        [[cluster: cluster_name, distribution: Horde.UniformQuorumDistribution, sync_interval: 5]]
+        [
+          [
+            cluster: cluster_name,
+            distribution: Horde.UniformQuorumDistribution,
+            sync_interval: 100
+          ]
+        ]
])
end)

71 changes: 71 additions & 0 deletions test/support/my_supervision_tree.ex
@@ -61,6 +61,73 @@ defmodule MyCluster do
def whereis_server(node, name) do
:rpc.call(node, MyCluster, :whereis_server, [name])
end

def debug(nodes) when is_list(nodes) do
Enum.map(nodes, fn n ->
{n, debug(n)}
end)
end

def debug(n) when is_atom(n) do
:rpc.call(n, MyCluster, :debug, [])
end

@doc """
Returns member and registry content information. This function is used
to debug failed expectations in tests.
"""
def debug() do
[
members: [
supervisor: Horde.Cluster.members(MySupervisor),
registry: Horde.Cluster.members(MyRegistry)
],
keys:
MyRegistry.keys()
|> Enum.map(fn {k, pid} ->
{k, pid, node(pid)}
end)
]
end
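# For reference, debug/0 returns a keyword list shaped like this
# (illustrative values only):
#
#   [
#     members: [supervisor: [...], registry: [...]],
#     keys: [{"server1", #PID<12345.218.0>, :"node1@127.0.0.1"}, ...]
#   ]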

@doc """
Checks whether the given server name is seen by all the nodes
in the given list. Returns true if all nodes see the same pid
and the node of that pid is one of those nodes.
"""
def server_in_nodes?(nodes, name) do
Enum.reduce_while(nodes, [], fn n, pids ->
case whereis_server(n, name) do
:not_found ->
# At least one node is unable to locate the server;
# halt early with :not_found
{:halt, :not_found}

pid when is_pid(pid) ->
# Collect the pid seen by this node for later comparison
{:cont, [pid | pids]}
end
end)
|> case do
:not_found ->
false

pids ->
# Every node sees a pid. Check whether all nodes see the same
# pid, and whether that pid lives on one of the nodes in the
# original list
case Enum.uniq(pids) do
[pid] ->
Enum.member?(nodes, node(pid))

_ ->
# There is more than one distinct pid in the list
# Return false
false
end
end
end
end
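# Hypothetical usage (node names assumed, not from this diff):
#
#   MyCluster.server_in_nodes?([:"node1@127.0.0.1", :"node2@127.0.0.1"], "server1")
#   #=> true only if both nodes resolve "server1" to the same pid and
#   #   that pid runs on one of the two nodes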

defmodule MyRegistry do
@@ -115,6 +182,10 @@ defmodule MyRegistry do
false
end
end

def keys() do
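    # Match spec: select every {key, pid, value} entry and return {key, pid}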
Horde.Registry.select(__MODULE__, [{{:"$1", :"$2", :"$3"}, [], [{{:"$1", :"$2"}}]}])
end
end

defmodule MySupervisor do