Skip to content

Commit

Permalink
Interface for test infrastructure to inspect streams
Browse files Browse the repository at this point in the history
This change implements private-attribution#1195 by providing a means for unit tests to intercept send requests that send circuit-specific data between MPC helpers.

This change also updates the existing unit tests for malicious `reshare` and `reveal` protocols to showcase the API and test it in real scenarios. In both cases, using it, led to significant reduction in code redundancy. The key improvement is that now tests are not required to re-implement the basic protocol to inject additive attacks. It can now be done outside the circuit being tested.

The support for sharding exists at the API level, but no actual implementation allows intercepting traffic between shards. There is currently not a lot of sharded protocols, so this can be postponed until later.
  • Loading branch information
akoshelev committed Jul 26, 2024
1 parent 1349aa6 commit 0eb61bc
Show file tree
Hide file tree
Showing 11 changed files with 556 additions and 260 deletions.
40 changes: 40 additions & 0 deletions ipa-core/src/ff/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,46 @@ pub trait Serializable: Sized {
.map_err(Into::into)
.unwrap_infallible()
}

/// This method provides the same functionality as [`Self::deserialize`] without
/// compile-time guarantees on the size of `buf`. Therefore, it is not appropriate
/// to use in production code. It is provided as convenience method
/// for tests that are ok to panic.
///
/// ## Panics
/// If the size of `buf` is not equal to `Self::Size` or if `buf` bytes
/// are not a valid representation of this instance. See [`Self::deserialize`] for
/// more details.
///
/// [`Self::deserialize`]: Self::deserialize
#[cfg(test)]
#[must_use]
fn deserialize_from_slice(buf: &[u8]) -> Self {
use typenum::Unsigned;

assert_eq!(buf.len(), Self::Size::USIZE);

let mut arr = GenericArray::default();
arr.copy_from_slice(buf);
Self::deserialize(&arr).unwrap()
}

/// This method provides the same functionality as [`Self::serialize`] without
/// compile-time guarantees on the size of `buf`. Therefore, it is not appropriate
/// to use in production code. It is provided as convenience method
/// for tests that are ok to panic.
///
/// ## Panics
/// If the size of `buf` is not equal to `Self::Size`.
#[cfg(test)]
fn serialize_to_slice(&self, buf: &mut [u8]) {
use typenum::Unsigned;

assert_eq!(buf.len(), Self::Size::USIZE);

let dest = GenericArray::<_, Self::Size>::from_mut_slice(buf);
self.serialize(dest);
}
}

