Skip to content
This repository has been archived by the owner on Apr 22, 2024. It is now read-only.

Commit

Permalink
Fix ARP peer staleness time and add a throttle for ARP requests
Browse files Browse the repository at this point in the history
  • Loading branch information
Vagabond committed Jan 7, 2020
1 parent e63c968 commit 986e8fa
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 31 deletions.
1 change: 1 addition & 0 deletions eqc/stream_eqc.erl
Expand Up @@ -135,6 +135,7 @@ prop_correct() ->
?FORALL({Packet, Cmds},
{eqc_gen:largebinary(500000), noshrink(commands(?MODULE))},
begin
application:ensure_all_started(throttle),
application:ensure_all_started(ranch),
application:ensure_all_started(lager),
lager:set_loglevel(lager_console_backend, debug),
Expand Down
1 change: 1 addition & 0 deletions rebar.config
Expand Up @@ -27,6 +27,7 @@
{nat, ".*", {git, "https://github.com/benoitc/erlang-nat", {branch, "master"}}},
{gpb, "4.2.1"},
{backoff, "1.1.6"},
{throttle, "0.2.0", {pkg, lambda_throttle}},
{cuttlefish, ".*", {git, "https://github.com/helium/cuttlefish", {branch, "develop"}}},
{inet_ext, ".*", {git, "https://github.com/benoitc/inet_ext", {branch, "master"}}},
{splicer, ".*", {git, "https://github.com/helium/erlang-splicer.git", {branch, "master"}}},
Expand Down
6 changes: 4 additions & 2 deletions rebar.lock
Expand Up @@ -40,7 +40,8 @@
{<<"splicer">>,
{git,"https://github.com/helium/erlang-splicer.git",
{ref,"e1f47056786c76bf028b460e2a8cf70d7c31e7a6"}},
0}]}.
0},
{<<"throttle">>,{pkg,<<"lambda_throttle">>,<<"0.2.0">>},0}]}.
[
{pkg_hash,[
{<<"backoff">>, <<"83B72ED2108BA1EE8F7D1C22E0B4A00CFE3593A67DBC792799E8CCE9F42F796B">>},
Expand All @@ -60,5 +61,6 @@
{<<"ranch">>, <<"F04166F456790FEE2AC1AA05A02745CC75783C2BFB26D39FAF6AEFC9A3D3A58A">>},
{<<"rand_compat">>, <<"011646BC1F0B0C432FE101B816F25B9BBB74A085713CEE1DAFD2D62E9415EAD3">>},
{<<"rocksdb">>, <<"635F7D52C2B3E5399617C080BA5FE30AB255FD5D9D8180FFDD298E11EA1D62F4">>},
{<<"small_ints">>, <<"82A824C8794A2DDC73CB5CD00EAD11331DB296521AD16A619C13D668572B868A">>}]}
{<<"small_ints">>, <<"82A824C8794A2DDC73CB5CD00EAD11331DB296521AD16A619C13D668572B868A">>},
{<<"throttle">>, <<"E881B46D9836AFB70F3E2FA3BE9B0140805BA324ED26AA734FF6C5C1568C6CA7">>}]}
].
2 changes: 1 addition & 1 deletion src/group/libp2p_group_gossip_handler.erl
Expand Up @@ -2,7 +2,7 @@


-callback init_gossip_data(State::any()) -> init_result().
-callback handle_gossip_data(Msg::binary(), State::any()) -> {reply, iodata()} | noreply.
-callback handle_gossip_data(StreamPid::pid(), Msg::binary(), State::any()) -> {reply, iodata()} | noreply.

-type init_result() :: ok | {send, binary()}.

