Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use the h3dex to find gateways in the selected hex for poc notifications #58

Merged
merged 4 commits into from Jun 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
105 changes: 85 additions & 20 deletions src/grpc/helium_stream_poc_impl.erl
Expand Up @@ -8,7 +8,8 @@
-type handler_state() :: #{
mod => atom(),
streaming_initialized => boolean(),
addr => libp2p_crypto:pubkey_bin()
addr => libp2p_crypto:pubkey_bin(),
location => undefined | pos_integer()
}.
-export_type([handler_state/0]).

Expand All @@ -29,7 +30,7 @@ init(_RPC, StreamState) ->
lager:debug("handler init, stream state ~p", [StreamState]),
NewStreamState = grpcbox_stream:stream_handler_state(
StreamState,
#{streaming_initialized => false, mod => ?MODULE}
#{streaming_initialized => false, mod => ?MODULE, location => undefined}
),
NewStreamState.

Expand Down Expand Up @@ -71,6 +72,40 @@ handle_info(
%% the payload is fully formed and encoded
NewStreamState = grpcbox_stream:send(false, Msg, StreamState),
NewStreamState;
handle_info(
{asserted_gw_notify, Addr},
StreamState
) ->
lager:debug("got asserted_gw_notify for addr ~p, resubscribing to new hex", [Addr]),
Chain = sibyl_mgr:blockchain(),
Ledger = blockchain:ledger(Chain),
#{location := OldLoc} =
HandlerState =
grpcbox_stream:stream_handler_state(StreamState),
NewHandlerState =
case blockchain_ledger_v1:find_gateway_location(Addr, Ledger) of
{ok, NewLoc} when OldLoc == undefined ->
%% first time assert for GW
%% need to sub all our things
ok = subscribe_to_events(NewLoc, Addr, Ledger),
%% save new location to state
HandlerState#{location => NewLoc};
{ok, NewLoc} when NewLoc /= OldLoc ->
%% gw has reasserted to new location
%% unsub from old location, sub to new
_ = sibyl_bus:leave(location_topic(OldLoc, Ledger), self()),
_ = sibyl_bus:sub(location_topic(NewLoc, Ledger), self()),
%% save new location to state
HandlerState#{location => NewLoc};
_ ->
%% hmm
HandlerState
end,
NewStreamState =
grpcbox_stream:stream_handler_state(
StreamState, NewHandlerState
),
NewStreamState;
handle_info(
_Msg,
StreamState
Expand All @@ -81,7 +116,6 @@ handle_info(
%% ------------------------------------------------------------------
%% callback breakout functions
%% ------------------------------------------------------------------

-spec pocs(
blockchain:blockchain(),
boolean(),
Expand Down Expand Up @@ -121,17 +155,29 @@ pocs(
false ->
{grpc_error, {grpcbox_stream:code_to_status(14), <<"bad signature">>}};
true ->
lager:info("gw ~p is subscribing to poc events", [?TO_ANIMAL_NAME(Addr)]),
ok = subscribe_to_events(Addr),
Ledger = blockchain:ledger(Chain),
HandlerState = grpcbox_stream:stream_handler_state(StreamState),
NewHandlerState =
case blockchain_ledger_v1:find_gateway_location(Addr, Ledger) of
{ok, Loc} when Loc /= undefined ->
lager:info("gw ~p is subscribing to poc events", [?TO_ANIMAL_NAME(Addr)]),
%% GW is asserted, we are good to proceed
ok = subscribe_to_events(Loc, Addr, Ledger),
ok = check_if_reactivated_gw(Addr, Ledger),
HandlerState#{location => Loc};
_ ->
lager:info("unasserted gw ~p is subscribing to poc events", [
?TO_ANIMAL_NAME(Addr)
]),
HandlerState
end,
NewStreamState = grpcbox_stream:stream_handler_state(
StreamState,
HandlerState#{
NewHandlerState#{
streaming_initialized => true,
addr => Addr
}
),
_ = check_if_reactivated_gw(Addr, Chain),
{ok, NewStreamState}
end.

Expand Down Expand Up @@ -163,9 +209,11 @@ handle_event(
lager:warning("received unhandled event ~p", [_Event]),
StreamState.

-spec check_if_reactivated_gw(libp2p_crypto:pubkey_bin(), blockchain:blockchain()) -> ok.
check_if_reactivated_gw(GWAddr, Chain) ->
Ledger = blockchain:ledger(Chain),
-spec check_if_reactivated_gw(
libp2p_crypto:pubkey_bin(),
blockchain_ledger_v1:ledger()
) -> ok.
check_if_reactivated_gw(GWAddr, Ledger) ->
{ok, CurHeight} = blockchain_ledger_v1:current_height(Ledger),
case blockchain:config(poc_activity_filter_enabled, Ledger) of
{ok, true} ->
Expand All @@ -176,7 +224,8 @@ check_if_reactivated_gw(GWAddr, Chain) ->
{ok, undefined} ->
%% No activity set, so include in list to reactivate
%% this means it will become available for POC
true = sibyl_poc_mgr:cache_reactivated_gw(GWAddr);
true = sibyl_poc_mgr:cache_reactivated_gw(GWAddr),
ok;
{ok, C} ->
{ok, MaxActivityAge} =
case
Expand All @@ -190,7 +239,8 @@ check_if_reactivated_gw(GWAddr, Chain) ->
case (CurHeight - C) > MaxActivityAge of
true ->
lager:debug("reactivating gw ~p", [?TO_ANIMAL_NAME(GWAddr)]),
true = sibyl_poc_mgr:cache_reactivated_gw(GWAddr);
true = sibyl_poc_mgr:cache_reactivated_gw(GWAddr),
ok;
false ->
ok
end
Expand All @@ -200,12 +250,27 @@ check_if_reactivated_gw(GWAddr, Chain) ->
ok
end.

-spec subscribe_to_events(libp2p_crypto:pubkey_bin()) -> ok.
subscribe_to_events(Addr) ->
%% topic key for POC streams is the pub key bin
%% streamed msgs will be received & published by the sibyl_poc_mgr
%% streamed POC msgs will be potential challenge notifications
%% we also want to activity check events
POCTopic = sibyl_utils:make_poc_topic(Addr),
[sibyl_bus:sub(E, self()) || E <- [?EVENT_ACTIVITY_CHECK_NOTIFICATION, POCTopic]],
-spec subscribe_to_events(
Loc :: pos_integer(),
Addr :: libp2p_crypto:pubkey_bin(),
Ledger :: blockchain_ledger_v1:ledger()
) -> ok.
subscribe_to_events(Loc, Addr, Ledger) ->
%% topic key for POC streams is the hex of their asserted location
%% streamed POC notification is published by sibyl_poc_mgr
_ = sibyl_bus:sub(location_topic(Loc, Ledger), self()),
%% subscribe activity check events
_ = sibyl_bus:sub(?EVENT_ACTIVITY_CHECK_NOTIFICATION, self()),
%% subscribe to reassert notifications
ReassertTopic = sibyl_utils:make_asserted_gw_topic(Addr),
_ = sibyl_bus:sub(ReassertTopic, self()),
ok.

-spec location_topic(
Loc :: pos_integer(),
Ledger :: blockchain_ledger_v1:ledger()
) -> binary().
location_topic(Loc, Ledger) ->
{ok, Res} = blockchain:config(?poc_target_hex_parent_res, Ledger),
Hex = h3:parent(Loc, Res),
sibyl_utils:make_poc_topic(Hex).
29 changes: 8 additions & 21 deletions src/sibyl_poc_mgr.erl
Expand Up @@ -150,25 +150,16 @@ handle_poc_event(
],
{noreply, State#state{}}.

run_poc_targetting(ChallengerAddr, Key, Ledger, BlockHash, Vars) ->
run_poc_targetting(ChallengerAddr, Key, Ledger, BlockHash, _Vars) ->
Entropy = <<Key/binary, BlockHash/binary>>,
ZoneRandState = blockchain_utils:rand_state(Entropy),
TargetMod = blockchain_utils:target_v_to_mod(blockchain:config(?poc_targeting_version, Ledger)),
case TargetMod:target_zone(ZoneRandState, Ledger) of
{error, _} ->
lager:debug("*** failed to find a target zone", []),
noop;
{ok, {HexList, Hex, HexRandState}} ->
%% get all GWs in this zone
{ok, ZoneGWs} = TargetMod:gateways_for_zone(
ChallengerAddr,
Ledger,
Vars,
HexList,
[{Hex, HexRandState}]
),
lager:debug("*** found gateways for target zone: ~p", [ZoneGWs]),
%% create the notification
{ok, {_HexList, Hex, _HexRandState}} ->
%% create notification informing GWs they may be being challenged
case sibyl_utils:address_data([ChallengerAddr]) of
[] ->
lager:debug("*** no public addr for ~p", [ChallengerAddr]),
Expand All @@ -186,14 +177,10 @@ run_poc_targetting(ChallengerAddr, Key, Ledger, BlockHash, Vars) ->
NotificationPB,
sibyl_mgr:sigfun()
),
%% send the notification to all the GWs in the zone, informing them they might be being challenged
lists:foreach(
fun(GW) ->
Topic = sibyl_utils:make_poc_topic(GW),
lager:debug("*** sending poc notification to gateway ~p", [GW]),
sibyl_bus:pub(Topic, {poc_notify, Notification})
end,
ZoneGWs
)
%% send the notification to all the GWs in the zone,
%% GWs subscribe by the hex they are located in
Topic = sibyl_utils:make_poc_topic(Hex),
lager:debug("*** sending poc notification to all gateways in hex ~p", [Hex]),
sibyl_bus:pub(Topic, {poc_notify, Notification})
end
end.
5 changes: 3 additions & 2 deletions src/sibyl_utils.erl
Expand Up @@ -45,8 +45,9 @@ make_event(EventType, EventPayload) ->
make_sc_topic(SCID) ->
<<?EVENT_STATE_CHANNEL_UPDATE/binary, SCID/binary>>.

make_poc_topic(GatewayAddr) ->
<<?EVENT_POC_NOTIFICATION/binary, GatewayAddr/binary>>.
make_poc_topic(Hex) ->
HexB = ensure(binary, Hex),
<<?EVENT_POC_NOTIFICATION/binary, HexB/binary>>.

make_config_update_topic() ->
<<?EVENT_CONFIG_UPDATE_NOTIFICATION/binary>>.
Expand Down