Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(node): Generate events for data sampling that can be used by front-end #276

Merged
merged 10 commits into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 10 additions & 4 deletions cli/src/native.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::env;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;

use anyhow::{bail, Context, Result};
use celestia_rpc::prelude::*;
Expand All @@ -10,11 +9,11 @@ use clap::Parser;
use directories::ProjectDirs;
use libp2p::{identity, multiaddr::Protocol, Multiaddr};
use lumina_node::blockstore::RedbBlockstore;
use lumina_node::events::NodeEvent;
use lumina_node::network::{canonical_network_bootnodes, network_genesis, network_id, Network};
use lumina_node::node::{Node, NodeConfig};
use lumina_node::store::{RedbStore, Store};
use tokio::task::spawn_blocking;
use tokio::time::sleep;
use tracing::info;
use tracing::warn;

Expand Down Expand Up @@ -80,11 +79,18 @@ pub(crate) async fn run(args: Params) -> Result<()> {
.context("Failed to start node")?;

node.wait_connected_trusted().await?;
let mut events = node.event_subscriber();

// We have nothing else to do, but we want to keep main alive
loop {
sleep(Duration::from_secs(1)).await;
while let Ok(ev) = events.recv().await {
match ev.event {
// Skip noisy events
NodeEvent::ShareSamplingResult { .. } => continue,
event => info!("{event}"),
}
}

Ok(())
}

async fn open_db(path: Option<PathBuf>, network_id: &str) -> Result<Arc<redb::Database>> {
Expand Down
3 changes: 2 additions & 1 deletion node-wasm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ lumina-node = { workspace = true }

anyhow = "1.0.71"
console_error_panic_hook = "0.1.7"
js-sys = "0.3.64"
js-sys = "0.3.68"
serde = { version = "1.0.164", features = ["derive"] }
serde_repr = "0.1"
serde-wasm-bindgen = "0.6.0"
Expand All @@ -39,3 +39,4 @@ tracing-subscriber = { version = "0.3.18", features = ["time"] }
tracing-web = "0.1.2"
wasm-bindgen = "0.2.88"
wasm-bindgen-futures = "0.4.37"
web-sys = { version = "0.3.68", features = ["BroadcastChannel", "Crypto"] }
83 changes: 56 additions & 27 deletions node-wasm/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,19 @@ use serde::Serialize;
use serde_wasm_bindgen::{from_value, to_value};
use tracing::info;
use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::spawn_local;
use web_sys::BroadcastChannel;

use crate::utils::js_value_from_display;
use crate::utils::JsContext;
use crate::utils::Network;
use crate::utils::{get_crypto, js_value_from_display, JsContext, Network};
use crate::wrapper::libp2p::NetworkInfo;
use crate::Result;

/// Lumina wasm node.
#[wasm_bindgen(js_name = Node)]
struct WasmNode(Node<IndexedDbStore>);
struct WasmNode {
node: Node<IndexedDbStore>,
events_channel_name: String,
}

/// Config for the lumina wasm node.
#[wasm_bindgen(js_name = NodeConfig)]
Expand Down Expand Up @@ -55,38 +58,59 @@ impl WasmNode {
.await
.js_context("Failed to start the node")?;

Ok(Self(node))
let events_channel_name = format!("NodeEventChannel-{}", get_crypto()?.random_uuid());
let events_channel = BroadcastChannel::new(&events_channel_name)
.map_err(|_| JsError::new("Failed to allocate BroadcastChannel"))?;

let mut events_sub = node.event_subscriber();

spawn_local(async move {
while let Ok(ev) = events_sub.recv().await {
if let Ok(val) = to_value(&ev) {
if events_channel.post_message(&val).is_err() {
break;
}
}
}

events_channel.close();
});

Ok(WasmNode {
node,
events_channel_name,
})
}

/// Get node's local peer ID.
pub fn local_peer_id(&self) -> String {
self.0.local_peer_id().to_string()
self.node.local_peer_id().to_string()
}

/// Get current [`PeerTracker`] info.
pub fn peer_tracker_info(&self) -> Result<JsValue> {
let peer_tracker_info = self.0.peer_tracker_info();
let peer_tracker_info = self.node.peer_tracker_info();
Ok(to_value(&peer_tracker_info)?)
}

/// Wait until the node is connected to at least 1 peer.
pub async fn wait_connected(&self) -> Result<()> {
Ok(self.0.wait_connected().await?)
Ok(self.node.wait_connected().await?)
}

/// Wait until the node is connected to at least 1 trusted peer.
pub async fn wait_connected_trusted(&self) -> Result<()> {
Ok(self.0.wait_connected_trusted().await?)
Ok(self.node.wait_connected_trusted().await?)
}

/// Get current network info.
pub async fn network_info(&self) -> Result<NetworkInfo> {
Ok(self.0.network_info().await?.into())
Ok(self.node.network_info().await?.into())
}

/// Get all the multiaddresses on which the node listens.
pub async fn listeners(&self) -> Result<Array> {
let listeners = self.0.listeners().await?;
let listeners = self.node.listeners().await?;

Ok(listeners
.iter()
Expand All @@ -97,7 +121,7 @@ impl WasmNode {
/// Get all the peers that node is connected to.
pub async fn connected_peers(&self) -> Result<Array> {
Ok(self
.0
.node
.connected_peers()
.await?
.iter()
Expand All @@ -108,25 +132,25 @@ impl WasmNode {
/// Trust or untrust the peer with a given ID.
pub async fn set_peer_trust(&self, peer_id: &str, is_trusted: bool) -> Result<()> {
let peer_id = peer_id.parse().js_context("Parsing peer id failed")?;
Ok(self.0.set_peer_trust(peer_id, is_trusted).await?)
Ok(self.node.set_peer_trust(peer_id, is_trusted).await?)
}

/// Request the head header from the network.
pub async fn request_head_header(&self) -> Result<JsValue> {
let eh = self.0.request_head_header().await?;
let eh = self.node.request_head_header().await?;
Ok(to_value(&eh)?)
}

/// Request a header for the block with a given hash from the network.
pub async fn request_header_by_hash(&self, hash: &str) -> Result<JsValue> {
let hash: Hash = hash.parse()?;
let eh = self.0.request_header_by_hash(&hash).await?;
let eh = self.node.request_header_by_hash(&hash).await?;
Ok(to_value(&eh)?)
}

/// Request a header for the block with a given height from the network.
pub async fn request_header_by_height(&self, height: u64) -> Result<JsValue> {
let eh = self.0.request_header_by_height(height).await?;
let eh = self.node.request_header_by_height(height).await?;
Ok(to_value(&eh)?)
}

Expand All @@ -136,7 +160,7 @@ impl WasmNode {
pub async fn request_verified_headers(&self, from: JsValue, amount: u64) -> Result<Array> {
let header =
from_value::<ExtendedHeader>(from).js_context("Parsing extended header failed")?;
let verified_headers = self.0.request_verified_headers(&header, amount).await?;
let verified_headers = self.node.request_verified_headers(&header, amount).await?;

Ok(verified_headers
.iter()
Expand All @@ -146,32 +170,32 @@ impl WasmNode {

/// Get current header syncing info.
pub async fn syncer_info(&self) -> Result<JsValue> {
let syncer_info = self.0.syncer_info().await?;
let syncer_info = self.node.syncer_info().await?;
Ok(to_value(&syncer_info)?)
}

/// Get the latest header announced in the network.
pub fn get_network_head_header(&self) -> Result<JsValue> {
let maybe_head_hedaer = self.0.get_network_head_header();
let maybe_head_hedaer = self.node.get_network_head_header();
Ok(to_value(&maybe_head_hedaer)?)
}

/// Get the latest locally synced header.
pub async fn get_local_head_header(&self) -> Result<JsValue> {
let local_head = self.0.get_local_head_header().await?;
let local_head = self.node.get_local_head_header().await?;
Ok(to_value(&local_head)?)
}

/// Get a synced header for the block with a given hash.
pub async fn get_header_by_hash(&self, hash: &str) -> Result<JsValue> {
let hash: Hash = hash.parse().js_context("parsing hash failed")?;
let eh = self.0.get_header_by_hash(&hash).await?;
let eh = self.node.get_header_by_hash(&hash).await?;
Ok(to_value(&eh)?)
}

/// Get a synced header for the block with a given height.
pub async fn get_header_by_height(&self, height: u64) -> Result<JsValue> {
let eh = self.0.get_header_by_height(height).await?;
let eh = self.node.get_header_by_height(height).await?;
Ok(to_value(&eh)?)
}

Expand All @@ -190,18 +214,18 @@ impl WasmNode {
end_height: Option<u64>,
) -> Result<JsValue> {
let headers = match (start_height, end_height) {
(None, None) => self.0.get_headers(..).await,
(Some(start), None) => self.0.get_headers(start..).await,
(None, Some(end)) => self.0.get_headers(..=end).await,
(Some(start), Some(end)) => self.0.get_headers(start..=end).await,
(None, None) => self.node.get_headers(..).await,
(Some(start), None) => self.node.get_headers(start..).await,
(None, Some(end)) => self.node.get_headers(..=end).await,
(Some(start), Some(end)) => self.node.get_headers(start..=end).await,
}?;

Ok(to_value(&headers)?)
}

/// Get data sampling metadata of an already sampled height.
pub async fn get_sampling_metadata(&self, height: u64) -> Result<JsValue> {
let metadata = self.0.get_sampling_metadata(height).await?;
let metadata = self.node.get_sampling_metadata(height).await?;

#[derive(Serialize)]
struct Intermediate {
Expand All @@ -216,6 +240,11 @@ impl WasmNode {

Ok(to_value(&metadata)?)
}

/// Returns a [`BroadcastChannel`] for events generated by [`Node`].
pub fn events_channel(&self) -> Result<BroadcastChannel> {
Ok(BroadcastChannel::new(&self.events_channel_name).unwrap())
}
}

#[wasm_bindgen(js_class = NodeConfig)]
Expand Down
8 changes: 8 additions & 0 deletions node-wasm/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tracing_subscriber::fmt::time::UtcTime;
use tracing_subscriber::prelude::*;
use tracing_web::{performance_layer, MakeConsoleWriter};
use wasm_bindgen::prelude::*;
use web_sys::Crypto;

/// Supported Celestia networks.
#[wasm_bindgen]
Expand Down Expand Up @@ -87,3 +88,10 @@ where
self.map_err(|e| JsError::new(&format!("{context}: {e}")))
}
}

pub(crate) fn get_crypto() -> Result<Crypto, JsError> {
js_sys::Reflect::get(&js_sys::global(), &JsValue::from_str("crypto"))
.map_err(|_| JsError::new("failed to get `crypto` from global object"))?
.dyn_into::<web_sys::Crypto>()
.map_err(|_| JsError::new("`crypto` is not `Crypto` type"))
}
Loading
Loading