Skip to content

Commit

Permalink
#9 Encoders for MQTT
Browse files Browse the repository at this point in the history
Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
  • Loading branch information
sbcd90 committed Dec 22, 2020
1 parent a3ad7a9 commit dc04020
Show file tree
Hide file tree
Showing 5 changed files with 231 additions and 258 deletions.
42 changes: 42 additions & 0 deletions cloudevents-sdk-paho-mqtt/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# CloudEvents SDK Rust - paho-mqtt [![Crates badge]][crates.io] [![Docs badge]][docs.rs]

Integration of [CloudEvents SDK](https://github.com/cloudevents/sdk-rust/) with [paho-mqtt](https://www.eclipse.org/paho/).

Look at [CloudEvents SDK README](https://github.com/cloudevents/sdk-rust/) for more info.

## Development & Contributing

If you're interested in contributing to sdk-rust, look at [Contributing documentation](../CONTRIBUTING.md)

## Community

## Sample usage

- Check the example [paho-mqtt-example](../example-projects/paho-mqtt-example)

### MQTT V3
- Start the MQTT V3 Consumer

```
run --package <package-name> --bin <binary-name> -- --mode consumerV3 --broker tcp://localhost:1883 --topic test
```

- Start the MQTT V3 Producer

```
run --package <package-name> --bin <binary-name> -- --broker tcp://localhost:1883 --topic test --mode producerV3
```

### MQTT V5
- Start the MQTT V5 Consumer

```
run --package <package-name> --bin <binary-name> -- --mode consumerV5 --broker tcp://localhost:1883 --topic test
```

- Start the MQTT V5 Producer

```
run --package <package-name> --bin <binary-name> -- --broker tcp://localhost:1883 --topic test --mode producerV5
```

5 changes: 2 additions & 3 deletions cloudevents-sdk-paho-mqtt/src/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ pub(crate) static CLOUDEVENTS_JSON_HEADER: &'static str = "application/cloudeven
pub(crate) static CONTENT_TYPE: &'static str = "content-type";

pub enum MqttVersion {
V3_1,
V3_1_1,
V5,
MQTT_3,
MQTT_5,
}
106 changes: 28 additions & 78 deletions cloudevents-sdk-paho-mqtt/src/mqtt_consumer_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,90 +5,59 @@ use cloudevents::message::{
Result, StructuredDeserializer, StructuredSerializer,
};
use cloudevents::{message, Event};
use paho_mqtt::{Message, PropertyCode};
use std::collections::HashMap;
use paho_mqtt::{Message, Properties, PropertyCode};
use std::convert::TryFrom;
use std::str;

pub struct ConsumerMessageDeserializer {
pub(crate) headers: HashMap<String, Vec<u8>>,
pub struct ConsumerMessageDeserializer<'a> {
pub(crate) headers: &'a Properties,
pub(crate) payload: Option<Vec<u8>>,
}

impl ConsumerMessageDeserializer {
fn get_mqtt_headers(message: &Message) -> Result<HashMap<String, Vec<u8>>> {
let mut hm = HashMap::new();
let prop_iterator = message.properties().iter(PropertyCode::UserProperty);

for property in prop_iterator {
let header = property.get_string_pair().unwrap();
hm.insert(header.0.to_string(), Vec::from(header.1));
}

Ok(hm)
impl<'a> ConsumerMessageDeserializer<'a> {
fn get_mqtt_headers(message: &Message) -> &Properties {
message.properties()
}

pub fn new(message: &Message) -> Result<ConsumerMessageDeserializer> {
Ok(ConsumerMessageDeserializer {
headers: Self::get_mqtt_headers(message)?,
headers: Self::get_mqtt_headers(message),
payload: Some(message.payload()).map(|s| Vec::from(s)),
})
}
}

impl BinaryDeserializer for ConsumerMessageDeserializer {
fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(mut self, mut visitor: V) -> Result<R> {
impl<'a> BinaryDeserializer for ConsumerMessageDeserializer<'a> {
fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(self, mut visitor: V) -> Result<R> {
if self.encoding() != Encoding::BINARY {
return Err(message::Error::WrongEncoding {});
}

let spec_version = SpecVersion::try_from(
str::from_utf8(&self.headers.remove(headers::SPEC_VERSION_HEADER).unwrap()[..])
.map_err(|e| cloudevents::message::Error::Other {
source: Box::new(e),
})?,
self.headers
.find_user_property(headers::SPEC_VERSION_HEADER)
.unwrap()
.as_str(),
)?;

visitor = visitor.set_spec_version(spec_version.clone())?;

let attributes = spec_version.attribute_names();

if let Some(hv) = self.headers.remove(headers::CONTENT_TYPE) {
visitor = visitor.set_attribute(
"datacontenttype",
MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| {
cloudevents::message::Error::Other {
source: Box::new(e),
}
})?),
)?
if let Some(hv) = self.headers.find_user_property(headers::CONTENT_TYPE) {
visitor = visitor.set_attribute("datacontenttype", MessageAttributeValue::String(hv))?
}

for (hn, hv) in self
.headers
.into_iter()
.user_iter()
.filter(|(hn, _)| headers::SPEC_VERSION_HEADER != *hn && hn.starts_with("ce_"))
{
let name = &hn["ce_".len()..];

if attributes.contains(&name) {
visitor = visitor.set_attribute(
name,
MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| {
cloudevents::message::Error::Other {
source: Box::new(e),
}
})?),
)?
visitor = visitor.set_attribute(name, MessageAttributeValue::String(hv))?
} else {
visitor = visitor.set_extension(
name,
MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| {
cloudevents::message::Error::Other {
source: Box::new(e),
}
})?),
)?
visitor = visitor.set_extension(name, MessageAttributeValue::String(hv))?
}
}

Expand All @@ -100,51 +69,32 @@ impl BinaryDeserializer for ConsumerMessageDeserializer {
}
}

impl StructuredDeserializer for ConsumerMessageDeserializer {
impl<'a> StructuredDeserializer for ConsumerMessageDeserializer<'a> {
fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(self, visitor: V) -> Result<R> {
visitor.set_structured_event(self.payload.unwrap())
}
}

impl MessageDeserializer for ConsumerMessageDeserializer {
impl<'a> MessageDeserializer for ConsumerMessageDeserializer<'a> {
fn encoding(&self) -> Encoding {
match (
self.headers
.get("content-type")
.map(|s| String::from_utf8(s.to_vec()).ok())
.flatten()
.map(|s| s.starts_with(headers::CLOUDEVENTS_JSON_HEADER))
.unwrap_or(false),
self.headers.get(headers::SPEC_VERSION_HEADER),
) {
(true, _) => Encoding::STRUCTURED,
(_, Some(_)) => Encoding::BINARY,
_ => Encoding::UNKNOWN,
match self.headers.iter(PropertyCode::UserProperty).count() == 0 {
true => Encoding::STRUCTURED,
false => Encoding::BINARY,
}
}
}

pub fn record_to_event(msg: &Message, version: headers::MqttVersion) -> Result<Event> {
match version {
headers::MqttVersion::V5 => {
BinaryDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?)
}
headers::MqttVersion::V3_1 => {
StructuredDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?)
}
headers::MqttVersion::V3_1_1 => {
StructuredDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?)
}
}
pub fn record_to_event(msg: &Message) -> Result<Event> {
MessageDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?)
}

pub trait MessageExt {
fn to_event(&self, version: headers::MqttVersion) -> Result<Event>;
fn to_event(&self) -> Result<Event>;
}

impl MessageExt for Message {
fn to_event(&self, version: headers::MqttVersion) -> Result<Event> {
record_to_event(self, version)
fn to_event(&self) -> Result<Event> {
record_to_event(self)
}
}

Expand Down
14 changes: 7 additions & 7 deletions cloudevents-sdk-paho-mqtt/src/mqtt_producer_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,10 @@ impl MessageRecord {

pub fn from_event(event: Event, version: headers::MqttVersion) -> Result<Self> {
match version {
headers::MqttVersion::V5 => {
headers::MqttVersion::MQTT_5 => {
BinaryDeserializer::deserialize_binary(event, MessageRecord::new())
}
headers::MqttVersion::V3_1 => {
StructuredDeserializer::deserialize_structured(event, MessageRecord::new())
}
headers::MqttVersion::V3_1_1 => {
headers::MqttVersion::MQTT_3 => {
StructuredDeserializer::deserialize_structured(event, MessageRecord::new())
}
}
Expand Down Expand Up @@ -127,11 +124,14 @@ impl StructuredSerializer<MessageRecord> for MessageRecord {
}

pub trait MessageBuilderExt {
fn message_record(self, message_record: &MessageRecord) -> MessageBuilder;
fn event(self, event: Event, version: headers::MqttVersion) -> MessageBuilder;
}

impl MessageBuilderExt for MessageBuilder {
fn message_record(mut self, message_record: &MessageRecord) -> MessageBuilder {
fn event(mut self, event: Event, version: headers::MqttVersion) -> MessageBuilder {
let message_record =
MessageRecord::from_event(event, version).expect("error while serializing the event");

self = self.properties(message_record.headers.clone());

if let Some(s) = message_record.payload.as_ref() {
Expand Down
Loading

0 comments on commit dc04020

Please sign in to comment.