Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add infrastructure for more MQTT dialects #362

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ testcontainers = { git = "https://github.com/testcontainers/testcontainers-rs",
drogue-bazaar = { git = "https://github.com/drogue-iot/drogue-bazaar", rev = "d19ad32f200938aeb5d7081ee3385ee40c5ae0ff" } # FIXME: awaiting release 0.4.0
#drogue-bazaar = { path = "../drogue-bazaar" }

drogue-client = { git = "https://github.com/drogue-iot/drogue-client", rev = "798c968f0a63a0debcff9965c66b361e85946458" } # FIXME: awaiting release 0.12.0
drogue-client = { git = "https://github.com/drogue-iot/drogue-client", rev = "c3e5fc6e0ef1781f8362394a114b72738990d00d" } # FIXME: awaiting release 0.12.0
#drogue-client = { path = "../drogue-client" }

operator-framework = { git = "https://github.com/ctron/operator-framework", rev = "8366506a3ed44b638f899dcce4a82ac32fcaff9e" } # FIXME: awaiting release 0.7.0
Expand Down
8 changes: 3 additions & 5 deletions endpoint-common/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,11 +263,7 @@ impl DeviceAuthenticator {
C: AsRef<str> + Debug,
{
log::debug!(
"Authenticate MQTT - username: {:?}, password: {:?}, client_id: {:?}, certs: {:?}",
username,
password,
client_id,
certs
"Authenticate MQTT - username: {username:?}, password: {password:?}, client_id: {client_id:?}, certs: {certs:?}, verified_identity: {verified_identity:?}",
);

match (
Expand Down Expand Up @@ -303,6 +299,8 @@ impl DeviceAuthenticator {
}
// Client cert only
(None, None, _, Some(certs), None) => self.authenticate_cert(certs.0).await,
// Client cert plus username
(Some(_username), None, _, Some(certs), None) => self.authenticate_cert(certs.0).await,
// TLS-PSK verified identity
(None, None, _, None, Some(verified_identity)) => {
self.authenticate_verified_identity(verified_identity)
Expand Down
11 changes: 8 additions & 3 deletions endpoint-common/src/command/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ pub struct Subscription {

impl Commands {
pub fn new() -> Self {
log::info!("New internal command broker");
Self {
devices: Arc::new(Mutex::new(HashMap::new())),
wildcards: Arc::new(Mutex::new(HashMap::new())),
Expand Down Expand Up @@ -218,21 +219,22 @@ impl Commands {
{
log::debug!("Adding entry for: {key:?}");

let map = match map.entry(key) {
let entry_map = match map.entry(key) {
Entry::Occupied(entry) => entry.into_mut(),
Entry::Vacant(entry) => entry.insert(HashMap::new()),
};

loop {
let id: usize = rand::random();

match map.entry(id) {
match entry_map.entry(id) {
Entry::Vacant(entry) => {
// entry was free, we can insert
entry.insert(value);
break id;
}
Entry::Occupied(_) => {
log::debug!("ID clash, retrying");
// entry is occupied, we need to re-try
}
}
Expand Down Expand Up @@ -263,9 +265,11 @@ impl CommandDispatcher for Commands {

log::debug!("Dispatching command to {:?}", msg.address);

let mut possible: usize = 0;
let mut num: usize = 0;

if let Some(senders) = self.devices.lock().await.get(&msg.address) {
possible += senders.len();
log::debug!(
"Sending command {:?} sent to device {:?}",
msg.command,
Expand All @@ -278,6 +282,7 @@ impl CommandDispatcher for Commands {
msg.address.app_id.clone(),
msg.address.gateway_id.clone(),
)) {
possible += senders.len();
log::debug!(
"Sending command {:?} sent to wildcard {:?}",
msg.command,
Expand All @@ -286,7 +291,7 @@ impl CommandDispatcher for Commands {
num += dispatch_command(senders.values(), &msg).await;
}

log::debug!("Sent to {} receivers", num);
log::debug!("Sent to {num} receivers of {possible}");
}
}

Expand Down
2 changes: 2 additions & 0 deletions endpoint-common/src/command/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ impl KafkaCommandSource {
where
D: CommandDispatcher + Send + Sync + 'static,
{
log::info!("Starting Kafka command source");

let mut source = EventStream::<AutoAck>::new(EventStreamConfig {
kafka: KafkaConfig {
topic: config.topic,
Expand Down
1 change: 1 addition & 0 deletions mqtt-endpoint/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ serde_json = "1"
thiserror = "1"
tokio = { version = "1", features = ["full"] }
tracing = { version = "0.1", features = ["log-always"] }
url = "2"
uuid = { version = "1", features = ["v4"] }
webpki = "0.22"

Expand Down
80 changes: 49 additions & 31 deletions mqtt-endpoint/src/service/app.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use crate::{auth::DeviceAuthenticator, config::EndpointConfig, service::session::Session};
use crate::{
auth::DeviceAuthenticator,
config::EndpointConfig,
service::session::{dialect::DialectBuilder, Session},
};
use async_trait::async_trait;
use drogue_client::{
registry::v1::{Application, Device, MqttSpec},
Expand All @@ -13,7 +17,7 @@ use drogue_cloud_endpoint_common::{
};
use drogue_cloud_mqtt_common::{
error::ServerError,
mqtt::{AckOptions, Connect, ConnectAck, Service, Sink},
mqtt::{AckOptions, Connect, ConnectAck, Service},
};
use drogue_cloud_service_api::{
auth::device::authn::Outcome as AuthOutcome,
Expand All @@ -36,7 +40,13 @@ pub struct App {

impl App {
/// authenticate a client
#[instrument]
#[instrument(skip_all, fields(
username,
has_password = password.is_some(),
client_id,
has_certs = certs.is_some(),
has_verified_identity = verified_identity.is_some(),
), err)]
pub async fn authenticate(
&self,
username: Option<&str>,
Expand Down Expand Up @@ -71,16 +81,14 @@ impl App {
fields(
application = %application.metadata.name,
device = %device.metadata.name,
lwt = ?lwt,
),
err(Debug)
)]
async fn create_session(
&self,
application: Application,
device: Device,
sink: Sink,
lwt: Option<LastWillTestament>,
connect: &Connect<'_>,
) -> Result<Session, ServerError> {
// eval dialect
let dialect = match device
Expand All @@ -100,6 +108,19 @@ impl App {

log::debug!("MQTT dialect: {dialect:?}");

// validate

let dialect = dialect.create();

dialect.validate_connect(connect)?;

// prepare

let sink = connect.sink();

let lwt = Self::make_lwt(&connect);
log::debug!("LWT: {lwt:?}");

// acquire session

let opts = CreateOptions { lwt };
Expand Down Expand Up @@ -135,6 +156,7 @@ impl App {
))
}

/// Build a LWT from the connect request.
fn make_lwt(connect: &Connect<'_>) -> Option<LastWillTestament> {
match connect {
Connect::V3(handshake) => match &handshake.packet().last_will {
Expand All @@ -156,6 +178,7 @@ impl App {
}
}

#[instrument(skip(self))]
async fn lookup_identity(&self, identity: &Identity) -> Option<VerifiedIdentity> {
if let Ok(PreSharedKeyResponse {
outcome:
Expand All @@ -177,23 +200,10 @@ impl App {
None
}
}
}

#[async_trait(?Send)]
impl Service<Session> for App {
#[instrument]
async fn connect<'a>(
&'a self,
mut connect: Connect<'a>,
) -> Result<ConnectAck<Session>, ServerError> {
log::info!("new connection: {:?}", connect);

if !connect.clean_session() {
return Err(ServerError::UnsupportedOperation);
}

let certs = connect.io().client_certs();
let verified_identity = if self.disable_psk {
/// Find the (optional) TLS-PSK identity from the underlying I/O system.
async fn find_verified_identity(&self, connect: &mut Connect<'_>) -> Option<VerifiedIdentity> {
if self.disable_psk {
None
} else {
use ntex_tls::PskIdentity;
Expand All @@ -213,7 +223,21 @@ impl Service<Session> for App {
} else {
None
}
};
}
}
}

#[async_trait(?Send)]
impl Service<Session> for App {
#[instrument]
async fn connect<'a>(
&'a self,
mut connect: Connect<'a>,
) -> Result<ConnectAck<Session>, ServerError> {
log::info!("new connection: {:?}", connect);

let certs = connect.io().client_certs();
let verified_identity = self.find_verified_identity(&mut connect).await;
let (username, password) = connect.credentials();

match self
Expand All @@ -231,21 +255,15 @@ impl Service<Session> for App {
device,
r#as: _,
}) => {
let session = self
.create_session(
application,
device,
connect.sink(),
Self::make_lwt(&connect),
)
.await?;
let session = self.create_session(application, device, &connect).await?;

Ok(ConnectAck {
session,
ack: AckOptions {
wildcard_subscription_available: Some(true),
shared_subscription_available: Some(false),
subscription_identifiers_available: Some(false),
session_present: false,
..Default::default()
},
})
Expand Down
92 changes: 92 additions & 0 deletions mqtt-endpoint/src/service/session/dialect/az.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use super::*;
use std::borrow::Cow;

/// Azure IoT dialect.
///
/// NOTE: This is experimental.
pub struct Azure;

impl ConnectValidator for Azure {
fn validate_connect(&self, _connect: &Connect) -> Result<(), ServerError> {
// we accept everything
Ok(())
}
}

impl PublishTopicParser for Azure {
fn parse_publish<'a>(&self, path: &'a str) -> Result<ParsedPublishTopic<'a>, ParseError> {
let (channel, properties) = split_topic(path);

if channel.is_empty() {
return Err(ParseError::Empty);
}

log::debug!("Azure: {channel} - properties: {properties:?}");

Ok(ParsedPublishTopic {
channel,
device: None,
properties,
})
}
}

impl SubscribeTopicParser for Azure {
fn parse_subscribe<'a>(&self, path: &'a str) -> Result<ParsedSubscribeTopic<'a>, ParseError> {
log::debug!("Azure: {path}");
Ok(ParsedSubscribeTopic {
filter: SubscribeFilter {
device: DeviceFilter::Device,
command: Some(path),
},
encoder: SubscriptionTopicEncoder::new(PlainTopicEncoder),
})
}
}

/// Split an Azure topic, which might carry a "bag of properties" as the last topic segment
pub fn split_topic(path: &str) -> (&str, Vec<(Cow<str>, Cow<str>)>) {
if let Some((topic, last)) = path.rsplit_once('/') {
// at least two segments
if last.starts_with("?") {
// last one is a bag of properties
let query = url::form_urlencoded::parse(&last.as_bytes()[1..]);
(topic, query.collect())
} else {
// last one is a regular one
(path.trim_end_matches('/'), vec![])
}
} else {
// single topic segment
(path, vec![])
}
}

#[cfg(test)]
mod test {

use super::*;

#[test]
fn test_plain() {
assert_eq!(split_topic("foo/bar"), ("foo/bar", vec![]));
}

#[test]
fn test_plain_slash() {
assert_eq!(split_topic("foo/bar/"), ("foo/bar", vec![]));
}

#[test]
fn test_plain_slash_q() {
assert_eq!(split_topic("foo/bar/?"), ("foo/bar", vec![]));
}

#[test]
fn test_properties() {
assert_eq!(
split_topic("foo/bar/?baz=123"),
("foo/bar", vec![("baz".into(), "123".into())])
);
}
}
Loading