Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Session key for packet_router #438

Merged
merged 5 commits into from Aug 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
392 changes: 247 additions & 145 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/api/mod.rs
Expand Up @@ -20,6 +20,7 @@ impl TryFrom<RouterRes> for crate::packet_router::RouterStatus {
Ok(Self {
uri: http::Uri::from_str(&value.uri)?,
connected: value.connected,
session_key: helium_crypto::PublicKey::try_from(value.session_key).ok(),
})
}
}
4 changes: 4 additions & 0 deletions src/api/server.rs
Expand Up @@ -76,6 +76,10 @@ impl Api for LocalServer {
Ok(Response::new(RouterRes {
uri: router_status.uri.to_string(),
connected: router_status.connected,
session_key: router_status
.session_key
.map(|k| k.to_vec())
.unwrap_or_default(),
}))
}

Expand Down
9 changes: 6 additions & 3 deletions src/gateway.rs
@@ -1,6 +1,6 @@
use crate::{
beaconer, packet, packet_router, region_watcher, sync, PacketDown, PacketUp, RegionParams,
Result, Settings,
beaconer, packet, packet_router, region_watcher, sync, PacketDown, PacketUp, PublicKey,
RegionParams, Result, Settings,
};
use beacon::Beacon;
use lorawan::PHYPayload;
Expand Down Expand Up @@ -60,6 +60,7 @@ impl MessageSender {
}