Expand Down
2 changes: 1 addition & 1 deletion src/group/libp2p_group_gossip_server.erl
Expand Up @@ -106,7 +106,7 @@ handle_cast({handle_data, StreamPid, Key, Msg}, State=#state{}) ->
{ok, {M, S}} ->
%% Catch the callback response. This avoids a crash in the
%% handler taking down the gossip_server itself.
try M:handle_gossip_data(Msg, S) of
try M:handle_gossip_data(StreamPid, Msg, S) of
{reply, Reply} ->
%% handler wants to reply
case (catch libp2p_gossip_stream:encode(Key, Reply)) of
Expand Down
3 changes: 2 additions & 1 deletion src/libp2p.app.src
Expand Up @@ -32,7 +32,8 @@
splicer,
backoff,
relcast,
libp2p_crypto
libp2p_crypto,
throttle
]},
{env,[]},

Expand Down
43 changes: 25 additions & 18 deletions src/peerbook/libp2p_peer_resolution.erl
Expand Up @@ -5,7 +5,7 @@
-include("pb/libp2p_peer_resolution_pb.hrl").

%% libp2p_group_gossip_handler
-export([handle_gossip_data/2, init_gossip_data/1]).
-export([handle_gossip_data/3, init_gossip_data/1]).

-export([resolve/3, install_handler/2]).

Expand All @@ -14,44 +14,51 @@

