Skip to content

Commit

Permalink
Prepare hypergeometric distribution on authorized nodes changes
Browse files Browse the repository at this point in the history
  • Loading branch information
samuelmanzanera committed Nov 17, 2022
1 parent b3b1405 commit 5466387
Showing 1 changed file with 88 additions and 9 deletions.
97 changes: 88 additions & 9 deletions lib/archethic/election/hypergeometric_distribution.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,36 +15,115 @@ defmodule Archethic.Election.HypergeometricDistribution do
use GenServer
@vsn Mix.Project.config()[:version]

alias Archethic.P2P
alias Archethic.P2P.Node
alias Archethic.PubSub

@executable Application.app_dir(:archethic, "/priv/c_dist/hypergeometric_distribution")

def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end

def init(_opts) do
executable = Application.app_dir(:archethic, "/priv/c_dist/hypergeometric_distribution")
{:ok, %{executable: executable, previous_simulations: %{}}}
PubSub.register_to_node_update()

{:ok, %{previous_simulations: %{}, clients: %{}, tasks: %{}}}
end

def handle_info(
{:node_update, %Node{available?: true, authorized?: true}},
state = %{tasks: tasks}
) do
nb_nodes = length(P2P.authorized_and_available_nodes())

case Map.get(tasks, nb_nodes) do
nil ->
%Task{ref: ref} = start_simulation_task(nb_nodes)
{:noreply, Map.update!(state, :tasks, &Map.put(&1, nb_nodes, ref))}

_ ->
{:noreply, state}
end
end

def handle_info(
{:node_update, %Node{available?: false, authorized?: true}},
state = %{tasks: tasks}
) do
nb_nodes = length(P2P.authorized_and_available_nodes())

case Map.get(tasks, nb_nodes) do
nil ->
%Task{ref: ref} = start_simulation_task(nb_nodes)
{:noreply, Map.update!(state, :tasks, &Map.put(&1, nb_nodes, ref))}

_ ->
{:noreply, state}
end
end

def handle_info({:node_update, _}, state), do: {:noreply, state}

def handle_info(
{ref, {nb_nodes, simulation_result}},
state = %{clients: clients}
) do
clients
|> Map.get(ref, [])
|> Enum.each(&GenServer.reply(&1, simulation_result))

new_state =
state
|> Map.update!(:previous_simulations, &Map.put(&1, nb_nodes, simulation_result))
|> Map.update!(:tasks, &Map.delete(&1, nb_nodes))
|> Map.update!(:clients, &Map.delete(&1, ref))

{:noreply, new_state}
end

def handle_info({:DOWN, _ref, :process, _pid, _reason}, state), do: {:noreply, state}

def handle_call(
{:run_simulation, nb_nodes},
_from,
state = %{executable: executable, previous_simulations: previous_simulations}
from,
state = %{previous_simulations: previous_simulations, tasks: tasks}
)
when is_integer(nb_nodes) and nb_nodes >= 0 do
case Map.get(previous_simulations, nb_nodes) do
nil ->
pid = Port.open({:spawn_executable, executable}, args: [Integer.to_string(nb_nodes)])
case Map.get(tasks, nb_nodes) do
nil ->
%Task{ref: ref} = start_simulation_task(nb_nodes)

new_state =
state
|> Map.update!(:clients, &Map.put(&1, ref, [from]))
|> Map.update!(:tasks, &Map.put(&1, nb_nodes, ref))

{:noreply, new_state}

receive do
{^pid, {:data, data}} ->
{n, _} = :string.to_integer(data)
{:reply, n, put_in(state, [:previous_simulations, nb_nodes], n)}
ref ->
{:noreply, update_in(state, [:clients, Access.key(ref, [])], &[from | &1])}
end

simulation ->
{:reply, simulation, state}
end
end

defp start_simulation_task(nb_nodes) do
Task.async(fn ->
pid = Port.open({:spawn_executable, @executable}, args: [Integer.to_string(nb_nodes)])

receive do
{^pid, {:data, data}} ->
{result, _} = :string.to_integer(data)
{nb_nodes, result}
end
end)
end

@doc """
Execute the hypergeometric distribution simulation from a given number of nodes.
Expand Down

0 comments on commit 5466387

Please sign in to comment.