pub struct Gateway {
public_key: PublicKey,
messages: MessageReceiver,
uplinks: packet_router::MessageSender,
beacons: beaconer::MessageSender,
Expand All @@ -79,7 +80,9 @@ impl Gateway {
beacons: beaconer::MessageSender,
) -> Result<Self> {
let region_params = region_watcher::current_value(&region_watch);
let public_key = settings.keypair.public_key().clone();
let gateway = Gateway {
public_key,
messages,
uplinks,
beacons,
Expand Down Expand Up @@ -136,7 +139,7 @@ impl Gateway {
info!(%mac, %addr, "disconnected packet forwarder")
}
Event::PacketReceived(rxpk, _gateway_mac) => {
match PacketUp::from_rxpk(rxpk, self.region_params.region) {
match PacketUp::from_rxpk(rxpk, &self.public_key, self.region_params.region) {
Ok(packet) if packet.is_potential_beacon() => {
self.handle_potential_beacon(packet).await;
}
Expand Down
11 changes: 11 additions & 0 deletions src/keypair.rs
Expand Up @@ -28,6 +28,17 @@ pub fn save_to_file(keypair: &Keypair, path: &str) -> io::Result<()> {
Ok(())
}

pub fn mk_session_keypair() -> Keypair {
let keypair = helium_crypto::Keypair::generate(
KeyTag {
network: Network::MainNet,
key_type: KeyType::Ed25519,
},
&mut OsRng,
);
keypair.into()
}

macro_rules! uri_error {
($format:expr) => {
error::DecodeError::keypair_uri(format!($format))
Expand Down
6 changes: 3 additions & 3 deletions src/packet.rs
@@ -1,4 +1,4 @@
use crate::{error::DecodeError, Error, Region, Result};
use crate::{error::DecodeError, Error, PublicKey, Region, Result};
use helium_proto::services::{
poc_lora,
router::{PacketRouterPacketDownV1, PacketRouterPacketUpV1},
Expand Down Expand Up @@ -88,7 +88,7 @@ impl TryFrom<PacketUp> for poc_lora::LoraWitnessReportReqV1 {
}

impl PacketUp {
pub fn from_rxpk(rxpk: push_data::RxPk, region: Region) -> Result<Self> {
pub fn from_rxpk(rxpk: push_data::RxPk, gateway: &PublicKey, region: Region) -> Result<Self> {
if rxpk.get_crc_status() != &CRC::OK {
return Err(DecodeError::invalid_crc());
}
Expand All @@ -105,7 +105,7 @@ impl PacketUp {
snr: rxpk.get_snr(),
region: region.into(),
hold_time: 0,
gateway: vec![],
gateway: gateway.into(),
signature: vec![],
};
Ok(Self(packet))
Expand Down
93 changes: 81 additions & 12 deletions src/packet_router/mod.rs
@@ -1,17 +1,23 @@
use crate::{
gateway,
keypair::mk_session_keypair,
message_cache::{CacheMessage, MessageCache},
service::packet_router::PacketRouterService,
sign, sync, Base64, Keypair, PacketUp, Result, Settings,
};
use exponential_backoff::Backoff;
use futures::TryFutureExt;
use helium_proto::{
services::router::{PacketRouterPacketDownV1, PacketRouterPacketUpV1},
services::router::{
envelope_down_v1, envelope_up_v1, PacketRouterPacketDownV1, PacketRouterPacketUpV1,
PacketRouterSessionInitV1, PacketRouterSessionOfferV1,
},
Message as ProtoMessage,
};
use serde::Serialize;
use std::{sync::Arc, time::Instant as StdInstant};
use tokio::time::{self, Duration, Instant};

use tracing::{debug, info, warn};

const STORE_GC_INTERVAL: Duration = Duration::from_secs(60);
Expand All @@ -34,6 +40,7 @@ pub struct RouterStatus {
#[serde(with = "http_serde::uri")]
pub uri: http::Uri,
pub connected: bool,
pub session_key: Option<helium_crypto::PublicKey>,
}

pub type MessageSender = sync::MessageSender<Message>;
Expand All @@ -58,6 +65,7 @@ pub struct PacketRouter {
transmit: gateway::MessageSender,
service: PacketRouterService,
reconnect_retry: u32,
session_key: Option<Arc<Keypair>>,
keypair: Arc<Keypair>,
store: MessageCache<PacketUp>,
}
Expand All @@ -75,6 +83,7 @@ impl PacketRouter {
Self {
service,
keypair: settings.keypair.clone(),
session_key: None,
transmit,
messages,
store,
Expand Down Expand Up @@ -105,13 +114,15 @@ impl PacketRouter {
message = self.messages.recv() => match message {
Some(Message::Uplink{packet, received}) =>
if self.handle_uplink(packet, received).await.is_err() {
self.disconnect();
warn!("router disconnected");
reconnect_sleep = self.next_connect(&reconnect_backoff, true);
},
Some(Message::Status(tx_resp)) => {
let status = RouterStatus {
uri: self.service.uri.clone(),
connected: self.service.is_connected(),
session_key: self.session_key.as_ref().map(|keypair| keypair.public_key().to_owned()),
};
tx_resp.send(status)
}
Expand All @@ -120,8 +131,9 @@ impl PacketRouter {
_ = time::sleep_until(reconnect_sleep) => {
reconnect_sleep = self.handle_reconnect(&reconnect_backoff).await;
},
downlink = self.service.recv() => match downlink {
Ok(Some(message)) => self.handle_downlink(message).await,
router_message = self.service.recv() => match router_message {
Ok(Some(envelope_down_v1::Data::Packet(message))) => self.handle_downlink(message).await,
Ok(Some(envelope_down_v1::Data::SessionOffer(message))) => self.handle_session_offer(message).await,
Ok(None) => {
warn!("router disconnected");
reconnect_sleep = self.next_connect(&reconnect_backoff, true)
Expand Down Expand Up @@ -154,9 +166,11 @@ impl PacketRouter {
info!("connecting");
let inc_retry = match self.service.reconnect().await {
Ok(_) => {
// Do not send waiting packets here since we wait for a sesson
// offer
info!("connected");
self.reconnect_retry = RECONNECT_BACKOFF_RETRIES;
self.send_waiting_packets().await.is_err()
false
}
Err(err) => {
warn!(%err, "failed to connect");
Expand All @@ -169,7 +183,9 @@ impl PacketRouter {
async fn handle_uplink(&mut self, uplink: PacketUp, received: StdInstant) -> Result {
self.store.push_back(uplink, received);
if self.service.is_connected() {
self.send_waiting_packets().await?;
if let Some(session_key) = &self.session_key {
self.send_waiting_packets(session_key.clone()).await?;
}
}
Ok(())
}
Expand All @@ -178,12 +194,43 @@ impl PacketRouter {
self.transmit.downlink(message.into()).await;
}

async fn send_waiting_packets(&mut self) -> Result {
async fn handle_session_offer(&mut self, message: PacketRouterSessionOfferV1) {
info!("received session offer");
let disconnect = match mk_session_key_init(self.keypair.clone(), &message)
.and_then(|(session_key, session_init)| {
self.service.send(session_init).map_ok(|_| session_key)
})
.await
{
Ok(session_key) => {
self.session_key = Some(session_key.clone());
info!(session_key = %session_key.public_key(),"initialized session");
self.send_waiting_packets(session_key.clone())
.inspect_err(|err| warn!(%err, "failed to send queued packets"))
.await
.is_err()
}
Err(err) => {
warn!(%err, "failed to initialize session");
true
}
};
if disconnect {
self.disconnect();
}
}

fn disconnect(&mut self) {
self.service.disconnect();
self.session_key = None;
}

async fn send_waiting_packets(&mut self, keypair: Arc<Keypair>) -> Result {
while let (removed, Some(packet)) = self.store.pop_front(STORE_GC_INTERVAL) {
if removed > 0 {
info!(removed, "discarded queued packets");
}
if let Err(err) = self.send_packet(&packet).await {
if let Err(err) = self.send_packet(&packet, keypair.clone()).await {
warn!(%err, "failed to send uplink");
self.store.push_front(packet);
return Err(err);
Expand All @@ -192,22 +239,44 @@ impl PacketRouter {
Ok(())
}

async fn send_packet(&mut self, packet: &CacheMessage<PacketUp>) -> Result {
async fn send_packet(
&mut self,
packet: &CacheMessage<PacketUp>,
keypair: Arc<Keypair>,
) -> Result {
debug!(packet_hash = packet.hash().to_b64(), "sending packet");

let uplink = mk_uplink(packet, self.keypair.clone()).await?;
let uplink = mk_uplink(packet, keypair).await?;
self.service.send(uplink).await
}
}

pub async fn mk_uplink(
packet: &CacheMessage<PacketUp>,
keypair: Arc<Keypair>,
) -> Result<PacketRouterPacketUpV1> {
) -> Result<envelope_up_v1::Data> {
use std::ops::Deref;
let mut uplink: PacketRouterPacketUpV1 = packet.deref().into();
uplink.hold_time = packet.hold_time().as_millis() as u64;
uplink.gateway = keypair.public_key().into();
uplink.signature = sign(keypair, uplink.encode_to_vec()).await?;
Ok(uplink)
let envelope = envelope_up_v1::Data::Packet(uplink);
Ok(envelope)
}

pub async fn mk_session_key_init(
keypair: Arc<Keypair>,
offer: &PacketRouterSessionOfferV1,
) -> Result<(Arc<Keypair>, envelope_up_v1::Data)> {
let session_keypair = Arc::new(mk_session_keypair());
let session_key = session_keypair.public_key();

let mut session_init = PacketRouterSessionInitV1 {
gateway: keypair.public_key().into(),
session_key: session_key.into(),
nonce: offer.nonce.clone(),
signature: vec![],
};
session_init.signature = sign(keypair, session_init.encode_to_vec()).await?;
let envelope = envelope_up_v1::Data::SessionInit(session_init);
Ok((session_keypair, envelope))
}
17 changes: 8 additions & 9 deletions src/service/packet_router.rs
Expand Up @@ -13,7 +13,7 @@ use helium_proto::{
services::{
router::{
envelope_down_v1, envelope_up_v1, EnvelopeDownV1, EnvelopeUpV1, PacketRouterClient,
PacketRouterPacketDownV1, PacketRouterPacketUpV1, PacketRouterRegisterV1,
PacketRouterRegisterV1,
},
Channel, Endpoint,
},
Expand Down Expand Up @@ -73,21 +73,19 @@ impl PacketRouterConduit {
Ok(Self { tx, rx })
}

async fn recv(&mut self) -> Result<Option<PacketRouterPacketDownV1>> {
async fn recv(&mut self) -> Result<Option<envelope_down_v1::Data>> {
match self.rx.message().await {
Ok(Some(msg)) => match msg.data {
Some(envelope_down_v1::Data::Packet(packet)) => Ok(Some(packet)),
Some(data) => Ok(Some(data)),
None => Err(DecodeError::invalid_envelope()),
},
Ok(None) => Ok(None),
Err(err) => Err(err.into()),
}
}

async fn send(&mut self, msg: PacketRouterPacketUpV1) -> Result {
let msg = EnvelopeUpV1 {
data: Some(envelope_up_v1::Data::Packet(msg)),
};
async fn send(&mut self, msg: envelope_up_v1::Data) -> Result {
let msg = EnvelopeUpV1 { data: Some(msg) };
Ok(self.tx.send(msg).await?)
}

Expand All @@ -99,6 +97,7 @@ impl PacketRouterConduit {
.as_millis() as u64,
gateway: keypair.public_key().into(),
signature: vec![],
session_capable: true,
};
msg.signature = sign(keypair.clone(), msg.encode_to_vec()).await?;
let msg = EnvelopeUpV1 {
Expand All @@ -117,7 +116,7 @@ impl PacketRouterService {
}
}

pub async fn send(&mut self, msg: PacketRouterPacketUpV1) -> Result {
pub async fn send(&mut self, msg: envelope_up_v1::Data) -> Result {
if self.conduit.is_none() {
self.connect().await?;
}
Expand All @@ -131,7 +130,7 @@ impl PacketRouterService {
}
}

pub async fn recv(&mut self) -> Result<Option<PacketRouterPacketDownV1>> {
pub async fn recv(&mut self) -> Result<Option<envelope_down_v1::Data>> {
// Since recv is usually called from a select loop we don't try a
// connect every time it is called since the rate for attempted
// connections in failure setups would be as high as the loop rate of
Expand Down