Skip to content
This repository has been archived by the owner on Feb 1, 2024. It is now read-only.

Commit

Permalink
Stabilize p2p messaging API
Browse files Browse the repository at this point in the history
This makes one more change to the p2p messaging API, which allows consensus
engines to save messages with their signatures and enables validation of
message signatures after the fact by an engine or observer.

Signed-off-by: Adam Ludvik <ludvik@bitwise.io>
  • Loading branch information
Adam Ludvik committed Oct 30, 2018
1 parent 7ac644d commit 10cf27b
Show file tree
Hide file tree
Showing 16 changed files with 154 additions and 106 deletions.
46 changes: 29 additions & 17 deletions protos/consensus.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,37 +17,31 @@ syntax = "proto3";

// --== Data Structures ==--

// A consensus-related message sent between peers
message ConsensusPeerMessage {
// Interpretation is left to the consensus engine implementation
string message_type = 1;

// The opaque payload to send to other nodes
bytes content = 2;
}

message ConsensusPeerMessageHeader {
// Public key for the component internal to the validator that
// signed the message
bytes signer_public_key = 1;
bytes signer_id = 1;

// The sha512 hash of the encoded message
bytes message_sha512 = 2;
bytes content_sha512 = 2;
// Interpretation is left to the consensus engine implementation
string message_type = 5;

// Used to identify the consensus engine that produced this message
string name = 3;
string version = 4;
}

