Skip to content

Commit

Permalink
feat: use async for UnitReader and UnitWriter
Browse files Browse the repository at this point in the history
  • Loading branch information
maan2003 committed Jan 19, 2024
1 parent 976c164 commit 57e0cfc
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 20 deletions.
26 changes: 20 additions & 6 deletions consensus/src/backup/loader.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{collections::HashSet, fmt, fmt::Debug, io::Read, marker::PhantomData};

use async_trait::async_trait;
use codec::{Decode, Error as CodecError};
use futures::channel::oneshot;
use log::{error, info, warn};
Expand Down Expand Up @@ -63,14 +64,28 @@ impl From<CodecError> for LoaderError {
}
}

pub struct BackupLoader<H: Hasher, D: Data, S: Signature, R: Read> {
#[async_trait]
pub trait UnitReader {
async fn read(&mut self) -> std::io::Result<Vec<u8>>;
}

#[async_trait]
impl<R: Read + Send> UnitReader for R {
async fn read(&mut self) -> std::io::Result<Vec<u8>> {
let mut buf = Vec::new();
self.read_to_end(&mut buf)?;

This comment has been minimized.

Copy link
@douglaz

douglaz Jan 19, 2024

shouldn't you block_in_place here?

This comment has been minimized.

Copy link
@maan2003

maan2003 Jan 19, 2024

Author Owner

right, but ideally this impl should never be used. i just left it for not breaking backwards compat

Ok(buf)
}
}

pub struct BackupLoader<H: Hasher, D: Data, S: Signature, R: UnitReader> {
backup: R,
index: NodeIndex,
session_id: SessionId,
_phantom: PhantomData<(H, D, S)>,
}

impl<H: Hasher, D: Data, S: Signature, R: Read> BackupLoader<H, D, S, R> {
impl<H: Hasher, D: Data, S: Signature, R: UnitReader> BackupLoader<H, D, S, R> {
pub fn new(backup: R, index: NodeIndex, session_id: SessionId) -> BackupLoader<H, D, S, R> {
BackupLoader {
backup,
Expand All @@ -80,9 +95,8 @@ impl<H: Hasher, D: Data, S: Signature, R: Read> BackupLoader<H, D, S, R> {
}
}

fn load(&mut self) -> Result<Vec<UncheckedSignedUnit<H, D, S>>, LoaderError> {
let mut buf = Vec::new();
self.backup.read_to_end(&mut buf)?;
async fn load(&mut self) -> Result<Vec<UncheckedSignedUnit<H, D, S>>, LoaderError> {
let buf = self.backup.read().await?;
let input = &mut &buf[..];
let mut result = Vec::new();
while !input.is_empty() {
Expand Down Expand Up @@ -163,7 +177,7 @@ impl<H: Hasher, D: Data, S: Signature, R: Read> BackupLoader<H, D, S, R> {
starting_round: oneshot::Sender<Option<Round>>,
next_round_collection: oneshot::Receiver<Round>,
) {
let units = match self.load() {
let units = match self.load().await {
Ok(items) => items,
Err(e) => {
error!(target: LOG_TARGET, "unable to load backup data: {}", e);
Expand Down
2 changes: 2 additions & 0 deletions consensus/src/backup/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
pub use loader::BackupLoader;
pub use loader::UnitReader;
pub use saver::BackupSaver;
pub use saver::UnitWriter;

mod loader;
mod saver;
29 changes: 22 additions & 7 deletions consensus/src/backup/saver.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,36 @@
use std::io::Write;

use crate::{units::UncheckedSignedUnit, Data, Hasher, Receiver, Sender, Signature, Terminator};
use async_trait::async_trait;
use codec::Encode;
use futures::{FutureExt, StreamExt};
use log::{debug, error};

const LOG_TARGET: &str = "AlephBFT-backup-saver";

#[async_trait]
pub trait UnitWriter {
async fn write(&mut self, data: &[u8]) -> std::io::Result<()>;
}

#[async_trait]
impl<W: Write + Send> UnitWriter for W {
async fn write(&mut self, data: &[u8]) -> std::io::Result<()> {
Write::write(self, data)?;
self.flush()
}
}

/// Component responsible for saving units into backup.
/// It waits for items to appear on its receivers, and writes them to backup.
/// It announces a successful write through an appropriate response sender.
pub struct BackupSaver<H: Hasher, D: Data, S: Signature, W: Write> {
pub struct BackupSaver<H: Hasher, D: Data, S: Signature, W: UnitWriter> {
units_from_runway: Receiver<UncheckedSignedUnit<H, D, S>>,
responses_for_runway: Sender<UncheckedSignedUnit<H, D, S>>,
backup: W,
}

impl<H: Hasher, D: Data, S: Signature, W: Write> BackupSaver<H, D, S, W> {
impl<H: Hasher, D: Data, S: Signature, W: UnitWriter> BackupSaver<H, D, S, W> {
pub fn new(
units_from_runway: Receiver<UncheckedSignedUnit<H, D, S>>,
responses_for_runway: Sender<UncheckedSignedUnit<H, D, S>>,
Expand All @@ -29,10 +43,11 @@ impl<H: Hasher, D: Data, S: Signature, W: Write> BackupSaver<H, D, S, W> {
}
}

pub fn save_item(&mut self, item: &UncheckedSignedUnit<H, D, S>) -> Result<(), std::io::Error> {
self.backup.write_all(&item.encode())?;
self.backup.flush()?;
Ok(())
pub async fn save_item(
&mut self,
item: &UncheckedSignedUnit<H, D, S>,
) -> Result<(), std::io::Error> {
self.backup.write(&item.encode()).await
}

pub async fn run(&mut self, mut terminator: Terminator) {
Expand All @@ -47,7 +62,7 @@ impl<H: Hasher, D: Data, S: Signature, W: Write> BackupSaver<H, D, S, W> {
break;
},
};
if let Err(e) = self.save_item(&item) {
if let Err(e) = self.save_item(&item).await {

This comment has been minimized.

Copy link
@dpc

dpc Jan 19, 2024

Oh, so did I read it right that the old implementation was calling a blocking io trait without offloading it from the executor? That would explain why things were weird when attempting to fix it. @maan2003

error!(target: LOG_TARGET, "couldn't save item to backup: {:?}", e);
break;
}
Expand Down
1 change: 1 addition & 0 deletions consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub use aleph_bft_types::{
PartialMultisignature, PartiallyMultisigned, Recipient, Round, SessionId, Signable, Signature,
SignatureError, SignatureSet, Signed, SpawnHandle, TaskHandle, UncheckedSigned,
};
pub use backup::{UnitReader, UnitWriter};
pub use config::{
create_config, default_config, default_delay_config, exponential_slowdown, Config, DelayConfig,
};
Expand Down
11 changes: 9 additions & 2 deletions consensus/src/member.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
units::{UncheckedSignedUnit, UnitCoord},
Config, Data, DataProvider, FinalizationHandler, Hasher, MultiKeychain, Network, NodeIndex,
Receiver, Recipient, Round, Sender, Signature, SpawnHandle, Terminator, UncheckedSigned,
UnitReader, UnitWriter,
};
use aleph_bft_types::NodeMap;
use codec::{Decode, Encode};
Expand Down Expand Up @@ -108,15 +109,21 @@ enum TaskDetails<H: Hasher, D: Data, S: Signature> {
}

#[derive(Clone)]
pub struct LocalIO<D: Data, DP: DataProvider<D>, FH: FinalizationHandler<D>, US: Write, UL: Read> {
pub struct LocalIO<
D: Data,
DP: DataProvider<D>,
FH: FinalizationHandler<D>,
US: UnitWriter,
UL: UnitReader,
> {
data_provider: DP,
finalization_handler: FH,
unit_saver: US,
unit_loader: UL,
_phantom: PhantomData<D>,
}

impl<D: Data, DP: DataProvider<D>, FH: FinalizationHandler<D>, US: Write, UL: Read>
impl<D: Data, DP: DataProvider<D>, FH: FinalizationHandler<D>, US: UnitWriter, UL: UnitReader>
LocalIO<D, DP, FH, US, UL>
{
pub fn new(
Expand Down
10 changes: 5 additions & 5 deletions consensus/src/runway/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
},
Config, Data, DataProvider, FinalizationHandler, Hasher, Index, Keychain, MultiKeychain,
NodeCount, NodeIndex, NodeMap, Receiver, Round, Sender, Signature, Signed, SpawnHandle,
Terminator, UncheckedSigned,
Terminator, UncheckedSigned, UnitReader, UnitWriter,
};
use aleph_bft_types::Recipient;
use futures::{
Expand Down Expand Up @@ -871,8 +871,8 @@ pub struct RunwayIO<
H: Hasher,
D: Data,
MK: MultiKeychain,
W: Write + Send + Sync + 'static,
R: Read + Send + Sync + 'static,
W: UnitWriter + Send + Sync + 'static,
R: UnitReader + Send + Sync + 'static,
DP: DataProvider<D>,
FH: FinalizationHandler<D>,
> {
Expand All @@ -887,8 +887,8 @@ impl<
H: Hasher,
D: Data,
MK: MultiKeychain,
W: Write + Send + Sync + 'static,
R: Read + Send + Sync + 'static,
W: UnitWriter + Send + Sync + 'static,
R: UnitReader + Send + Sync + 'static,
DP: DataProvider<D>,
FH: FinalizationHandler<D>,
> RunwayIO<H, D, MK, W, R, DP, FH>
Expand Down

0 comments on commit 57e0cfc

Please sign in to comment.