Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(particle-data)!: use MsgPack for particle data serialization [Fixes VM-407] #2034

Merged
merged 9 commits into from
Jan 31, 2024
Merged
18 changes: 18 additions & 0 deletions .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,15 @@ jobs:
uses: fluencelabs/cli/.github/workflows/tests.yml@main
with:
nox-image: "${{ needs.nox-snapshot.outputs.nox-image }}"
js-client-snapshots: |
{
"aqua-to-js": "0.3.7-feat-VM-407-msgpack-particle-a9a365e-2481-1",
"interfaces": "0.11.0-feat-VM-407-msgpack-particle-a9a365e-2481-1",
"js-client": "0.7.0-feat-VM-407-msgpack-particle-a9a365e-2481-1",
"js-client-isomorphic": "0.5.0-feat-VM-407-msgpack-particle-a9a365e-2481-1",
"marine-worker": "0.5.1-feat-VM-407-msgpack-particle-a9a365e-2481-1",
"npm-aqua-compiler": "0.0.2-feat-VM-407-msgpack-particle-a9a365e-2481-1"
}

js-client:
needs:
Expand All @@ -99,6 +108,15 @@ jobs:
uses: fluencelabs/aqua/.github/workflows/tests.yml@main
with:
nox-image: "${{ needs.nox-snapshot.outputs.nox-image }}"
js-client-snapshots: |
{
"aqua-to-js": "0.3.7-feat-VM-407-msgpack-particle-a9a365e-2481-1",
"interfaces": "0.11.0-feat-VM-407-msgpack-particle-a9a365e-2481-1",
"js-client": "0.7.0-feat-VM-407-msgpack-particle-a9a365e-2481-1",
"js-client-isomorphic": "0.5.0-feat-VM-407-msgpack-particle-a9a365e-2481-1",
"marine-worker": "0.5.1-feat-VM-407-msgpack-particle-a9a365e-2481-1",
"npm-aqua-compiler": "0.0.2-feat-VM-407-msgpack-particle-a9a365e-2481-1"
}

registry:
needs:
Expand Down
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion connection-pool/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,6 @@ impl NetworkBehaviour for ConnectionPoolBehaviour {
.push_back(ExtendedParticle::new(particle, root_span));
self.wake();
}
Ok(HandlerMessage::InboundUpgradeError(err)) => log::warn!("UpgradeError: {:?}", err),
gurinderu marked this conversation as resolved.
Show resolved Hide resolved
Ok(HandlerMessage::Upgrade) => {}
Ok(HandlerMessage::OutParticle(..)) => unreachable!("can't receive OutParticle"),
Err(err) => log::warn!("Handler error: {:?}", err),
Expand Down
4 changes: 3 additions & 1 deletion particle-protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ log = { workspace = true }
derivative = { workspace = true }
base64 = { workspace = true }
tokio = { workspace = true, features = ["sync"] }
asynchronous-codec = { version = "0.7.0", features = ["json"] }
asynchronous-codec = { version = "0.7.0" }
unsigned-varint = { version = "0.8.0", features = ["codec", "asynchronous_codec"] }
tracing = { workspace = true }
air-interpreter-sede = { version = "0.1.0", features = ["msgpack"] }
monoid marked this conversation as resolved.
Show resolved Hide resolved
serde_bytes = "0.11.14"

[dev-dependencies]
rand = { workspace = true }
Expand Down
67 changes: 37 additions & 30 deletions particle-protocol/src/libp2p_protocol/codec/fluence.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,32 @@
use crate::ProtocolMessage;
use asynchronous_codec::{BytesMut, Decoder, Encoder, JsonCodec, JsonCodecError};
use air_interpreter_sede::{
folex marked this conversation as resolved.
Show resolved Hide resolved
define_simple_representation, Format as SedeFormat, FromSerialized as _, MsgPackMultiformat,
ToSerialized as _,
};
use asynchronous_codec::{BytesMut, Decoder, Encoder};
use std::io;
use unsigned_varint::codec::UviBytes;

const MAX_BUF_SIZE: usize = 100 * 1024 * 1024;

type ProtocolMessageFormat = MsgPackMultiformat;

define_simple_representation!(
ProtocolMessageRepresentation,
ProtocolMessage,
ProtocolMessageFormat,
Vec<u8>
);

pub struct FluenceCodec {
length: UviBytes<BytesMut>,
json: JsonCodec<ProtocolMessage, ProtocolMessage>,
}

impl FluenceCodec {
pub fn new() -> Self {
let mut length: UviBytes<BytesMut> = UviBytes::default();
length.set_max_len(MAX_BUF_SIZE);
let json = JsonCodec::new();
Self { length, json }
Self { length }
}
}