pub trait ArrayAccess {
Expand Down
31 changes: 19 additions & 12 deletions ipa-core/src/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,17 @@ pub use gateway_exports::{Gateway, MpcReceivingEnd, SendingEnd, ShardReceivingEn
pub use prss_protocol::negotiate as negotiate_prss;
#[cfg(feature = "web-app")]
pub use transport::WrappedAxumBodyStream;
#[cfg(feature = "in-memory-infra")]
pub use transport::{
config as in_memory_config, InMemoryMpcNetwork, InMemoryShardNetwork, InMemoryTransport,
};
pub use transport::{
make_owned_handler, query, routing, ApiError, BodyStream, BytesStream, HandlerBox, HandlerRef,
HelperResponse, Identity as TransportIdentity, LengthDelimitedStream, LogErrors, NoQueryId,
NoResourceIdentifier, NoStep, QueryIdBinding, ReceiveRecords, RecordsStream, RequestHandler,
RouteParams, SingleRecordStream, StepBinding, StreamCollection, StreamKey, Transport,
WrappedBoxBodyStream,
};
#[cfg(feature = "in-memory-infra")]
pub use transport::{InMemoryMpcNetwork, InMemoryShardNetwork, InMemoryTransport};
use typenum::{Const, ToUInt, Unsigned, U8};
use x25519_dalek::PublicKey;

Expand Down Expand Up @@ -130,6 +132,20 @@ impl TryFrom<usize> for HelperIdentity {
}
}

impl TryFrom<&str> for HelperIdentity {
type Error = String;

fn try_from(value: &str) -> std::result::Result<Self, Self::Error> {
for identity in HelperIdentity::make_three() {
if identity.as_str() == value {
return Ok(identity);
}
}

Err(format!("{value} is not a valid helper identity"))
}
}

impl From<HelperIdentity> for u8 {
fn from(value: HelperIdentity) -> Self {
value.id
Expand All @@ -138,16 +154,7 @@ impl From<HelperIdentity> for u8 {

impl Debug for HelperIdentity {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}",
match self.id {
1 => "A",
2 => "B",
3 => "C",
_ => unreachable!(),
}
)
write!(f, "{}", self.as_str())
}
}

Expand Down
148 changes: 148 additions & 0 deletions ipa-core/src/helpers/transport/in_memory/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
use std::borrow::Cow;

use crate::{
helpers::{HelperIdentity, Role, RoleAssignment},
protocol::Gate,
sharding::ShardIndex,
sync::Arc,
};

pub type DynStreamInterceptor = Arc<dyn StreamInterceptor<Context = InspectContext>>;

/// The interface for stream interceptors.
///
/// It is used in test infrastructure to inspect
/// incoming streams and perform actions based on
/// their contents.
///
/// The `peek` method takes a context object and a mutable reference
/// to the data buffer. It is responsible for inspecting the data
/// and performing any necessary actions based on the context.
pub trait StreamInterceptor: Send + Sync {
/// The context type for the stream peeker.
/// See [`InspectContext`] and [`MaliciousHelperContext`] for
/// details.
type Context;

/// Inspects the stream data and performs any necessary actions.
/// The `data` buffer may be modified in-place.
///
/// ## Implementation considerations
/// This method is free to mutate the `data` buffer
/// however it wants, but it needs to account for the following:
///
/// ### Prime field streams
/// Corrupting streams that send data as sequences of serialized
/// [`PrimeField`] may cause `GreaterThanPrimeError` errors at
/// the serialization layer, instead of maybe intended malicious
/// validation failures.
///
/// ### Boolean fields
/// Flipping bits in fixed-size bit strings is indistinguishable
/// from additive attacks without additional measures implemented
/// at the transport layer, like checksumming, share consistency
/// checks, etc.
fn peek(&self, ctx: &Self::Context, data: &mut Vec<u8>);
}

impl<F: Fn(&InspectContext, &mut Vec<u8>) + Send + Sync + 'static> StreamInterceptor for F {
type Context = InspectContext;

fn peek(&self, ctx: &Self::Context, data: &mut Vec<u8>) {
(self)(ctx, data);
}
}

/// The general context provided to stream inspectors.
#[derive(Debug)]
pub struct InspectContext {
/// The shard index of this instance.
/// This is `None` for non-sharded helpers.
pub shard_index: Option<ShardIndex>,
/// The MPC identity of this instance.
/// The combination (`shard_index`, `identity`)
/// uniquely identifies a single shard within
/// a multi-sharded MPC system.
pub identity: HelperIdentity,
/// Helper that will receive this stream.
pub dest: Cow<'static, str>,
/// Circuit gate this stream is tied to.
pub gate: Gate,
}

/// The no-op stream peeker, which does nothing.
/// This is used as a default value for stream
/// peekers that don't do anything.
#[inline]
#[must_use]
pub fn passthrough() -> Arc<dyn StreamInterceptor<Context = InspectContext>> {
Arc::new(|_ctx: &InspectContext, _data: &mut Vec<u8>| {})
}

/// This narrows the implementation of stream seeker
/// to a specific helper role. Only streams sent from
/// that helper will be inspected by the provided closure.
/// Other helper's streams will be left untouched.
///
/// It does not support sharded environments and will panic
/// if used in a sharded test infrastructure.
#[derive(Debug)]
pub struct MaliciousHelper<F> {
identity: HelperIdentity,
role_assignment: RoleAssignment,
inner: F,
}

impl<F: Fn(&MaliciousHelperContext, &mut Vec<u8>) + Send + Sync> MaliciousHelper<F> {
pub fn new(role: Role, role_assignment: &RoleAssignment, peeker: F) -> Arc<Self> {
Arc::new(Self {
identity: role_assignment.identity(role),
role_assignment: role_assignment.clone(),
inner: peeker,
})
}

fn context(&self, ctx: &InspectContext) -> MaliciousHelperContext {
let dest = HelperIdentity::try_from(ctx.dest.as_ref()).unwrap_or_else(|_| {
panic!(
"MaliciousServerContext::from: invalid destination: {}",
ctx.dest
)
});
let dest = self.role_assignment.role(dest);

MaliciousHelperContext {
shard_index: ctx.shard_index,
dest,
gate: ctx.gate.clone(),
}
}
}

