Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide AMQP 1.0 binding with fe2o3-amqp #188

Open
wants to merge 31 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
24c30b5
impl conversion btwn AmqpMessage & AmqpCloudEvent
minghuaw Aug 14, 2022
8b7e2a2
implemented From<Event> for AmqpCloudEvent
minghuaw Aug 14, 2022
4017ffc
added udf to handle extensions
minghuaw Aug 15, 2022
e516489
initial impl of deserializers
minghuaw Aug 18, 2022
64693d1
removed unused imports
minghuaw Aug 18, 2022
f9b5058
removed AmqpCloudEvent
minghuaw Aug 18, 2022
73e1f62
Revert "removed AmqpCloudEvent"
minghuaw Aug 18, 2022
90f84e3
replace format with header_prefix
minghuaw Aug 18, 2022
73ff402
impl From<AmqpMessage> for AmqpCloudEvent
minghuaw Aug 18, 2022
6e90ddb
renamed to AmqpBinding
minghuaw Aug 18, 2022
508976c
renamed to EventMessage
minghuaw Aug 18, 2022
832098c
removed prefix from extension name
minghuaw Aug 18, 2022
3d835a6
added amqp example
minghuaw Aug 18, 2022
880da69
removed unused type alias
minghuaw Aug 18, 2022
b193012
fixed example
minghuaw Aug 18, 2022
55c5c8f
treat missing cloutEvents prefix as extension
minghuaw Aug 18, 2022
8739e9d
prefix extension values like standard attr values
minghuaw Aug 18, 2022
22a88e8
updated example
minghuaw Aug 18, 2022
1188f17
added assertions in example
minghuaw Aug 18, 2022
f7d3b86
updated docs
minghuaw Aug 18, 2022
c26131f
updated mod level doc
minghuaw Aug 18, 2022
c477db4
updated mod level doc
minghuaw Aug 18, 2022
d8df9c0
cargo fmt
minghuaw Aug 18, 2022
2ab34e4
fixed cargo fmt check and clippy warnings
minghuaw Aug 18, 2022
730b94e
Show feature gated bindings in documentaion (#187)
minghuaw Aug 24, 2022
0436209
fixed doctest
minghuaw Aug 26, 2022
f623f15
cargo fix --all && cargo fmt --all
minghuaw Aug 26, 2022
a2e1daa
Merge branch 'main' into amqp-binding
minghuaw Aug 26, 2022
18385a6
updated fe2o3-amqp to latest version
minghuaw Aug 28, 2022
26b41f1
updated fe2o3-amqp-types
minghuaw Aug 30, 2022
695abf4
updated AMQP deps
minghuaw Sep 1, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ warp = ["warp-lib", "bytes", "http", "hyper"]
axum = ["bytes", "http", "hyper", "axum-lib", "http-body", "async-trait"]
poem = ["bytes", "http", "poem-lib", "hyper", "async-trait"]
nats = ["nats-lib"]
fe2o3-amqp = ["fe2o3-amqp-types"]

[dependencies]
serde = { version = "^1.0", features = ["derive"] }
Expand Down Expand Up @@ -57,6 +58,7 @@ axum-lib = { version = "^0.5", optional = true , package="axum"}
http-body = { version = "^0.4", optional = true }
poem-lib = { version = "=1.2.34", optional = true, package = "poem" }
nats-lib = { version = "0.21.0", optional = true, package = "nats" }
fe2o3-amqp-types = { version = "0.5.1", optional = true }

[target."cfg(not(target_arch = \"wasm32\"))".dependencies]
hostname = "^0.3"
Expand All @@ -79,3 +81,4 @@ mockito = "0.25.1"
tokio = { version = "^1.0", features = ["full"] }
mime = "0.3"
tower = { version = "0.4", features = ["util"] }
fe2o3-amqp = { version = "0.6.1" }
12 changes: 12 additions & 0 deletions example-projects/fe2o3-amqp-example/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
[package]
name = "fe2o3-amqp-example"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
cloudevents-sdk = { path = "../..", features = ["fe2o3-amqp"] }
fe2o3-amqp = "0.5.1"
tokio = { version = "1", features = ["macros", "net", "rt", "rt-multi-thread"] }
serde_json = "1"
109 changes: 109 additions & 0 deletions example-projects/fe2o3-amqp-example/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
//! AMQP 1.0 binding example
//!
//! You need a running AMQP 1.0 broker to try out this example.
//! With docker: docker run -it --rm -e ARTEMIS_USERNAME=guest -e ARTEMIS_PASSWORD=guest -p 5672:5672 vromero/activemq-artemis

use cloudevents::{
binding::fe2o3_amqp::{EventMessage}, message::MessageDeserializer, Event, EventBuilder,
EventBuilderV10, AttributesReader, event::ExtensionValue,
};
use fe2o3_amqp::{Connection, Receiver, Sender, Session};
use serde_json::{json, from_slice, from_str};

type BoxError = Box<dyn std::error::Error>;
type Result<T> = std::result::Result<T, BoxError>;

const EXAMPLE_TYPE: &str = "example.test";
const EXAMPLE_SOURCE: &str = "localhost";
const EXTENSION_NAME: &str = "ext-name";
const EXTENSION_VALUE: &str = "AMQP";

async fn send_binary_event(sender: &mut Sender, i: usize, value: serde_json::Value) -> Result<()> {
let event = EventBuilderV10::new()
.id(i.to_string())
.ty(EXAMPLE_TYPE)
.source(EXAMPLE_SOURCE)
.extension(EXTENSION_NAME, EXTENSION_VALUE)
.data("application/json", value)
.build()?;
let event_message = EventMessage::from_binary_event(event)?;
sender.send(event_message).await?.accepted_or("not accepted")?;
Ok(())
}

async fn send_structured_event(sender: &mut Sender, i: usize, value: serde_json::Value) -> Result<()> {
let event = EventBuilderV10::new()
.id(i.to_string())
.ty(EXAMPLE_TYPE)
.source(EXAMPLE_SOURCE)
.extension(EXTENSION_NAME, EXTENSION_VALUE)
.data("application/json", value)
.build()?;
let event_message = EventMessage::from_structured_event(event)?;
sender.send(event_message).await?.accepted_or("not accepted")?;
Ok(())
}

async fn recv_event(receiver: &mut Receiver) -> Result<Event> {
let delivery = receiver.recv().await?;
receiver.accept(&delivery).await?;

let event_message = EventMessage::from(delivery.into_message());
let event = MessageDeserializer::into_event(event_message)?;
Ok(event)
}

fn convert_data_into_json_value(data: &cloudevents::Data) -> Result<serde_json::Value> {
let value = match data {
cloudevents::Data::Binary(bytes) => from_slice(bytes)?,
cloudevents::Data::String(s) => from_str(s)?,
cloudevents::Data::Json(value) => value.clone(),
};
Ok(value)
}

#[tokio::main]
async fn main() {
let mut connection =
Connection::open("cloudevents-sdk-rust", "amqp://guest:guest@localhost:5672")
.await
.unwrap();
let mut session = Session::begin(&mut connection).await.unwrap();
let mut sender = Sender::attach(&mut session, "sender", "q1").await.unwrap();
let mut receiver = Receiver::attach(&mut session, "receiver", "q1")
.await
.unwrap();

let expected = json!({"hello": "world"});

// Binary content mode
send_binary_event(&mut sender, 1, expected.clone()).await.unwrap();
let event = recv_event(&mut receiver).await.unwrap();
let value = convert_data_into_json_value(event.data().unwrap()).unwrap();
assert_eq!(event.id(), "1");
assert_eq!(event.ty(), EXAMPLE_TYPE);
assert_eq!(event.source(), EXAMPLE_SOURCE);
match event.extension(EXTENSION_NAME).unwrap() {
ExtensionValue::String(value) => assert_eq!(value, EXTENSION_VALUE),
_ => panic!("Expect a String"),
}
assert_eq!(value, expected);

// Structured content mode
send_structured_event(&mut sender, 2, expected.clone()).await.unwrap();
let event = recv_event(&mut receiver).await.unwrap();
let value = convert_data_into_json_value(event.data().unwrap()).unwrap();
assert_eq!(event.id(), "2");
assert_eq!(event.ty(), EXAMPLE_TYPE);
assert_eq!(event.source(), EXAMPLE_SOURCE);
match event.extension(EXTENSION_NAME).unwrap() {
ExtensionValue::String(value) => assert_eq!(value, EXTENSION_VALUE),
_ => panic!("Expect a String"),
}
assert_eq!(value, expected);

sender.close().await.unwrap();
receiver.close().await.unwrap();
session.end().await.unwrap();
connection.close().await.unwrap();
}
24 changes: 24 additions & 0 deletions src/binding/fe2o3_amqp/constants.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Required
pub(super) const ID: &str = "id";
pub(super) const SOURCE: &str = "source";
pub(super) const SPECVERSION: &str = "specversion";
pub(super) const TYPE: &str = "type";

// Optional
pub(super) const DATACONTENTTYPE: &str = "datacontenttype";
pub(super) const DATASCHEMA: &str = "dataschema";
pub(super) const SUBJECT: &str = "subject";
pub(super) const TIME: &str = "time";

pub(super) mod prefixed {
// Required
pub const ID: &str = "cloudEvents:id";
pub const SOURCE: &str = "cloudEvents:source";
pub const SPECVERSION: &str = "cloudEvents:specversion";
pub const TYPE: &str = "cloudEvents:type";

// Optional
pub const DATASCHEMA: &str = "cloudEvents:dataschema";
pub const SUBJECT: &str = "cloudEvents:subject";
pub const TIME: &str = "cloudEvents:time";
}
104 changes: 104 additions & 0 deletions src/binding/fe2o3_amqp/deserializer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
use std::convert::TryFrom;

use fe2o3_amqp_types::primitives::{SimpleValue, Symbol};

use crate::{
binding::CLOUDEVENTS_JSON_HEADER,
event::SpecVersion,
message::{
BinaryDeserializer, BinarySerializer, Encoding, Error, MessageAttributeValue,
MessageDeserializer, Result, StructuredDeserializer, StructuredSerializer,
},
};

use super::{
constants::{prefixed, DATACONTENTTYPE},
EventMessage, ATTRIBUTE_PREFIX,
};

impl BinaryDeserializer for EventMessage {
fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(
mut self,
mut serializer: V,
) -> Result<R> {
use fe2o3_amqp_types::messaging::Body;

// specversion
let spec_version = {
let value = self
.application_properties
.as_mut()
.ok_or(Error::WrongEncoding {})?
.remove(prefixed::SPECVERSION)
.ok_or(Error::WrongEncoding {})
.map(|val| match val {
SimpleValue::String(s) => Ok(s),
_ => Err(Error::WrongEncoding {}),
})??;
SpecVersion::try_from(&value[..])?
};
serializer = serializer.set_spec_version(spec_version.clone())?;

// datacontenttype
serializer = match self.content_type {
Some(Symbol(content_type)) => serializer
.set_attribute(DATACONTENTTYPE, MessageAttributeValue::String(content_type))?,
None => serializer,
};

// remaining attributes
let attributes = spec_version.attribute_names();

if let Some(application_properties) = self.application_properties {
for (key, value) in application_properties.0.into_iter() {
if let Some(key) = key.strip_prefix(ATTRIBUTE_PREFIX) {
if attributes.contains(&key) {
let value = MessageAttributeValue::try_from((key, value))?;
serializer = serializer.set_attribute(key, value)?;
} else {
let value = MessageAttributeValue::try_from(value)?;
serializer = serializer.set_extension(key, value)?;
}
}
}
}

match self.body {
Body::Data(data) => {
let bytes = data.0.into_vec();
serializer.end_with_data(bytes)
}
Body::Empty => serializer.end(),
Body::Sequence(_) | Body::Value(_) => Err(Error::WrongEncoding {}),
}
}
}

impl StructuredDeserializer for EventMessage {
fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(
self,
serializer: V,
) -> Result<R> {
use fe2o3_amqp_types::messaging::Body;
let bytes = match self.body {
Body::Data(data) => data.0.into_vec(),
Body::Empty => vec![],
Body::Sequence(_) | Body::Value(_) => return Err(Error::WrongEncoding {}),
};
serializer.set_structured_event(bytes)
}
}

impl MessageDeserializer for EventMessage {
fn encoding(&self) -> Encoding {
match self
.content_type
.as_ref()
.map(|s| s.starts_with(CLOUDEVENTS_JSON_HEADER))
{
Some(true) => Encoding::STRUCTURED,
Some(false) => Encoding::BINARY,
None => Encoding::UNKNOWN,
}
}
}
Loading