Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't always expect a PING on connect #96

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 37 additions & 30 deletions libs/wasm-loader/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ use log::*;
use serde::Deserialize;
use std::fmt::Debug;
use std::io::Read;
use std::{fs::File, path::Path};
use std::{fs::File, net::TcpStream, path::Path};
use subrpcer::state;
use tungstenite::Message;
use tungstenite::{stream::MaybeTlsStream, Message, WebSocket};

const CODE: &str = "0x3a636f6465"; // :code in hex
pub const CODE_BLOB_BOMB_LIMIT: usize = 50 * 1024 * 1024;
Expand All @@ -36,24 +36,43 @@ pub struct WasmLoader {
compression: Compression,
}

impl WasmLoader {
/// Fetch the wasm blob from a node
fn fetch_wasm_from_rpc(reference: &OnchainBlock) -> Result<WasmBytes> {
#[derive(Deserialize)]
struct Response {
result: String,
}
#[derive(Deserialize)]
struct Response {
result: String,
}

fn map_err<O, E1, E2>(r: std::result::Result<O, E1>, e: E2) -> std::result::Result<O, E2>
where
E1: Debug,
{
r.map_err(|e_| {
eprintln!("{e_:?}");
e
})
fn map_err<O, E1, E2>(r: std::result::Result<O, E1>, e: E2) -> std::result::Result<O, E2>
where
E1: Debug,
{
r.map_err(|e_| {
eprintln!("{e_:?}");
e
})
}

impl WasmLoader {
fn read_wasm_hex_response_from_web_socket(
mut ws: WebSocket<MaybeTlsStream<TcpStream>>,
url: &String,
) -> Result<String> {
match map_err(ws.read(), WasmLoaderError::WsClient(url.to_string()))? {
Message::Text(t) => {
Ok(serde_json::from_str::<Response>(&t).map(|r| r.result).expect("unexpected response from node"))
}
Message::Ping(_) => {
log::debug!("Got ping from node, retrying.");
Self::read_wasm_hex_response_from_web_socket(ws, url)
}
m => {
log::warn!("Got unexpected message {:?} from node, retrying.", m);
Self::read_wasm_hex_response_from_web_socket(ws, url)
}
}
}

/// Fetch the wasm blob from a node
fn fetch_wasm_from_rpc(reference: &OnchainBlock) -> Result<WasmBytes> {
let block_ref = reference.block_ref.as_ref();
let data = state::get_storage(0, CODE, block_ref);
let wasm_hex = match &reference.endpoint {
Expand All @@ -70,19 +89,7 @@ impl WasmLoader {
ws.send(Message::Binary(serde_json::to_vec(&data).expect("invalid data"))),
WasmLoaderError::WsClient(url.to_string()),
)?;

let mut wasm_hex = None;

// One for Ping, one for response.
for _ in 0..2_u8 {
let Message::Text(t) = map_err(ws.read(), WasmLoaderError::WsClient(url.to_string()))? else {
continue;
};

wasm_hex = serde_json::from_str::<Response>(&t).map(|r| r.result).ok();
}

wasm_hex.expect("unexpected response from node")
Self::read_wasm_hex_response_from_web_socket(ws, url)?
}
};
let wasm = array_bytes::hex2bytes(wasm_hex).expect("Decoding bytes");
Expand Down
Loading