Skip to content

Commit

Permalink
feat(node): Generate events for data sampling that can be used by fro…
Browse files Browse the repository at this point in the history
…nt-end (#276)

Signed-off-by: Yiannis Marangos <psyberbits@gmail.com>
Co-authored-by: Mikołaj Florkiewicz <mikolaj@florkiewicz.me>
  • Loading branch information
oblique and fl0rek committed May 20, 2024
1 parent e0ced8d commit dbdf1ce
Show file tree
Hide file tree
Showing 10 changed files with 573 additions and 86 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 10 additions & 4 deletions cli/src/native.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::env;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;

use anyhow::{bail, Context, Result};
use celestia_rpc::prelude::*;
Expand All @@ -10,11 +9,11 @@ use clap::Parser;
use directories::ProjectDirs;
use libp2p::{identity, multiaddr::Protocol, Multiaddr};
use lumina_node::blockstore::RedbBlockstore;
use lumina_node::events::NodeEvent;
use lumina_node::network::{canonical_network_bootnodes, network_genesis, network_id, Network};
use lumina_node::node::{Node, NodeConfig};
use lumina_node::store::{RedbStore, Store};
use tokio::task::spawn_blocking;
use tokio::time::sleep;
use tracing::info;
use tracing::warn;

Expand Down Expand Up @@ -80,11 +79,18 @@ pub(crate) async fn run(args: Params) -> Result<()> {
.context("Failed to start node")?;

node.wait_connected_trusted().await?;
let mut events = node.event_subscriber();

// We have nothing else to do, but we want to keep main alive
loop {
sleep(Duration::from_secs(1)).await;
while let Ok(ev) = events.recv().await {
match ev.event {
// Skip noisy events
NodeEvent::ShareSamplingResult { .. } => continue,
event => info!("{event}"),
}
}

Ok(())
}

async fn open_db(path: Option<PathBuf>, network_id: &str) -> Result<Arc<redb::Database>> {
Expand Down
3 changes: 2 additions & 1 deletion node-wasm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ lumina-node = { workspace = true }

anyhow = "1.0.71"
console_error_panic_hook = "0.1.7"
js-sys = "0.3.64"
js-sys = "0.3.68"
serde = { version = "1.0.164", features = ["derive"] }
serde_repr = "0.1"
serde-wasm-bindgen = "0.6.0"
Expand All @@ -39,3 +39,4 @@ tracing-subscriber = { version = "0.3.18", features = ["time"] }
tracing-web = "0.1.2"
wasm-bindgen = "0.2.88"
wasm-bindgen-futures = "0.4.37"
web-sys = { version = "0.3.68", features = ["BroadcastChannel", "Crypto"] }
83 changes: 56 additions & 27 deletions node-wasm/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,19 @@ use serde::Serialize;
use serde_wasm_bindgen::{from_value, to_value};
use tracing::info;
use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::spawn_local;
use web_sys::BroadcastChannel;

use crate::utils::js_value_from_display;
use crate::utils::JsContext;
use crate::utils::Network;
use crate::utils::{get_crypto, js_value_from_display, JsContext, Network};
use crate::wrapper::libp2p::NetworkInfo;
use crate::Result;

/// Lumina wasm node.
#[wasm_bindgen(js_name = Node)]
struct WasmNode(Node<IndexedDbStore>);
struct WasmNode {
node: Node<IndexedDbStore>,
events_channel_name: String,
}

/// Config for the lumina wasm node.
#[wasm_bindgen(js_name = NodeConfig)]
Expand Down Expand Up @@ -55,38 +58,59 @@ impl WasmNode {
.await
.js_context("Failed to start the node")?;

Ok(Self(node))
let events_channel_name = format!("NodeEventChannel-{}", get_crypto()?.random_uuid());
let events_channel = BroadcastChannel::new(&events_channel_name)
.map_err(|_| JsError::new("Failed to allocate BroadcastChannel"))?;

let mut events_sub = node.event_subscriber();

spawn_local(async move {
while let Ok(ev) = events_sub.recv().await {
if let Ok(val) = to_value(&ev) {
if events_channel.post_message(&val).is_err() {
break;
}
}
}

events_channel.close();
});

Ok(WasmNode {
node,
events_channel_name,
})
}

/// Get node's local peer ID.
pub fn local_peer_id(&self) -> String {
self.0.local_peer_id().to_string()
self.node.local_peer_id().to_string()
}

/// Get current [`PeerTracker`] info.
pub fn peer_tracker_info(&self) -> Result<JsValue> {
let peer_tracker_info = self.0.peer_tracker_info();
let peer_tracker_info = self.node.peer_tracker_info();
Ok(to_value(&peer_tracker_info)?)
}

/// Wait until the node is connected to at least 1 peer.
pub async fn wait_connected(&self) -> Result<()> {
Ok(self.0.wait_connected().await?)
Ok(self.node.wait_connected().await?)
}

/// Wait until the node is connected to at least 1 trusted peer.
pub async fn wait_connected_trusted(&self) -> Result<()> {
Ok(self.0.wait_connected_trusted().await?)
Ok(self.node.wait_connected_trusted().await?)
}

/// Get current network info.
pub async fn network_info(&self) -> Result<NetworkInfo> {
Ok(self.0.network_info().await?.into())
Ok(self.node.network_info().await?.into())
}

/// Get all the multiaddresses on which the node listens.
pub async fn listeners(&self) -> Result<Array> {
let listeners = self.0.listeners().await?;
let listeners = self.node.listeners().await?;

Ok(listeners
.iter()
Expand All @@ -97,7 +121,7 @@ impl WasmNode {
/// Get all the peers that node is connected to.
pub async fn connected_peers(&self) -> Result<Array> {
Ok(self
.0
.node
.connected_peers()
.await?
.iter()
Expand All @@ -108,25 +132,25 @@ impl WasmNode {
/// Trust or untrust the peer with a given ID.
pub async fn set_peer_trust(&self, peer_id: &str, is_trusted: bool) -> Result<()> {
let peer_id = peer_id.parse().js_context("Parsing peer id failed")?;
Ok(self.0.set_peer_trust(peer_id, is_trusted).await?)
Ok(self.node.set_peer_trust(peer_id, is_trusted).await?)
}

/// Request the head header from the network.
pub async fn request_head_header(&self) -> Result<JsValue> {
let eh = self.0.request_head_header().await?;
let eh = self.node.request_head_header().await?;
Ok(to_value(&eh)?)
}

/// Request a header for the block with a given hash from the network.
pub async fn request_header_by_hash(&self, hash: &str) -> Result<JsValue> {
let hash: Hash = hash.parse()?;
let eh = self.0.request_header_by_hash(&hash).await?;
let eh = self.node.request_header_by_hash(&hash).await?;
Ok(to_value(&eh)?)
}

/// Request a header for the block with a given height from the network.
pub async fn request_header_by_height(&self, height: u64) -> Result<JsValue> {
let eh = self.0.request_header_by_height(height).await?;
let eh = self.node.request_header_by_height(height).await?;
Ok(to_value(&eh)?)
}

Expand All @@ -136,7 +160,7 @@ impl WasmNode {
pub async fn request_verified_headers(&self, from: JsValue, amount: u64) -> Result<Array> {
let header =
from_value::<ExtendedHeader>(from).js_context("Parsing extended header failed")?;
let verified_headers = self.0.request_verified_headers(&header, amount).await?;
let verified_headers = self.node.request_verified_headers(&header, amount).await?;

Ok(verified_headers
.iter()
Expand All @@ -146,32 +170,32 @@ impl WasmNode {

/// Get current header syncing info.
pub async fn syncer_info(&self) -> Result<JsValue> {
let syncer_info = self.0.syncer_info().await?;
let syncer_info = self.node.syncer_info().await?;
Ok(to_value(&syncer_info)?)
}

/// Get the latest header announced in the network.
pub fn get_network_head_header(&self) -> Result<JsValue> {
let maybe_head_hedaer = self.0.get_network_head_header();
let maybe_head_hedaer = self.node.get_network_head_header();
Ok(to_value(&maybe_head_hedaer)?)
}

/// Get the latest locally synced header.
pub async fn get_local_head_header(&self) -> Result<JsValue> {
let local_head = self.0.get_local_head_header().await?;
let local_head = self.node.get_local_head_header().await?;
Ok(to_value(&local_head)?)
}

/// Get a synced header for the block with a given hash.
pub async fn get_header_by_hash(&self, hash: &str) -> Result<JsValue> {
let hash: Hash = hash.parse().js_context("parsing hash failed")?;
let eh = self.0.get_header_by_hash(&hash).await?;
let eh = self.node.get_header_by_hash(&hash).await?;
Ok(to_value(&eh)?)
}

/// Get a synced header for the block with a given height.
pub async fn get_header_by_height(&self, height: u64) -> Result<JsValue> {
let eh = self.0.get_header_by_height(height).await?;
let eh = self.node.get_header_by_height(height).await?;
Ok(to_value(&eh)?)
}

Expand All @@ -190,18 +214,18 @@ impl WasmNode {
end_height: Option<u64>,
) -> Result<JsValue> {
let headers = match (start_height, end_height) {
(None, None) => self.0.get_headers(..).await,
(Some(start), None) => self.0.get_headers(start..).await,
(None, Some(end)) => self.0.get_headers(..=end).await,
(Some(start), Some(end)) => self.0.get_headers(start..=end).await,
(None, None) => self.node.get_headers(..).await,
(Some(start), None) => self.node.get_headers(start..).await,
(None, Some(end)) => self.node.get_headers(..=end).await,
(Some(start), Some(end)) => self.node.get_headers(start..=end).await,
}?;

Ok(to_value(&headers)?)
}

/// Get data sampling metadata of an already sampled height.
pub async fn get_sampling_metadata(&self, height: u64) -> Result<JsValue> {
let metadata = self.0.get_sampling_metadata(height).await?;
let metadata = self.node.get_sampling_metadata(height).await?;

#[derive(Serialize)]
struct Intermediate {
Expand All @@ -216,6 +240,11 @@ impl WasmNode {

Ok(to_value(&metadata)?)
}

/// Returns a [`BroadcastChannel`] for events generated by [`Node`].
pub fn events_channel(&self) -> Result<BroadcastChannel> {
Ok(BroadcastChannel::new(&self.events_channel_name).unwrap())
}
}

#[wasm_bindgen(js_class = NodeConfig)]
Expand Down
8 changes: 8 additions & 0 deletions node-wasm/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tracing_subscriber::fmt::time::UtcTime;
use tracing_subscriber::prelude::*;
use tracing_web::{performance_layer, MakeConsoleWriter};
use wasm_bindgen::prelude::*;
use web_sys::Crypto;

/// Supported Celestia networks.
#[wasm_bindgen]
Expand Down Expand Up @@ -87,3 +88,10 @@ where
self.map_err(|e| JsError::new(&format!("{context}: {e}")))
}
}

pub(crate) fn get_crypto() -> Result<Crypto, JsError> {
js_sys::Reflect::get(&js_sys::global(), &JsValue::from_str("crypto"))
.map_err(|_| JsError::new("failed to get `crypto` from global object"))?
.dyn_into::<web_sys::Crypto>()
.map_err(|_| JsError::new("`crypto` is not `Crypto` type"))
}
Loading

0 comments on commit dbdf1ce

Please sign in to comment.