-spec resolve(pid(), libp2p_crypto:pubkey_bin(), non_neg_integer()) -> ok.
resolve(GossipGroup, PK, Ts) ->
lager:debug("ARP request for ~p", [libp2p_crypto:pubkey_bin_to_p2p(PK)]),
libp2p_group_gossip:send(GossipGroup, ?GOSSIP_GROUP_KEY,
libp2p_peer_resolution_pb:encode_msg(
#libp2p_peer_resolution_msg_pb{
msg = {request, #libp2p_peer_request_pb{pubkey=PK, timestamp=Ts}}})),
ok.

install_handler(G, Handle) ->
throttle:setup(?MODULE, 3, per_minute),
libp2p_group_gossip:add_handler(G, ?GOSSIP_GROUP_KEY, {?MODULE, Handle}),
ok.

%%
%% Gossip Group
%%

-spec handle_gossip_data(binary(), libp2p_peerbook:peerbook()) -> {reply, iodata()} | noreply.
handle_gossip_data(Data, Handle) ->
-spec handle_gossip_data(pid(), binary(), libp2p_peerbook:peerbook()) -> {reply, iodata()} | noreply.
handle_gossip_data(StreamPid, Data, Handle) ->
case libp2p_peer_resolution_pb:decode_msg(Data, libp2p_peer_resolution_msg_pb) of
#libp2p_peer_resolution_msg_pb{msg = {request, #libp2p_peer_request_pb{pubkey=PK, timestamp=Ts}}} ->
%% look up our peerbook for a newer record for this peer
case libp2p_peerbook:get(Handle, PK) of
{ok, Peer} ->
case libp2p_peer:timestamp(Peer) > Ts of
true ->
lager:notice("ARP response for ~p Success", [PK]),
{reply, libp2p_peer_resolution_pb:encode_msg(
#libp2p_peer_resolution_msg_pb{msg = {response, Peer}})};
false ->
lager:notice("ARP response for ~p Failed - stale", [PK]),
%% peer is as stale or staler than what they have
case throttle:check(?MODULE, StreamPid) of
{ok, _, _} ->
%% look up our peerbook for a newer record for this peer
case libp2p_peerbook:get(Handle, PK) of
{ok, Peer} ->
case libp2p_peer:timestamp(Peer) > Ts of
true ->
lager:debug("ARP response for ~p Success", [libp2p_crypto:pubkey_bin_to_p2p(PK)]),
{reply, libp2p_peer_resolution_pb:encode_msg(
#libp2p_peer_resolution_msg_pb{msg = {response, Peer}})};
false ->
lager:debug("ARP response for ~p Failed - stale", [libp2p_crypto:pubkey_bin_to_p2p(PK)]),
%% peer is as stale or staler than what they have
noreply
end;
_ ->
lager:debug("ARP response for ~p Failed - notfound", [libp2p_crypto:pubkey_bin_to_p2p(PK)]),
%% don't have this peer
noreply
end;
_ ->
lager:notice("ARP response for ~p Failed - notfound", [PK]),
%% don't have this peer
{limit_exceeded, _, _} ->
noreply
end;
#libp2p_peer_resolution_msg_pb{msg = {response, #libp2p_signed_peer_pb{} = Peer}} ->
lager:notice("ARP request for ~p", [libp2p_peer:pubkey_bin(Peer)]),
lager:debug("ARP result for ~p", [libp2p_crypto:pubkey_bin_to_p2p(libp2p_peer:pubkey_bin(Peer))]),
%% send this peer to the peerbook
libp2p_peerbook:put(Handle, [Peer]),
noreply
Expand Down
11 changes: 5 additions & 6 deletions src/peerbook/libp2p_peerbook.erl
Expand Up @@ -6,7 +6,7 @@
register_session/3, unregister_session/2, blacklist_listen_addr/3,
add_association/3, lookup_association/3]).
%% libp2p_group_gossip_handler
-export([handle_gossip_data/2, init_gossip_data/1]).
-export([handle_gossip_data/3, init_gossip_data/1]).

-type opt() :: {stale_time, pos_integer()}
| {peer_time, pos_integer()}.
Expand Down Expand Up @@ -135,10 +135,9 @@ refresh(#peerbook{tid=TID}=Handle, ID) when is_binary(ID) ->
end
end;
refresh(#peerbook{tid=TID}, Peer) ->
Opts = libp2p_swarm:opts(TID),
StaleTime = libp2p_config:get_opt(Opts, [?MODULE, stale_time], ?DEFAULT_STALE_TIME),
TimeDiffMinutes = application:get_env(libp2p, similarity_time_diff_mins, 6),
case libp2p_peer:network_id_allowable(Peer, libp2p_swarm:network_id(TID)) andalso
libp2p_peer:is_stale(Peer, StaleTime) of
libp2p_peer:is_stale(Peer, timer:minutes(TimeDiffMinutes)) of
false ->
ok;
true ->
Expand Down Expand Up @@ -232,8 +231,8 @@ lookup_association(Handle=#peerbook{}, AssocType, AssocAddress) ->
%% Gossip Group
%%

-spec handle_gossip_data(binary(), peerbook()) -> noreply.
handle_gossip_data(Data, Handle) ->
-spec handle_gossip_data(pid(), binary(), peerbook()) -> noreply.
handle_gossip_data(_StreamPid, Data, Handle) ->
DecodedList = libp2p_peer:decode_list(Data),
?MODULE:put(Handle, DecodedList),
noreply.
Expand Down
4 changes: 2 additions & 2 deletions test/group_gossip_SUITE.erl
Expand Up @@ -7,7 +7,7 @@

-export([all/0, init_per_testcase/2, end_per_testcase/2]).
-export([connection_test/1, gossip_test/1, seed_test/1]).
-export([init_gossip_data/1, handle_gossip_data/2]).
-export([init_gossip_data/1, handle_gossip_data/3]).

all() ->
[
Expand Down Expand Up @@ -129,7 +129,7 @@ seed_test(Config) ->
init_gossip_data(_) ->
ok.

handle_gossip_data(Msg, Parent) ->
handle_gossip_data(_StreamPid, Msg, Parent) ->
Parent ! {handle_gossip_data, Msg},
noreply.

Expand Down
1 change: 1 addition & 0 deletions test/test_util.erl
Expand Up @@ -10,6 +10,7 @@ setup() ->
application:ensure_all_started(lager),
lager:set_loglevel(lager_console_backend, debug),
lager:set_loglevel({lager_file_backend, "log/console.log"}, debug),
application:ensure_all_started(throttle),
ok.

setup_swarms(0, _Opts, Acc) ->
Expand Down

0 comments on commit 986e8fa

Please sign in to comment.