Skip to content
This repository has been archived by the owner on Feb 1, 2024. It is now read-only.

Commit

Permalink
Merge pull request #38 from knkski/consensus-seal-refactoring
Browse files Browse the repository at this point in the history
Add consensus seal to PBFT
  • Loading branch information
knkski committed Nov 14, 2018
2 parents 419b548 + ae815c1 commit 41a68f4
Show file tree
Hide file tree
Showing 12 changed files with 785 additions and 405 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ hex = "0.3"
log = "0.4"
log4rs = "0.8"
log4rs-syslog = "3.0"
openssl = "0.10"
protobuf = { version = "2", features = ["with-serde"] }
sawtooth-sdk = "^0.1"
serde = "1.0"
Expand All @@ -58,7 +59,6 @@ log4rs = { git = "https://github.com/ltseeley/log4rs", branch = "config-loading"

[dev-dependencies]
rand = "0.5"
rust-crypto = "0.2"

[build-dependencies]
protoc-rust = "2"
Expand Down
22 changes: 22 additions & 0 deletions protos/pbft_message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,25 @@ message PbftViewChange {
// checkpoint mentioned in info's `sequence_number`
repeated PbftMessage checkpoint_messages = 2;
}

message PbftSignedCommitVote {
// Serialized ConsensusPeerMessage header
bytes header_bytes = 1;

// Signature of the serialized ConsensusPeerMessageHeader
bytes header_signature = 2;

// Serialized PBFT Message
bytes message_bytes = 3;
}

message PbftSeal {
// ID of the previous block
bytes previous_id = 1;

// Summary of the previous block
bytes summary = 2;

// 2f + 1 votes
repeated PbftSignedCommitVote previous_commit_votes = 3;
}
14 changes: 5 additions & 9 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,15 +216,11 @@ fn merge_millis_setting_if_set(
/// hash.
#[cfg(test)]
pub fn mock_config(num_nodes: usize) -> PbftConfig {
use crypto::digest::Digest;
use crypto::sha2::Sha256;

let mut ids = Vec::new();
for i in 0..num_nodes {
let mut sha = Sha256::new();
sha.input_str(format!("I'm a node with ID {}", i).as_str());
ids.push(PeerId::from(sha.result_str().as_bytes().to_vec()));
}
use hash::hash_sha256;

let ids = (0..num_nodes)
.map(|i| PeerId::from(hash_sha256(format!("I'm a node with ID {}", i).as_bytes())))
.collect::<Vec<_>>();

let mut config = PbftConfig::default();
config.peers = ids;
Expand Down
13 changes: 12 additions & 1 deletion src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use sawtooth_sdk::consensus::{engine::*, service::Service};

use config;
use error::PbftError;
use message_type::ParsedMessage;
use node::PbftNode;
use state::PbftState;
use storage::get_storage;
Expand Down Expand Up @@ -133,7 +134,17 @@ fn handle_update(
}
Ok(Update::BlockCommit(block_id)) => node.on_block_commit(block_id, state)?,
Ok(Update::PeerMessage(message, sender_id)) => {
node.on_peer_message(&message.content, &sender_id, state)?
let parsed_message = ParsedMessage::from_peer_message(message, false)?;
let signer_id = parsed_message.info().get_signer_id().to_vec();

if signer_id != sender_id {
return Err(PbftError::InternalError(format!(
"Mismatch between sender ID ({:?}) and signer ID ({:?})!",
sender_id, signer_id
)));
}

node.on_peer_message(parsed_message, state)?
}
Ok(Update::Shutdown) => return Ok(false),
Ok(Update::PeerConnected(_)) | Ok(Update::PeerDisconnected(_)) => {
Expand Down
104 changes: 47 additions & 57 deletions src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ use sawtooth_sdk::consensus::service::Service;

use error::PbftError;
use message_log::PbftLog;
use message_type::ParsedMessage;
use message_type::{PbftHint, PbftMessageType};
use protos::pbft_message::{PbftBlock, PbftMessage, PbftMessageInfo, PbftViewChange};
use protos::pbft_message::{PbftBlock, PbftMessage, PbftMessageInfo};
use state::{PbftPhase, PbftState, WorkingBlockOption};

/// Handle a `PrePrepare` message
Expand All @@ -37,9 +38,10 @@ use state::{PbftPhase, PbftState, WorkingBlockOption};
pub fn pre_prepare(
state: &mut PbftState,
msg_log: &mut PbftLog,
pbft_message: &PbftMessage,
message: &ParsedMessage,
) -> Result<(), PbftError> {
let info = pbft_message.get_info();
let pbft_message = message.get_pbft_message();
let info = message.info();

check_view_mismatch(state, info)?;

Expand Down Expand Up @@ -164,33 +166,23 @@ pub fn commit(
state: &mut PbftState,
msg_log: &mut PbftLog,
service: &mut Service,
pbft_message: &PbftMessage,
msg_content: Vec<u8>,
sender_id: &PeerId,
message: &ParsedMessage,
) -> Result<(), PbftError> {
let working_block = clone_working_block(state)?;

state.switch_phase(PbftPhase::Finished);

check_if_block_already_seen(state, &working_block, pbft_message)?;
check_if_block_already_seen(state, &working_block, message)?;

check_if_commiting_with_current_chain_head(
state,
msg_log,
service,
pbft_message,
msg_content,
&working_block,
sender_id,
)?;
check_if_commiting_with_current_chain_head(state, msg_log, service, message, &working_block)?;

info!(
"{}: Committing block {:?}",
state,
pbft_message.get_block().block_id.clone()
message.get_block().block_id.clone()
);

commit_block_from_message(service, pbft_message)?;
commit_block_from_message(service, message)?;

reset_working_block(state);

Expand All @@ -208,20 +200,18 @@ fn clone_working_block(state: &PbftState) -> Result<PbftBlock, PbftError> {
fn check_if_block_already_seen(
state: &PbftState,
working_block: &PbftBlock,
pbft_message: &PbftMessage,
message: &ParsedMessage,
) -> Result<(), PbftError> {
let block = message.get_block();
let block_id = &block.block_id;
// Don't commit if we've seen this block already, but go ahead if we somehow
// skipped a block.
if pbft_message.get_block().get_block_id() != working_block.get_block_id()
&& pbft_message.get_block().get_block_num() >= working_block.get_block_num()
if block_id != &working_block.get_block_id()
&& block.get_block_num() >= working_block.get_block_num()
{
warn!(
"{}: Not committing block {:?}",
state,
pbft_message.get_block().block_id
);
warn!("{}: Not committing block {:?}", state, block_id);
Err(PbftError::BlockMismatch(
pbft_message.get_block().clone(),
block.clone(),
working_block.clone(),
))
} else {
Expand All @@ -234,27 +224,28 @@ fn check_if_commiting_with_current_chain_head(
state: &mut PbftState,
msg_log: &mut PbftLog,
service: &mut Service,
pbft_message: &PbftMessage,
msg_content: Vec<u8>,
message: &ParsedMessage,
working_block: &PbftBlock,
sender_id: &PeerId,
) -> Result<(), PbftError> {
let block = message.get_block();
let block_id = block.get_block_id().to_vec();

let head = service
.get_chain_head()
.map_err(|e| PbftError::InternalError(e.description().to_string()))?;
let cur_block = get_block_by_id(
&mut *service,
&pbft_message.get_block().get_block_id().to_vec(),
).ok_or_else(|| PbftError::WrongNumBlocks)?;

let cur_block = get_block_by_id(&mut *service, &block_id.to_vec())
.ok_or_else(|| PbftError::WrongNumBlocks)?;

if cur_block.previous_id != head.block_id {
warn!(
"{}: Not committing block {:?} but pushing to backlog",
state,
pbft_message.get_block().block_id.clone()
block_id.clone()
);
msg_log.push_backlog(msg_content, sender_id.clone());
msg_log.push_backlog(message.clone());
Err(PbftError::BlockMismatch(
pbft_message.get_block().clone(),
block.clone(),
working_block.clone(),
))
} else {
Expand All @@ -264,10 +255,10 @@ fn check_if_commiting_with_current_chain_head(

fn commit_block_from_message(
service: &mut Service,
pbft_message: &PbftMessage,
message: &ParsedMessage,
) -> Result<(), PbftError> {
service
.commit_block(pbft_message.get_block().block_id.clone())
.commit_block(message.get_block().block_id.clone())
.map_err(|_| PbftError::InternalError(String::from("Failed to commit block")))
}

Expand All @@ -280,15 +271,15 @@ fn reset_working_block(state: &mut PbftState) {
/// which in turn call `action_from_hint()`, and either push to backlog for future messages, or add
/// to message log for past messages. This usually only makes sense for regular multicast messages
/// (`PrePrepare`, `Prepare`, and `Commit`)
pub fn multicast_hint(state: &PbftState, pbft_message: &PbftMessage) -> PbftHint {
let msg_info = pbft_message.get_info();
pub fn multicast_hint(state: &PbftState, message: &ParsedMessage) -> PbftHint {
let msg_info = message.info();
let msg_type = PbftMessageType::from(msg_info.get_msg_type());

if msg_info.get_seq_num() > state.seq_num {
debug!(
"{}: seq {} > {}, accept all.",
state,
pbft_message.get_info().get_seq_num(),
msg_info.get_seq_num(),
state.seq_num
);
return PbftHint::FutureMessage;
Expand Down Expand Up @@ -345,7 +336,7 @@ pub fn view_change(
state: &mut PbftState,
msg_log: &mut PbftLog,
service: &mut Service,
vc_message: &PbftViewChange,
vc_message: &ParsedMessage,
) -> Result<(), PbftError> {
check_received_enough_view_changes(state, msg_log, vc_message)?;

Expand Down Expand Up @@ -380,13 +371,13 @@ pub fn force_view_change(state: &mut PbftState, service: &mut Service) {
fn check_received_enough_view_changes(
state: &PbftState,
msg_log: &PbftLog,
vc_message: &PbftViewChange,
vc_message: &ParsedMessage,
) -> Result<(), PbftError> {
msg_log.check_msg_against_log(&vc_message, true, 2 * state.f + 1)
msg_log.check_msg_against_log(vc_message, true, 2 * state.f + 1)
}

fn set_current_view_from_msg(state: &mut PbftState, vc_message: &PbftViewChange) {
set_current_view(state, vc_message.get_info().get_view())
fn set_current_view_from_msg(state: &mut PbftState, vc_message: &ParsedMessage) {
set_current_view(state, vc_message.info().get_view())
}

fn set_current_view(state: &mut PbftState, view: u64) {
Expand Down Expand Up @@ -476,19 +467,18 @@ pub fn pbft_block_from_block(block: Block) -> PbftBlock {
mod tests {
use super::*;
use config;
use crypto::digest::Digest;
use crypto::sha2::Sha256;
use hash::hash_sha256;

fn mock_peer_id(num: u64) -> PeerId {
let mut sha = Sha256::new();
sha.input_str(format!("I'm a peer (number {})", num).as_str());
PeerId::from(sha.result_str().as_bytes().to_vec())
PeerId::from(hash_sha256(
format!("I'm a peer (number {})", num).as_bytes(),
))
}

fn mock_block_id(num: u64) -> BlockId {
let mut sha = Sha256::new();
sha.input_str(format!("I'm a block with block num {}", num).as_str());
BlockId::from(sha.result_str().as_bytes().to_vec())
BlockId::from(hash_sha256(
format!("I'm a block with block num {}", num).as_bytes(),
))
}

fn mock_block(num: u64) -> Block {
Expand All @@ -508,12 +498,12 @@ mod tests {
seq_num: u64,
block: Block,
from: u64,
) -> PbftMessage {
) -> ParsedMessage {
let info = make_msg_info(&msg_type, view, seq_num, mock_peer_id(from));
let mut pbft_msg = PbftMessage::new();
pbft_msg.set_info(info);
pbft_msg.set_block(pbft_block_from_block(block.clone()));
pbft_msg
ParsedMessage::from_pbft_message(pbft_msg)
}

#[test]
Expand Down
36 changes: 36 additions & 0 deletions src/hash.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/// Contains common hashing functions
use openssl::sha::{Sha256, Sha512};

use error::PbftError;

/// Hashes the given bytes with SHA-256
pub fn hash_sha256(bytes: &[u8]) -> Vec<u8> {
let mut sha = Sha256::new();
sha.update(bytes);
let mut bytes = Vec::new();
bytes.extend(sha.finish().iter());
bytes
}

/// Hashes the given bytes with SHA-512
pub fn hash_sha512(bytes: &[u8]) -> Vec<u8> {
let mut sha = Sha512::new();
sha.update(bytes);
let mut bytes = Vec::new();
bytes.extend(sha.finish().iter());
bytes
}

/// Verifies that the SHA-512 hash of the given content matches the given hash
pub fn verify_sha512(content: &[u8], content_hash: &[u8]) -> Result<(), PbftError> {
let computed_sha512 = hash_sha512(&content);

if computed_sha512 != content_hash {
Err(PbftError::InternalError(format!(
"Hash verification failed! Content: `{:?}`, Hash: `{:?}`",
content, content_hash
)))
} else {
Ok(())
}
}
4 changes: 2 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,12 @@
extern crate atomicwrites;
#[macro_use]
extern crate clap;
#[cfg(test)]
extern crate crypto;
#[macro_use]
extern crate log;
extern crate hex;
extern crate log4rs;
extern crate log4rs_syslog;
extern crate openssl;
extern crate protobuf;
extern crate sawtooth_sdk;
extern crate serde;
Expand All @@ -50,6 +49,7 @@ pub mod config;
pub mod engine;
pub mod error;
pub mod handlers;
pub mod hash;
pub mod message_extensions;
pub mod message_log;
pub mod message_type;
Expand Down

0 comments on commit 41a68f4

Please sign in to comment.