Skip to content

Commit

Permalink
9 Encoders for MQTT
Browse files Browse the repository at this point in the history
Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
  • Loading branch information
sbcd90 committed Dec 9, 2020
1 parent 5ab3062 commit e09fd5c
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 10 deletions.
13 changes: 6 additions & 7 deletions cloudevents-sdk-paho-mqtt/src/mqtt_consumer_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ impl MessageDeserializer for ConsumerMessageDeserializer {
.flatten()
.map(|s| s.starts_with(headers::CLOUDEVENTS_JSON_HEADER))
.unwrap_or(false),
self.headers.get(headers::MQTT_VERSION_HEADER)
self.headers
.get(headers::MQTT_VERSION_HEADER)
.map(|s| String::from_utf8(s.to_vec()).ok())
.flatten()
.map(|s| s.eq(headers::MQTT_V5_BINARY))
Expand All @@ -130,7 +131,7 @@ impl MessageDeserializer for ConsumerMessageDeserializer {

pub fn record_to_event(msg: &Message) -> Result<Event> {
MessageDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?)
/* match version {
/* match version {
headers::MqttVersion::V5 => {
BinaryDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?)
}
Expand Down Expand Up @@ -190,7 +191,8 @@ mod tests {
.extension("mqttversion", headers::MQTT_V5_BINARY)
.data(
"application/octet-stream",
Data::Binary(String::from("hello rust").into_bytes()))
Data::Binary(String::from("hello rust").into_bytes()),
)
.build()
.unwrap(),
)
Expand Down Expand Up @@ -234,9 +236,6 @@ mod tests {
.qos(1)
.finalize();

assert_eq!(
msg.to_event().unwrap(),
expected
)
assert_eq!(msg.to_event().unwrap(), expected)
}
}
8 changes: 5 additions & 3 deletions cloudevents-sdk-paho-mqtt/src/mqtt_producer_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ impl MessageRecord {
}

pub fn from_event(event: Event) -> Result<Self> {
match event.extension(headers::MQTT_VERSION_HEADER)
match event
.extension(headers::MQTT_VERSION_HEADER)
.map(|e| e.to_string().eq(headers::MQTT_V5_BINARY))
.unwrap_or(false) {
.unwrap_or(false)
{
true => BinaryDeserializer::deserialize_binary(event, MessageRecord::new()),
_ => StructuredDeserializer::deserialize_structured(event, MessageRecord::new())
_ => StructuredDeserializer::deserialize_structured(event, MessageRecord::new()),
}
}
}
Expand Down

0 comments on commit e09fd5c

Please sign in to comment.