Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions src/db/parity_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite};
use crate::rpc::eth::types::EthHash;
use crate::utils::{broadcast::has_subscribers, multihash::prelude::*};
use anyhow::Context as _;
use bytes::Bytes;
use cid::Cid;
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::DAG_CBOR;
Expand Down Expand Up @@ -83,7 +84,7 @@ impl DbColumn {
}
}

type WriteOpsBroadcastTxSender = tokio::sync::broadcast::Sender<Vec<(Cid, bytes::Bytes)>>;
type WriteOpsBroadcastTxSender = tokio::sync::broadcast::Sender<Vec<(Cid, Bytes)>>;

pub struct ParityDb {
pub db: parity_db::Db,
Expand Down Expand Up @@ -242,7 +243,7 @@ impl Blockstore for ParityDb {
self.write_to_column(k.to_bytes(), block, column)?;
match &*self.write_ops_broadcast_tx.read() {
Some(tx) if has_subscribers(tx) => {
let _ = tx.send(vec![(*k, bytes::Bytes::copy_from_slice(block))]);
let _ = tx.send(vec![(*k, Bytes::copy_from_slice(block))]);
}
_ => {}
}
Expand All @@ -263,7 +264,7 @@ impl Blockstore for ParityDb {
let column = Self::choose_column(&k);
let v = v.as_ref().to_vec();
if has_subscribers {
values_for_subscriber.push((k, bytes::Bytes::copy_from_slice(&v)));
values_for_subscriber.push((k, Bytes::copy_from_slice(&v)));
}
(column, k.to_bytes(), v)
});
Expand Down Expand Up @@ -372,7 +373,7 @@ impl ParityDb {
}

impl super::BlockstoreWriteOpsSubscribable for ParityDb {
fn subscribe_write_ops(&self) -> tokio::sync::broadcast::Receiver<Vec<(Cid, bytes::Bytes)>> {
fn subscribe_write_ops(&self) -> tokio::sync::broadcast::Receiver<Vec<(Cid, Bytes)>> {
let tx_lock = self.write_ops_broadcast_tx.read();
if let Some(tx) = &*tx_lock {
return tx.subscribe();
Expand Down Expand Up @@ -557,7 +558,7 @@ mod test {
for (idx, cid) in cids.iter().enumerate() {
let data_entry = &data[idx];
db.put_keyed(cid, data_entry).unwrap();
let expected = vec![(*cid, bytes::Bytes::copy_from_slice(data_entry))];
let expected = vec![(*cid, Bytes::copy_from_slice(data_entry))];
assert_eq!(rx1.blocking_recv().unwrap(), expected);
assert_eq!(rx2.blocking_recv().unwrap(), expected);
}
Expand Down
2 changes: 1 addition & 1 deletion src/libp2p_bitswap/internals/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl request_response::Codec for BitswapRequestResponseCodec {
let cid = prefix.to_cid(&payload.data).map_err(io::Error::other)?;
parts.push(BitswapMessage::Response(
cid,
BitswapResponse::Block(payload.data.to_vec()),
BitswapResponse::Block(payload.data),
));
}

Expand Down
2 changes: 1 addition & 1 deletion src/rpc/methods/eth/filter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ impl EthEventHandler {
}
};

let entries: Vec<crate::shim::executor::Entry> = event.event().entries();
let entries: Vec<crate::shim::executor::Entry> = event.entries();
let matched = if let Some(spec) = spec {
spec.matches(&resolved, &entries)?
} else {
Expand Down
9 changes: 3 additions & 6 deletions src/rpc/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -546,9 +546,9 @@ lotus_json_with_self!(Event);

impl From<StampedEvent> for Event {
fn from(stamped: StampedEvent) -> Self {
let emitter = stamped.emitter();
let entries = stamped
.event()
.entries()
.into_entries()
.into_iter()
.map(|entry| {
let (flags, key, codec, value) = entry.into_parts();
Expand All @@ -561,10 +561,7 @@ impl From<StampedEvent> for Event {
})
.collect();

Event {
emitter: stamped.emitter(),
entries,
}
Event { emitter, entries }
}
}

Expand Down
18 changes: 9 additions & 9 deletions src/shim/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,12 +217,6 @@ pub enum ActorEvent {
V4(ActorEvent_v4),
}

impl ActorEvent {
pub fn entries(&self) -> Vec<Entry> {
delegate_actor_event!(self => |e| e.entries.clone().into_iter().map(Into::into).collect())
}
}

/// Event with extra information stamped by the FVM.
#[delegated_enum(impl_conversions)]
#[derive(Clone, Debug, Serialize)]
Expand All @@ -246,9 +240,15 @@ impl StampedEvent {
delegate_stamped_event!(self.emitter)
}

/// Returns the event as emitted by the actor.
pub fn event(&self) -> ActorEvent {
delegate_stamped_event!(self.event.clone().into())
pub fn entries(&self) -> Vec<Entry> {
delegate_stamped_event!(self => |e| e.event.entries.iter().cloned().map(Into::into).collect())
}

pub fn into_entries(self) -> Vec<Entry> {
match self {
StampedEvent::V3(e) => e.event.entries.into_iter().map(Into::into).collect(),
StampedEvent::V4(e) => e.event.entries.into_iter().map(Into::into).collect(),
}
Comment thread
LesnyRumcajs marked this conversation as resolved.
}

/// Loads events directly from the events AMT root CID.
Expand Down
Loading