From da77d0e4b29a5c9d0adcce3a8df21d3a2580d21f Mon Sep 17 00:00:00 2001 From: Samuel Manzanera Date: Mon, 20 Mar 2023 15:04:11 +0100 Subject: [PATCH] Optimize network stats fetching --- lib/archethic/beacon_chain.ex | 99 +-------- .../beacon_chain/network_coordinates.ex | 205 +++++++++++++++--- lib/archethic/beacon_chain/subset.ex | 8 +- .../beacon_chain/subset/summary_cache.ex | 4 +- .../p2p/message/get_network_stats.ex | 53 +++-- lib/archethic/p2p/message/network_stats.ex | 51 ++++- .../beacon_chain/network_coordinates_test.exs | 33 +-- test/archethic/beacon_chain/subset_test.exs | 13 +- 8 files changed, 292 insertions(+), 174 deletions(-) diff --git a/lib/archethic/beacon_chain.ex b/lib/archethic/beacon_chain.ex index 36d0e14a8..dc519e87b 100644 --- a/lib/archethic/beacon_chain.ex +++ b/lib/archethic/beacon_chain.ex @@ -9,6 +9,7 @@ defmodule Archethic.BeaconChain do alias __MODULE__.Slot.Validation, as: SlotValidation alias __MODULE__.SlotTimer alias __MODULE__.Subset + alias __MODULE__.NetworkCoordinates alias __MODULE__.Subset.P2PSampling alias __MODULE__.Subset.SummaryCache alias __MODULE__.Summary @@ -194,6 +195,10 @@ defmodule Archethic.BeaconChain do @spec get_summary_slots(binary()) :: list(TransactionSummary.t()) def get_summary_slots(subset) when is_binary(subset) do SummaryCache.stream_current_slots(subset) + |> Stream.map(fn + {slot, _} -> slot + slot -> slot + end) |> Stream.flat_map(fn %Slot{transaction_attestations: transaction_attestations} -> transaction_summaries = transaction_attestations @@ -419,97 +424,7 @@ defmodule Archethic.BeaconChain do Retrieve the network stats for a given subset from the cached slots """ @spec get_network_stats(binary()) :: %{Crypto.key() => Slot.net_stats()} - def get_network_stats(subset) do - subset - |> SummaryCache.stream_current_slots() - |> Stream.filter(&match?({_, _}, &1)) - |> Stream.map(fn - {%Slot{p2p_view: %{network_stats: net_stats}}, node} -> - {node, net_stats} - end) - |> Stream.reject(fn - {_, %{latencies: []}} -> - true - - _ -> - false - end) - |> Enum.reduce(%{}, fn {node, net_stats}, acc -> - Map.update(acc, node, [net_stats], &(&1 ++ [net_stats])) - end) - |> Enum.map(fn {node, net_stats} -> - aggregated_stats = - net_stats - |> Enum.zip() - |> Enum.map(fn stats -> - aggregated_latency = - stats - |> Tuple.to_list() - |> Enum.map(& &1.latency) - # The logistic regression is used to avoid impact of outliers - # while providing a weighted approach to priorize the latest samples. - |> weighted_logistic_regression() - |> trunc() - - %{latency: aggregated_latency} - end) - - {node, aggregated_stats} - end) - |> Enum.into(%{}) - end - - defp weighted_logistic_regression(list) do - %{sum_weight: sum_weight, sum_weighted_list: sum_weighted_list} = - list - |> clean_outliers() - # We want to apply a weight based on the tier of the latency - |> Utils.chunk_list_in(3) - |> weight_list() - |> Enum.reduce(%{sum_weight: 0.0, sum_weighted_list: 0.0}, fn {weight, weighted_list}, - acc -> - acc - |> Map.update!(:sum_weighted_list, &(&1 + Enum.sum(weighted_list))) - |> Map.update!(:sum_weight, &(&1 + weight * Enum.count(weighted_list))) - end) - - sum_weighted_list / sum_weight - end - - defp clean_outliers(list) do - list_size = Enum.count(list) - - sorted_list = Enum.sort(list) - - # Compute percentiles (P80, P20) to remove the outliers - p1 = (0.8 * list_size) |> trunc() - p2 = (0.2 * list_size) |> trunc() - - max = Enum.at(sorted_list, p1) - min = Enum.at(sorted_list, p2) - - Enum.map(list, fn - x when x < min -> - min - - x when x > max -> - max - - x -> - x - end) - end - - defp weight_list(list) do - list - |> Enum.with_index() - |> Enum.map(fn {list, i} -> - # Apply weight of the tier - weight = (i + 1) * (1 / 3) - - weighted_list = Enum.map(list, &(&1 * weight)) - - {weight, weighted_list} - end) + def get_network_stats(subset) when is_binary(subset) do + NetworkCoordinates.aggregate_network_stats(subset) end end diff --git a/lib/archethic/beacon_chain/network_coordinates.ex b/lib/archethic/beacon_chain/network_coordinates.ex index 7f04c1bde..9ee3093b3 100644 --- a/lib/archethic/beacon_chain/network_coordinates.ex +++ b/lib/archethic/beacon_chain/network_coordinates.ex @@ -6,14 +6,20 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do @digits ["F", "E", "D", "C", "B", "A", "9", "8", "7", "6", "5", "4", "3", "2", "1", "0"] alias Archethic.BeaconChain + alias Archethic.BeaconChain.Slot + alias Archethic.BeaconChain.Subset.SummaryCache alias Archethic.BeaconChain.Subset.P2PSampling + alias Archethic.Crypto + alias Archethic.Election alias Archethic.P2P alias Archethic.P2P.Message.GetNetworkStats alias Archethic.P2P.Message.NetworkStats + alias Archethic.Utils + alias Archethic.TaskSupervisor @doc """ @@ -211,41 +217,82 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do matrix = Nx.broadcast(0, {nb_nodes, nb_nodes}) - Task.async_stream( - BeaconChain.list_subsets(), - fn subset -> - beacon_nodes = Election.beacon_storage_nodes(subset, summary_time, authorized_nodes) - - stats = - Task.Supervisor.async_stream_nolink( - TaskSupervisor, - beacon_nodes, - &P2P.send_message(&1, %GetNetworkStats{subset: subset}), - on_timeout: :kill_task, - ordered: false - ) - |> Stream.filter(fn - # Filter slots with P2P view - {:ok, {:ok, %NetworkStats{stats: stats}}} when map_size(stats) > 0 -> - true - - _ -> - false - end) - |> Stream.map(fn {:ok, {:ok, %NetworkStats{stats: stats}}} -> Map.to_list(stats) end) - |> Enum.to_list() - |> List.flatten() - |> Enum.reduce(%{}, fn {node, stats}, acc -> - Map.put(acc, node, stats) - end) + %{bounded: bounded_subsets, unbounded: unbounded_subsets} = + get_subsets_bounding(summary_time, authorized_nodes) + + bounded_netstats = + bounded_subsets + |> Task.async_stream(&{&1, aggregate_network_stats(&1)}) + |> Enum.reduce(%{}, fn + {:ok, {subset, stats}}, acc when map_size(stats) > 0 -> + Map.put(acc, subset, stats) + + _, acc -> + acc + end) + + unbounded_matrix = + unbounded_subsets + # Aggregate subsets by node + |> Enum.reduce(%{}, fn {subset, beacon_nodes}, acc -> + Enum.reduce(beacon_nodes, acc, fn node, acc -> + Map.update(acc, node, [subset], &[subset | &1]) + end) + end) + |> stream_subsets_stats() + # Aggregate stats per node to identify the sampling nodes + |> aggregate_stats_per_subset() + |> update_matrix_from_stats(matrix, sorted_node_list) + + full_matrix = update_matrix_from_stats(bounded_netstats, unbounded_matrix, sorted_node_list) + full_matrix + end + + defp get_subsets_bounding(summary_time, authorized_nodes) do + Enum.reduce(BeaconChain.list_subsets(), %{bounded: [], unbounded: []}, fn subset, acc -> + beacon_nodes = Election.beacon_storage_nodes(subset, summary_time, authorized_nodes) + + if Utils.key_in_node_list?(beacon_nodes, Crypto.first_node_public_key()) do + Map.update!(acc, :bounded, &[subset | &1]) + else + Map.update!(acc, :unbounded, &[{subset, beacon_nodes} | &1]) + end + end) + end - {subset, stats} + defp stream_subsets_stats(subsets_by_node) do + Task.Supervisor.async_stream_nolink( + TaskSupervisor, + subsets_by_node, + fn {node, subsets} -> + P2P.send_message(node, %GetNetworkStats{subsets: subsets}) end, + ordered: false, on_timeout: :kill_task, - ordered: false + max_concurrency: 256 ) - |> Stream.filter(&match?({:ok, _}, &1)) - |> Enum.reduce(matrix, fn {:ok, {subset, stats}}, acc -> + |> Stream.filter(fn + {:ok, {:ok, %NetworkStats{stats: stats}}} when map_size(stats) > 0 -> + true + + _ -> + false + end) + |> Stream.map(fn {:ok, {:ok, %NetworkStats{stats: stats}}} -> stats end) + end + + defp aggregate_stats_per_subset(stats) do + stats + |> Enum.flat_map(& &1) + |> Enum.reduce(%{}, fn {subset, stats}, acc -> + Enum.reduce(stats, acc, fn {node, stats}, acc -> + Map.update(acc, subset, %{node => stats}, &Map.put(&1, node, stats)) + end) + end) + end + + defp update_matrix_from_stats(stats_by_subset, matrix, sorted_node_list) do + Enum.reduce(stats_by_subset, matrix, fn {subset, stats}, acc -> sampling_nodes = P2PSampling.list_nodes_to_sample(subset) Enum.reduce(stats, acc, fn {node_public_key, stats}, acc -> @@ -330,4 +377,100 @@ defmodule Archethic.BeaconChain.NetworkCoordinates do matrix end end + + @doc """ + Aggregate the network stats from the SummaryCache + + The summary cache holds the slots of the current summary identified by beacon node. + Hence we can aggregate the view of one particular beacon node regarding the nodes sampled. + + The aggregation is using some weighted logistic regression. + """ + @spec aggregate_network_stats(binary()) :: %{Crypto.key() => Slot.net_stats()} + def aggregate_network_stats(subset) when is_binary(subset) do + subset + |> SummaryCache.stream_current_slots() + |> Stream.filter(&match?({%Slot{p2p_view: %{network_stats: [_ | _]}}, _}, &1)) + |> Stream.map(fn + {%Slot{p2p_view: %{network_stats: net_stats}}, node} -> + {node, net_stats} + end) + |> Enum.reduce(%{}, fn {node, net_stats}, acc -> + Map.update(acc, node, [net_stats], &(&1 ++ [net_stats])) + end) + |> Enum.map(fn {node, net_stats} -> + aggregated_stats = + net_stats + |> Enum.zip() + |> Enum.map(fn stats -> + aggregated_latency = + stats + |> Tuple.to_list() + |> Enum.map(& &1.latency) + # The logistic regression is used to avoid impact of outliers + # while providing a weighted approach to priorize the latest samples. + |> weighted_logistic_regression() + |> trunc() + + %{latency: aggregated_latency} + end) + + {node, aggregated_stats} + end) + |> Enum.into(%{}) + end + + defp weighted_logistic_regression(list) do + %{sum_weight: sum_weight, sum_weighted_list: sum_weighted_list} = + list + |> clean_outliers() + # We want to apply a weight based on the tier of the latency + |> Utils.chunk_list_in(3) + |> weight_list() + |> Enum.reduce(%{sum_weight: 0.0, sum_weighted_list: 0.0}, fn {weight, weighted_list}, + acc -> + acc + |> Map.update!(:sum_weighted_list, &(&1 + Enum.sum(weighted_list))) + |> Map.update!(:sum_weight, &(&1 + weight * Enum.count(weighted_list))) + end) + + sum_weighted_list / sum_weight + end + + defp clean_outliers(list) do + list_size = Enum.count(list) + + sorted_list = Enum.sort(list) + + # Compute percentiles (P80, P20) to remove the outliers + p1 = (0.8 * list_size) |> trunc() + p2 = (0.2 * list_size) |> trunc() + + max = Enum.at(sorted_list, p1) + min = Enum.at(sorted_list, p2) + + Enum.map(list, fn + x when x < min -> + min + + x when x > max -> + max + + x -> + x + end) + end + + defp weight_list(list) do + list + |> Enum.with_index() + |> Enum.map(fn {list, i} -> + # Apply weight of the tier + weight = (i + 1) * (1 / 3) + + weighted_list = Enum.map(list, &(&1 * weight)) + + {weight, weighted_list} + end) + end end diff --git a/lib/archethic/beacon_chain/subset.ex b/lib/archethic/beacon_chain/subset.ex index ebe4e683f..2ee7bb0d5 100644 --- a/lib/archethic/beacon_chain/subset.ex +++ b/lib/archethic/beacon_chain/subset.ex @@ -355,7 +355,13 @@ defmodule Archethic.BeaconChain.Subset do end defp handle_summary(time, subset) do - beacon_slots = SummaryCache.stream_current_slots(subset) + beacon_slots = + subset + |> SummaryCache.stream_current_slots() + |> Stream.map(fn + {slot, _} -> slot + slot -> slot + end) if Enum.empty?(beacon_slots) do :ok diff --git a/lib/archethic/beacon_chain/subset/summary_cache.ex b/lib/archethic/beacon_chain/subset/summary_cache.ex index fb66a2137..9e1e49a83 100644 --- a/lib/archethic/beacon_chain/subset/summary_cache.ex +++ b/lib/archethic/beacon_chain/subset/summary_cache.ex @@ -3,7 +3,9 @@ defmodule Archethic.BeaconChain.Subset.SummaryCache do Handle the caching of the beacon slots defined for the summary """ + alias Archethic.BeaconChain alias Archethic.BeaconChain.Slot + alias Archethic.BeaconChain.SlotTimer alias Archethic.BeaconChain.SummaryTimer alias Archethic.Crypto @@ -51,7 +53,7 @@ defmodule Archethic.BeaconChain.Subset.SummaryCache do # Check if there are slots for the next summary contains_next_summary_slots? = subset - |> SummaryCache.stream_current_slots() + |> stream_current_slots() |> Enum.any?(fn {%Slot{slot_time: slot_time}, _} -> DateTime.diff(slot_time, previous_summary_time) >= 0 diff --git a/lib/archethic/p2p/message/get_network_stats.ex b/lib/archethic/p2p/message/get_network_stats.ex index 08c6a7119..c6ab4e0d7 100644 --- a/lib/archethic/p2p/message/get_network_stats.ex +++ b/lib/archethic/p2p/message/get_network_stats.ex @@ -3,16 +3,15 @@ defmodule Archethic.P2P.Message.GetNetworkStats do Represents a message to get the network stats from the beacon summary cache """ - @enforce_keys :subset - defstruct [:subset] + @enforce_keys :subsets + defstruct subsets: [] alias Archethic.BeaconChain alias Archethic.Crypto - alias Archethic.P2P.Message alias Archethic.P2P.Message.NetworkStats @type t :: %__MODULE__{ - subset: binary() + subsets: list(binary()) } @doc """ @@ -20,11 +19,16 @@ defmodule Archethic.P2P.Message.GetNetworkStats do ## Examples - iex> %GetNetworkStats{subset: <<0>>} |> GetNetworkStats.serialize() - <<0>> + iex> %GetNetworkStats{subsets: [<<0>>]} |> GetNetworkStats.serialize() + << + # Length of subsets + 1, + # Subset + 0 + >> """ - def serialize(%__MODULE__{subset: subset}) do - <> + def serialize(%__MODULE__{subsets: subsets}) do + <> end @doc """ @@ -32,15 +36,20 @@ defmodule Archethic.P2P.Message.GetNetworkStats do ## Examples - iex> <<0>> |> GetNetworkStats.deserialize() + iex> <<1, 0>> |> GetNetworkStats.deserialize() { - %GetNetworkStats{subset: <<0>>}, + %GetNetworkStats{subsets: [<<0>>]}, "" } """ - def deserialize(<>) do + def deserialize(<>) do + subsets = + subsets_binary + |> :erlang.binary_to_list() + |> Enum.map(&<<&1>>) + { - %__MODULE__{subset: subset}, + %__MODULE__{subsets: subsets}, rest } end @@ -48,9 +57,23 @@ defmodule Archethic.P2P.Message.GetNetworkStats do @doc """ Process the message to get the network stats from the summary cache """ - @spec process(t(), Crypto.key()) :: Message.response() - def process(%__MODULE__{subset: subset}, _node_public_key) do - stats = BeaconChain.get_network_stats(subset) + @spec process(t(), Crypto.key()) :: NetworkStats.t() + def process(%__MODULE__{subsets: subsets}, _node_public_key) do + stats = + subsets + |> Task.async_stream(fn subset -> + stats = BeaconChain.get_network_stats(subset) + {subset, stats} + end) + |> Stream.map(fn {:ok, res} -> res end) + |> Enum.reduce(%{}, fn + {subset, stats}, acc when map_size(stats) > 0 -> + Map.put(acc, subset, stats) + + _, acc -> + acc + end) + %NetworkStats{stats: stats} end end diff --git a/lib/archethic/p2p/message/network_stats.ex b/lib/archethic/p2p/message/network_stats.ex index aac668065..3bdba235a 100644 --- a/lib/archethic/p2p/message/network_stats.ex +++ b/lib/archethic/p2p/message/network_stats.ex @@ -3,7 +3,7 @@ defmodule Archethic.P2P.Message.NetworkStats do Represents network stats from the aggregated beacon chain summary's cache """ - defstruct stats: [] + defstruct stats: %{} alias Archethic.BeaconChain.Slot alias Archethic.Crypto @@ -12,7 +12,9 @@ defmodule Archethic.P2P.Message.NetworkStats do @type t :: %__MODULE__{ stats: %{ - Crypto.key() => Slot.net_stats() + (subset :: binary) => %{ + Crypto.key() => Slot.net_stats() + } } } @@ -21,11 +23,15 @@ defmodule Archethic.P2P.Message.NetworkStats do ## Examples - iex> %NetworkStats{stats: %{ + iex> %NetworkStats{stats: %{ <<0>> => %{ ...> <<0, 0, 75, 23, 134, 64, 221, 117, 107, 77, 233, 123, 201, 244, 18, 151, 8, 255, ...> 53, 137, 251, 197, 67, 25, 38, 95, 2, 62, 216, 131, 112, 116, 238, 180>> => [%{latency: 100}, %{latency: 110}, %{latency: 80}] - ...> }} |> NetworkStats.serialize() + ...> }}} |> NetworkStats.serialize() << + # Nb subsets + 1, + # Subset + 0, # Nb node stats 1, 1, # Node public key @@ -41,6 +47,18 @@ defmodule Archethic.P2P.Message.NetworkStats do """ @spec serialize(t()) :: bitstring() def serialize(%__MODULE__{stats: stats}) do + nb_subsets = map_size(stats) + + stats_binary = + Enum.map(stats, fn {subset, stats} -> + serialize_subset_stats(subset, stats) + end) + |> :erlang.list_to_binary() + + <> + end + + defp serialize_subset_stats(subset, stats) do stats_bin = stats |> Enum.map(fn {node_public_key, latencies} -> @@ -57,8 +75,7 @@ defmodule Archethic.P2P.Message.NetworkStats do nb_stats = map_size(stats) nb_stats_bin = VarInt.from_value(nb_stats) - - <> + <> end @doc """ @@ -66,23 +83,24 @@ defmodule Archethic.P2P.Message.NetworkStats do ## Examples - iex> <<1, 1, 0, 0, 75, 23, 134, 64, 221, 117, 107, 77, 233, 123, 201, 244, 18, 151, + iex> <<1, 0, 1, 1, 0, 0, 75, 23, 134, 64, 221, 117, 107, 77, 233, 123, 201, 244, 18, 151, ...> 8, 255, 53, 137, 251, 197, 67, 25, 38, 95, 2, 62, 216, 131, 112, 116, 238, ...> 180, 1, 3, 1, 100, 1, 110, 1, 80>> |> NetworkStats.deserialize() { %NetworkStats{ stats: %{ - <<0, 0, 75, 23, 134, 64, 221, 117, 107, 77, 233, 123, 201, 244, 18, 151, 8, 255, - 53, 137, 251, 197, 67, 25, 38, 95, 2, 62, 216, 131, 112, 116, 238, 180>> => [%{latency: 100}, %{latency: 110}, %{latency: 80}] + <<0>> => %{ + <<0, 0, 75, 23, 134, 64, 221, 117, 107, 77, 233, 123, 201, 244, 18, 151, 8, 255, + 53, 137, 251, 197, 67, 25, 38, 95, 2, 62, 216, 131, 112, 116, 238, 180>> => [%{latency: 100}, %{latency: 110}, %{latency: 80}] + } } }, "" } """ @spec deserialize(bitstring()) :: {t(), bitstring()} - def deserialize(data) when is_bitstring(data) do - {nb_stats, rest} = VarInt.get_value(data) - {stats, rest} = get_stats(rest, nb_stats, %{}) + def deserialize(<>) do + {stats, rest} = get_subsets_stats(rest, nb_subsets, %{}) { %__MODULE__{stats: stats}, @@ -90,6 +108,15 @@ defmodule Archethic.P2P.Message.NetworkStats do } end + defp get_subsets_stats(<<>>, _nb_subsets, acc), do: {acc, <<>>} + defp get_subsets_stats(rest, nb_subsets, acc) when map_size(acc) == nb_subsets, do: {acc, rest} + + defp get_subsets_stats(<>, nb_subsets, acc) do + {nb_stats, rest} = VarInt.get_value(data) + {stats, rest} = get_stats(rest, nb_stats, %{}) + get_subsets_stats(rest, nb_subsets, Map.put(acc, subset, stats)) + end + defp get_stats(<<>>, _nb_stats, acc), do: {acc, <<>>} defp get_stats(rest, nb_stats, acc) when map_size(acc) == nb_stats, do: {acc, rest} diff --git a/test/archethic/beacon_chain/network_coordinates_test.exs b/test/archethic/beacon_chain/network_coordinates_test.exs index 34b3cabed..f9dca62b1 100644 --- a/test/archethic/beacon_chain/network_coordinates_test.exs +++ b/test/archethic/beacon_chain/network_coordinates_test.exs @@ -43,26 +43,29 @@ defmodule Archethic.BeaconChain.NetworkCoordinatesTest do MockClient |> stub(:send_message, fn - _, %GetNetworkStats{subset: <<0>>}, _ -> + _, %GetNetworkStats{subsets: _}, _ -> {:ok, %NetworkStats{ stats: %{ - <<0::8, 0::8, 1::8, "key_b0">> => [ - %{latency: 100}, - %{latency: 110}, - %{latency: 90} - ], - <<0::8, 0::8, 1::8, "key_b1">> => [ - %{latency: 100}, - %{latency: 105}, - %{latency: 90} - ], - <<0::8, 0::8, 1::8, "key_b2">> => [%{latency: 90}, %{latency: 105}, %{latency: 90}] + <<0>> => %{ + <<0::8, 0::8, 1::8, "key_b0">> => [ + %{latency: 100}, + %{latency: 110}, + %{latency: 90} + ], + <<0::8, 0::8, 1::8, "key_b1">> => [ + %{latency: 100}, + %{latency: 105}, + %{latency: 90} + ], + <<0::8, 0::8, 1::8, "key_b2">> => [ + %{latency: 90}, + %{latency: 105}, + %{latency: 90} + ] + } } }} - - _, _, _ -> - {:ok, %NetworkStats{stats: %{}}} end) assert Nx.tensor([ diff --git a/test/archethic/beacon_chain/subset_test.exs b/test/archethic/beacon_chain/subset_test.exs index c89027ab4..b733513ab 100644 --- a/test/archethic/beacon_chain/subset_test.exs +++ b/test/archethic/beacon_chain/subset_test.exs @@ -322,20 +322,19 @@ defmodule Archethic.BeaconChain.SubsetTest do {:ok, %Ok{}} _, %NewBeaconSlot{slot: slot = %Slot{subset: subset}}, _ -> - SummaryCache.add_slot(subset, slot) + SummaryCache.add_slot(subset, slot, Crypto.first_node_public_key()) {:ok, %Ok{}} - _, %GetNetworkStats{subset: ^subset}, _ -> + _, %GetNetworkStats{subsets: _}, _ -> {:ok, %NetworkStats{ stats: %{ - Crypto.first_node_public_key() => [%{latency: 90}, %{latency: 100}], - <<0::8, 0::8, "key_beacon_node2">> => [%{latency: 90}, %{latency: 100}] + <> => %{ + Crypto.first_node_public_key() => [%{latency: 90}, %{latency: 100}], + <<0::8, 0::8, "key_beacon_node2">> => [%{latency: 90}, %{latency: 100}] + } } }} - - _, %GetNetworkStats{}, _ -> - {:ok, %NetworkStats{}} end) MockClient