message ConsensusPeerMessageEnvelope {
// A consensus-related message sent between peers
message ConsensusPeerMessage {
// The serialized version of the ConsensusPeerMessageHeader
bytes header = 1;

// The signature derived from signing the header
bytes header_signature = 3;

// The serialized version of the ConsensusPeerMessage
bytes message = 2;
// The opaque payload to send to other nodes
bytes content = 2;
}

// All information about a block that is relevant to consensus
Expand Down Expand Up @@ -127,7 +121,9 @@ message ConsensusNotifyPeerDisconnected {

// A new message was received from a peer
message ConsensusNotifyPeerMessage {
// The message sent
ConsensusPeerMessage message = 1;
// The node that sent the message, not necessarily the node that created it
bytes sender_id = 2;
}

Expand Down Expand Up @@ -188,8 +184,17 @@ message ConsensusNotifyAck {}

// Send a consensus message to a specific, connected peer
message ConsensusSendToRequest {
ConsensusPeerMessage message = 1;
bytes peer_id = 2;
// Payload to send to peer
//
// NOTE: This payload will be wrapped up in a ConsensusPeerMessage struct,
// which includes computing its SHA-512 digest, inserting this engine's
// registration info, and the validator's public key, and signing everything
// with the validator's private key.
bytes content = 1;
string message_type = 3;

// Peer that this message is destined for
bytes receiver_id = 2;
}

message ConsensusSendToResponse {
Expand All @@ -207,7 +212,14 @@ message ConsensusSendToResponse {

// Broadcast a consensus message to all peers
message ConsensusBroadcastRequest {
ConsensusPeerMessage message = 1;
// Payload to broadcast peers
//
// NOTE: This payload will be wrapped up in a ConsensusPeerMessage struct,
// which includes computing its SHA-512 digest, inserting this engine's
// registration info, and the validator's public key, and signing everything
// with the validator's private key.
bytes content = 1;
string message_type = 2;
}

message ConsensusBroadcastResponse {
Expand Down
2 changes: 1 addition & 1 deletion sdk/examples/devmode_rust/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ impl Engine for DevmodeEngine {
}

Update::PeerMessage(message, sender_id) => {
match DevmodeMessage::from_str(message.message_type.as_ref()).unwrap() {
match DevmodeMessage::from_str(message.header.message_type.as_ref()).unwrap() {
DevmodeMessage::Published => {
let block_id = BlockId::from(message.content);
info!(
Expand Down
5 changes: 5 additions & 0 deletions sdk/python/sawtooth_sdk/consensus/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
['chain_head', 'peers', 'local_peer_info'])


PeerMessage = namedtuple(
'PeerMessage',
['header', 'header_bytes', 'header_signature', 'content'])


class Engine(metaclass=abc.ABCMeta):
@abc.abstractmethod
def start(self, updates, service, startup_state):
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/sawtooth_sdk/consensus/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ class Service(metaclass=abc.ABCMeta):
# -- P2P --

@abc.abstractmethod
def send_to(self, peer_id, message_type, payload):
def send_to(self, receiver_id, message_type, payload):
'''Send a consensus message to a specific connected peer.
Args:
peer_id (bytes)
receiver_id (bytes)
message_type (str)
payload (bytes)
'''
Expand Down
12 changes: 11 additions & 1 deletion sdk/python/sawtooth_sdk/consensus/zmq_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

from sawtooth_sdk.consensus.driver import Driver
from sawtooth_sdk.consensus.engine import StartupState
from sawtooth_sdk.consensus.engine import PeerMessage
from sawtooth_sdk.consensus.zmq_service import ZmqService
from sawtooth_sdk.consensus import exceptions
from sawtooth_sdk.messaging.stream import Stream
Expand Down Expand Up @@ -138,7 +139,16 @@ def _process(self, message):
notification = consensus_pb2.ConsensusNotifyPeerMessage()
notification.ParseFromString(message.content)

data = notification.message, notification.sender_id
header = consensus_pb2.ConsensusPeerMessageHeader()
header.ParseFromString(notification.message.header)

peer_message = PeerMessage(
header=header,
header_bytes=notification.message.header,
header_signature=notification.message.header_signature,
content=notification.message.content)

data = peer_message, notification.sender_id

elif type_tag == Message.CONSENSUS_NOTIFY_BLOCK_NEW:
notification = consensus_pb2.ConsensusNotifyBlockNew()
Expand Down
15 changes: 5 additions & 10 deletions sdk/python/sawtooth_sdk/consensus/zmq_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,11 @@ def _send(self, request, message_type, response_type):

# -- P2P --

def send_to(self, peer_id, message_type, payload):
message = consensus_pb2.ConsensusPeerMessage(
message_type=message_type,
content=payload)

def send_to(self, receiver_id, message_type, payload):
request = consensus_pb2.ConsensusSendToRequest(
message=message,
peer_id=peer_id)
message_type=message_type,
content=payload,
receiver_id=receiver_id)

response = self._send(
request=request,
Expand All @@ -57,12 +54,10 @@ def send_to(self, peer_id, message_type, payload):
'Failed with status {}'.format(response.status))

def broadcast(self, message_type, payload):
message = consensus_pb2.ConsensusPeerMessage(
request = consensus_pb2.ConsensusBroadcastRequest(
message_type=message_type,
content=payload)

request = consensus_pb2.ConsensusBroadcastRequest(message=message)

response = self._send(
request=request,
message_type=Message.CONSENSUS_BROADCAST_REQUEST,
Expand Down
14 changes: 6 additions & 8 deletions sdk/python/tests/test_zmq_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,16 @@ def test_send_to(self):
).SerializeToString())

self.service.send_to(
peer_id=b'peer_id',
receiver_id=b'receiver_id',
message_type='message_type',
payload=b'payload')

self.mock_stream.send.assert_called_with(
message_type=Message.CONSENSUS_SEND_TO_REQUEST,
content=consensus_pb2.ConsensusSendToRequest(
message=consensus_pb2.ConsensusPeerMessage(
message_type='message_type',
content=b'payload'),
peer_id=b'peer_id').SerializeToString())
message_type='message_type',
content=b'payload',
receiver_id=b'receiver_id').SerializeToString())

def test_broadcast(self):
self.mock_stream.send.return_value = self._make_future(
Expand All @@ -70,9 +69,8 @@ def test_broadcast(self):
self.mock_stream.send.assert_called_with(
message_type=Message.CONSENSUS_BROADCAST_REQUEST,
content=consensus_pb2.ConsensusBroadcastRequest(
message=consensus_pb2.ConsensusPeerMessage(
message_type='message_type',
content=b'payload')).SerializeToString())
message_type='message_type',
content=b'payload').SerializeToString())

def test_initialize_block(self):
self.mock_stream.send.return_value = self._make_future(
Expand Down
18 changes: 17 additions & 1 deletion sdk/rust/src/consensus/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,26 @@ pub struct PeerInfo {
/// A consensus-related message sent between peers
#[derive(Default, Debug, Clone)]
pub struct PeerMessage {
pub message_type: String,
pub header: PeerMessageHeader,
pub header_bytes: Vec<u8>,
pub header_signature: Vec<u8>,
pub content: Vec<u8>,
}

/// A header associated with a consensus-related message sent from a peer, can be used to verify
/// the origin of the message
#[derive(Default, Debug, Clone)]
pub struct PeerMessageHeader {
/// The public key of the validator where this message originated
///
/// NOTE: This may not be the validator that sent the message
pub signer_id: Vec<u8>,
pub content_sha512: Vec<u8>,
pub message_type: String,
pub name: String,
pub version: String,
}

/// Engine is the only trait that needs to be implemented when adding a new consensus engine.
///
/// The consensus engine should listen for notifications from the validator about the status of
Expand Down
26 changes: 19 additions & 7 deletions sdk/rust/src/consensus/zmq_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,11 @@ fn handle_update(
CONSENSUS_NOTIFY_PEER_MESSAGE => {
let mut request: ConsensusNotifyPeerMessage =
protobuf::parse_from_bytes(msg.get_content())?;
let mut header: ConsensusPeerMessageHeader =
protobuf::parse_from_bytes(request.get_message().get_header())?;
let mut message = request.take_message();
Update::PeerMessage(
request.take_message().into(),
from_consensus_peer_message(message, header),
request.take_sender_id().into(),
)
}
Expand Down Expand Up @@ -321,12 +324,21 @@ impl From<ConsensusPeerInfo> for PeerInfo {
}
}

impl From<ConsensusPeerMessage> for PeerMessage {
fn from(mut c_msg: ConsensusPeerMessage) -> PeerMessage {
PeerMessage {
message_type: c_msg.take_message_type(),
content: c_msg.take_content(),
}
fn from_consensus_peer_message(
mut c_msg: ConsensusPeerMessage,
mut c_msg_header: ConsensusPeerMessageHeader,
) -> PeerMessage {
PeerMessage {
header: PeerMessageHeader {
signer_id: c_msg_header.take_signer_id(),
content_sha512: c_msg_header.take_content_sha512(),
message_type: c_msg_header.take_message_type(),
name: c_msg_header.take_name(),
version: c_msg_header.take_version(),
},
header_bytes: c_msg.take_header(),
header_signature: c_msg.take_header_signature(),
content: c_msg.take_content(),
}
}

Expand Down
16 changes: 5 additions & 11 deletions sdk/rust/src/consensus/zmq_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,10 @@ impl Service for ZmqService {
message_type: &str,
payload: Vec<u8>,
) -> Result<(), Error> {
let mut message = ConsensusPeerMessage::new();
message.set_message_type(message_type.into());
message.set_content(payload);

let mut request = ConsensusSendToRequest::new();
request.set_message(message);
request.set_peer_id((*peer).clone().into());
request.set_content(payload);
request.set_message_type(message_type.into());
request.set_receiver_id((*peer).clone().into());

let response: ConsensusSendToResponse = self.rpc(
&request,
Expand All @@ -113,12 +110,9 @@ impl Service for ZmqService {
}

fn broadcast(&mut self, message_type: &str, payload: Vec<u8>) -> Result<(), Error> {
let mut message = ConsensusPeerMessage::new();
message.set_message_type(message_type.into());
message.set_content(payload);

let mut request = ConsensusBroadcastRequest::new();
request.set_message(message);
request.set_content(payload);
request.set_message_type(message_type.into());

let response: ConsensusBroadcastResponse = self.rpc(
&request,
Expand Down
8 changes: 5 additions & 3 deletions validator/sawtooth_validator/consensus/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,9 @@ def __init__(self, proxy):
def handle_request(self, request, response, connection_id):
try:
self._proxy.send_to(
request.peer_id,
request.message.SerializeToString(),
request.receiver_id,
request.message_type,
request.content,
connection_id)
except Exception: # pylint: disable=broad-except
LOGGER.exception("ConsensusSendTo")
Expand All @@ -200,7 +201,8 @@ def __init__(self, proxy):
def handle_request(self, request, response, connection_id):
try:
self._proxy.broadcast(
request.message.SerializeToString(),
request.message_type,
request.content,
connection_id)
except Exception: # pylint: disable=broad-except
LOGGER.exception("ConsensusBroadcast")
Expand Down
Loading

0 comments on commit 10cf27b

Please sign in to comment.