Expand All @@ -26,10 +37,10 @@ impl Decoder for FluenceCodec {
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let bytes = self.length.decode(src)?;
if let Some(bytes) = bytes {
return self
.json
.decode(&mut BytesMut::from(&bytes[..]))
.map_err(Into::into);
return ProtocolMessageRepresentation
.deserialize(&bytes)
.map(Some)
.map_err(FluenceCodecError::Deserialize);
}
Ok(None)
}
Expand All @@ -40,9 +51,10 @@ impl Encoder for FluenceCodec {
type Error = FluenceCodecError;

fn encode(&mut self, item: Self::Item<'_>, dst: &mut BytesMut) -> Result<(), Self::Error> {
let mut json_buf = BytesMut::new();
self.json.encode(item, &mut json_buf)?;
self.length.encode(json_buf, dst)?;
let msg_buf = ProtocolMessageRepresentation
.serialize(&item)
.map_err(FluenceCodecError::Serialize)?;
self.length.encode(msg_buf[..].into(), dst)?;
Ok(())
}
}
Expand All @@ -53,8 +65,8 @@ pub enum FluenceCodecError {
Io(std::io::Error),
/// Length error
Length(std::io::Error),
/// JSON error
Json(JsonCodecError),
Serialize(<ProtocolMessageFormat as SedeFormat<ProtocolMessage>>::SerializationError),
Deserialize(<ProtocolMessageFormat as SedeFormat<ProtocolMessage>>::DeserializationError),
}

impl From<std::io::Error> for FluenceCodecError {
Expand All @@ -63,18 +75,13 @@ impl From<std::io::Error> for FluenceCodecError {
}
}

impl From<JsonCodecError> for FluenceCodecError {
fn from(e: JsonCodecError) -> FluenceCodecError {
FluenceCodecError::Json(e)
}
}