/// Special contexts for stream inspectors
/// created with [`MaliciousHelper`].
/// It provides convenient access to the
/// destination role and assumes a single MPC
/// helper intercepting streams.
#[derive(Debug)]
pub struct MaliciousHelperContext {
/// The shard index of this instance.
/// This is `None` for non-sharded helpers.
pub shard_index: Option<ShardIndex>,
/// Helper that will receive this stream.
pub dest: Role,
/// Circuit gate this stream is tied to.
pub gate: Gate,
}

impl<F: Fn(&MaliciousHelperContext, &mut Vec<u8>) + Send + Sync> StreamInterceptor
for MaliciousHelper<F>
{
type Context = InspectContext;

fn peek(&self, ctx: &Self::Context, data: &mut Vec<u8>) {
if ctx.identity == self.identity {
(self.inner)(&self.context(ctx), data);
}
}
}
32 changes: 26 additions & 6 deletions ipa-core/src/helpers/transport/in_memory/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
pub mod config;
mod sharding;
mod transport;

use std::array;

pub use sharding::InMemoryShardNetwork;
pub use transport::Setup;
use transport::TransportConfigBuilder;

use crate::{
helpers::{HandlerRef, HelperIdentity},
helpers::{
in_memory_config::DynStreamInterceptor, transport::in_memory::config::passthrough,
HandlerRef, HelperIdentity,
},
sync::{Arc, Weak},
};

Expand All @@ -21,15 +24,32 @@ pub struct InMemoryMpcNetwork {

impl Default for InMemoryMpcNetwork {
fn default() -> Self {
Self::new(array::from_fn(|_| None))
Self::new(Self::noop_handlers())
}
}

impl InMemoryMpcNetwork {
#[must_use]
pub fn noop_handlers() -> [Option<HandlerRef>; 3] {
[None, None, None]
}

#[must_use]
pub fn new(handlers: [Option<HandlerRef>; 3]) -> Self {
let [mut first, mut second, mut third]: [_; 3] =
HelperIdentity::make_three().map(Setup::new);
Self::with_stream_interceptor(handlers, &passthrough())
}

#[must_use]
pub fn with_stream_interceptor(
handlers: [Option<HandlerRef>; 3],
interceptor: &DynStreamInterceptor,
) -> Self {
let [mut first, mut second, mut third]: [_; 3] = HelperIdentity::make_three().map(|i| {
let mut config_builder = TransportConfigBuilder::for_helper(i);
config_builder.with_interceptor(interceptor);

Setup::with_config(i, config_builder.not_sharded())
});

first.connect(&mut second);
second.connect(&mut third);
Expand Down
18 changes: 16 additions & 2 deletions ipa-core/src/helpers/transport/in_memory/sharding.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{
helpers::{
transport::in_memory::transport::{InMemoryTransport, Setup},
in_memory_config::{passthrough, DynStreamInterceptor},
transport::in_memory::transport::{InMemoryTransport, Setup, TransportConfigBuilder},
HelperIdentity,
},
sharding::ShardIndex,
Expand All @@ -22,9 +23,22 @@ pub struct InMemoryShardNetwork {

impl InMemoryShardNetwork {
pub fn with_shards<I: Into<ShardIndex>>(shard_count: I) -> Self {
Self::with_stream_interceptor(shard_count, &passthrough())
}

pub fn with_stream_interceptor<I: Into<ShardIndex>>(
shard_count: I,
interceptor: &DynStreamInterceptor,
) -> Self {
let shard_count = shard_count.into();
let shard_network: [_; 3] = HelperIdentity::make_three().map(|h| {
let mut shard_connections = shard_count.iter().map(Setup::new).collect::<Vec<_>>();
let mut config_builder = TransportConfigBuilder::for_helper(h);
config_builder.with_interceptor(interceptor);

let mut shard_connections = shard_count
.iter()
.map(|i| Setup::with_config(i, config_builder.bind_to_shard(i)))
.collect::<Vec<_>>();
for i in 0..shard_connections.len() {
let (lhs, rhs) = shard_connections.split_at_mut(i);
if let Some((a, _)) = lhs.split_last_mut() {
Expand Down
Loading

0 comments on commit 0eb61bc

Please sign in to comment.