impl std::error::Error for FluenceCodecError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
FluenceCodecError::Io(ref e) => Some(e),
FluenceCodecError::Length(ref e) => Some(e),
FluenceCodecError::Json(ref e) => Some(e),
FluenceCodecError::Serialize(ref e) => Some(e),
FluenceCodecError::Deserialize(ref e) => Some(e),
}
}
}
Expand All @@ -84,7 +91,8 @@ impl std::fmt::Display for FluenceCodecError {
match self {
FluenceCodecError::Io(e) => write!(f, "I/O error: {}", e),
FluenceCodecError::Length(e) => write!(f, "I/O error: {}", e),
FluenceCodecError::Json(e) => write!(f, "JSON error: {}", e),
FluenceCodecError::Serialize(e) => write!(f, "Serialization error: {}", e),
FluenceCodecError::Deserialize(e) => write!(f, "Deserialization error: {}", e),
}
}
}
Expand All @@ -94,7 +102,8 @@ impl From<FluenceCodecError> for std::io::Error {
match value {
FluenceCodecError::Io(e) => e,
FluenceCodecError::Length(e) => io::Error::new(io::ErrorKind::InvalidInput, e),
FluenceCodecError::Json(e) => io::Error::new(io::ErrorKind::InvalidInput, e),
FluenceCodecError::Serialize(e) => io::Error::new(io::ErrorKind::InvalidInput, e),
FluenceCodecError::Deserialize(e) => io::Error::new(io::ErrorKind::InvalidInput, e),
}
}
}
Expand Down Expand Up @@ -132,14 +141,12 @@ mod tests {

#[test]
fn deserialization_test() {
let raw_str = "9QN7ImFjdGlvbiI6IlBhcnRpY2xlIiwiaWQiOiJkMjA1ZDE0OC00Y2YxLTRlNzYtOGY2ZS1\
mY2U5ODEwZjVlNmMiLCJpbml0X3BlZXJfaWQiOiIxMkQzS29vV0xMRjdnUUtiNzd4WEhWWm4zS1hhMTR4cDNSQmlBa2J\
uSzJVQlJwRGFSOEtiIiwidGltZXN0YW1wIjoxNzAwNTc0OTU5MDU5LCJ0dGwiOjAsInNjcmlwdCI6IihjYWxsICVpbml\
0X3BlZXJfaWQlIChcImdldERhdGFTcnZcIiBcIi1yZWxheS1cIikgW10gLXJlbGF5LSkiLCJzaWduYXR1cmUiOlsxMTE\
sMTgyLDkyLDEsNzgsNDQsMjI1LDc1LDExNCwxMTMsMTA5LDIyNCw2MCwyNDUsMTksMTgyLDE1MiwyNiwxNDEsMTA5LDE\
4NSw1MCwxOTEsMjM5LDE4OCwxMjIsNTAsMTkxLDEwMywyMSw1MywxMjAsMjE2LDMxLDIxMywyMiwyNDAsMTk0LDc4LDI\
xMSwyNDAsMTkyLDE2MiwyMjAsMjAsMTcwLDEyMSwyNSwyMDAsNjMsMjQ1LDE1MSwxNywyNTMsMTU2LDI0MiwxNDEsMTI\
5LDIxNywyMDUsMTgxLDE1NiwyMzEsMTBdLCJkYXRhIjoiIn0=";
let raw_str = "zwKBBIimYWN0aW9uqFBhcnRpY2xlpGRhdGGQomlk2SRkMjA1ZDE0OC00Y2YxLTRlNzYtOGY2ZS1mY\
2U5ODEwZjVlNmOsaW5pdF9wZWVyX2lk2TQxMkQzS29vV0xMRjdnUUtiNzd4WEhWWm4zS1hhMTR4cDNSQmlBa2JuSzJVQ\
lJwRGFSOEtipnNjcmlwdNk5KGNhbGwgJWluaXRfcGVlcl9pZCUgKCJnZXREYXRhU3J2IiAiLXJlbGF5LSIpIFtdIC1yZ\
WxheS0pqXNpZ25hdHVyZdwAQG/MtlwBTizM4UtycW3M4DzM9RPMtsyYGsyNbcy5Msy/zO/MvHoyzL9nFTV4zNgfzNUWz\
PDMwk7M08zwzMDMoszcFMyqeRnMyD/M9cyXEcz9zJzM8syNzIHM2czNzLXMnMznCql0aW1lc3RhbXDPAAABi/IqldOjd\
HRsAA==";
let hex_data = base64.decode(raw_str).expect("Base64");
let mut bytes = BytesMut::from(&hex_data[..]);

Expand Down
11 changes: 0 additions & 11 deletions particle-protocol/src/libp2p_protocol/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ pub enum HandlerMessage {
/// Particle being received from a remote peer.
/// Receive-only, can't be sent.
InParticle(Particle),
/// Error while receiving a message
InboundUpgradeError(serde_json::Value),
/// Dummy plug. Generated by the `OneshotHandler` when Inbound or Outbound Upgrade happened.
Upgrade,
}
Expand All @@ -70,9 +68,6 @@ impl HandlerMessage {
HandlerMessage::OutParticle(particle, channel) => {
(ProtocolMessage::Particle(particle), channel.outlet())
}
HandlerMessage::InboundUpgradeError(err) => {
(ProtocolMessage::InboundUpgradeError(err), None)
}
HandlerMessage::Upgrade => (ProtocolMessage::Upgrade, None),
HandlerMessage::InParticle(_) => {
unreachable!("InParticle is never sent, only received")
Expand All @@ -93,8 +88,6 @@ impl From<()> for HandlerMessage {
#[serde(tag = "action")]
pub enum ProtocolMessage {
Particle(Particle),
/// Error while receiving a message
InboundUpgradeError(serde_json::Value),
// TODO: is it needed?
Upgrade,
}
Expand All @@ -103,9 +96,6 @@ impl std::fmt::Display for ProtocolMessage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ProtocolMessage::Particle(particle) => particle.fmt(f),
ProtocolMessage::InboundUpgradeError(error) => {
write!(f, "InboundUpgradeError {error}")
}
ProtocolMessage::Upgrade => write!(f, "Upgrade"),
}
}
Expand All @@ -115,7 +105,6 @@ impl From<ProtocolMessage> for HandlerMessage {
fn from(msg: ProtocolMessage) -> HandlerMessage {
match msg {
ProtocolMessage::Particle(p) => HandlerMessage::InParticle(p),
ProtocolMessage::InboundUpgradeError(err) => HandlerMessage::InboundUpgradeError(err),
ProtocolMessage::Upgrade => HandlerMessage::Upgrade,
}
}
Expand Down
4 changes: 2 additions & 2 deletions particle-protocol/src/particle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use crate::error::ParticleError::{
};
use fluence_keypair::{KeyPair, PublicKey, Signature};
use fluence_libp2p::{peerid_serializer, RandomPeerId};
use json_utils::base64_serde;
use now_millis::now_ms;

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -77,9 +76,10 @@ pub struct Particle {
// TTL in milliseconds
pub ttl: u32,
pub script: String,
#[serde(with = "serde_bytes")]
pub signature: Vec<u8>,
/// base64-encoded
#[serde(with = "base64_serde")]
#[serde(with = "serde_bytes")]
#[derivative(Debug(format_with = "fmt_data"))]
pub data: Vec<u8>,
}
Expand Down
Loading