Skip to content
This repository has been archived by the owner on Aug 3, 2023. It is now read-only.

Commit

Permalink
Switch from tungstenite-rs to tokio-tungstenite
Browse files Browse the repository at this point in the history
  • Loading branch information
EverlastingBugstopper committed Jan 9, 2020
1 parent 6aa8b0e commit f794855
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 57 deletions.
101 changes: 101 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,13 @@ tokio = "0.2.0"
# chrome-devtools-rs = { path = "../chrome-devtools-rs" }
chrome-devtools-rs = { git = "https://github.com/everlastingbugstopper/chrome-devtools-rs", rev = "b7a0e9f" }
ws = "0.9.0"
futures = "0.3"
futures-util = "0.3"
tungstenite = "0.9"

[dependencies.tungstenite]
version = "0.9.2"
[dependencies.tokio-tungstenite]
git = "https://github.com/snapview/tokio-tungstenite"
rev = "adb4f43"
features = ["tls"]

[dev-dependencies]
Expand Down
5 changes: 4 additions & 1 deletion src/commands/dev/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ pub fn dev(
let preview_id = get_preview_id(target, user, &server_config, &session_id)?;

// create a new thread to listen for devtools messages
thread::spawn(move || socket::listen(session_id));
thread::spawn(move || {
let mut runtime = TokioRuntime::new().unwrap();
runtime.block_on(socket::listen(session_id)).unwrap();
});

// spawn tokio runtime on the main thread to handle incoming HTTP requests
let mut runtime = TokioRuntime::new()?;
Expand Down
103 changes: 49 additions & 54 deletions src/commands/dev/socket.rs
Original file line number Diff line number Diff line change
@@ -1,90 +1,85 @@
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::SystemTime;

use chrome_devtools::events::DevtoolsEvent;

use console::style;

use tungstenite::client::AutoStream;
use tungstenite::{connect, Message, WebSocket};
use futures::{future, pin_mut, StreamExt};
use futures_util::sink::SinkExt;

use tokio_tungstenite::connect_async;
use tungstenite::protocol::Message;

use url::Url;

pub fn listen(session_id: String) -> Result<(), failure::Error> {
const KEEP_ALIVE_INTERVAL: u64 = 10;

pub async fn listen(session_id: String) -> Result<(), failure::Error> {
let socket_url = format!("wss://rawhttp.cloudflareworkers.com/inspect/{}", session_id);
let socket_url = Url::parse(&socket_url)?;
let (socket, _) = connect(socket_url)?;

let socket = Arc::new(Mutex::new(socket));
let (ws_stream, _) = connect_async(socket_url)
.await
.expect("Failed to connect to devtools instance");

let (mut write, read) = ws_stream.split();

let enable_runtime = r#"{
"id": 1,
"method": "Runtime.enable"
}"#;
write.send(Message::Text(enable_runtime.into())).await?;

{
let socket = Arc::clone(&socket);
let mut socket = socket.lock().unwrap();
socket
.write_message(Message::Text(enable_runtime.into()))
.unwrap();
}

{
let socket = Arc::clone(&socket);
thread::spawn(move || keep_alive(socket));
}
let (keepalive_tx, keepalive_rx) = futures::channel::mpsc::unbounded();
tokio::spawn(keep_alive(keepalive_tx));
let keepalive_to_ws = keepalive_rx.map(Ok).forward(write);

loop {
let msg = socket
.lock()
.unwrap()
.read_message()
.expect("Error reading message from devtools")
.into_text()?;
log::info!("{}", msg);
let msg: Result<DevtoolsEvent, serde_json::Error> = serde_json::from_str(&msg);
match msg {
Ok(msg) => match msg {
DevtoolsEvent::ConsoleAPICalled(event) => match event.log_type.as_str() {
"log" => println!("{}", style(event).blue()),
"error" => eprintln!("{}", style(event).red()),
_ => println!("unknown console event: {}", event),
},
DevtoolsEvent::ExceptionThrown(event) => eprintln!("{}", style(event).bold().red()),
},
Err(e) => {
// this event was not parsed as a DevtoolsEvent
// TODO: change this to a warn after chrome-devtools-rs is parsing all messages
log::info!("this event was not parsed as a DevtoolsEvent:\n{}", e);
let ws_to_stdout = {
read.for_each(|msg| {
async {
let msg = msg.unwrap().into_text().unwrap();
log::info!("{}", msg);
let msg: Result<DevtoolsEvent, serde_json::Error> = serde_json::from_str(&msg);
match msg {
Ok(msg) => match msg {
DevtoolsEvent::ConsoleAPICalled(event) => match event.log_type.as_str() {
"log" => println!("{}", style(event).blue()),
"error" => eprintln!("{}", style(event).red()),
_ => println!("unknown console event: {}", event),
},
DevtoolsEvent::ExceptionThrown(event) => {
eprintln!("{}", style(event).bold().red())
}
},
Err(e) => {
// this event was not parsed as a DevtoolsEvent
// TODO: change this to a warn after chrome-devtools-rs is parsing all messages
log::info!("this event was not parsed as a DevtoolsEvent:\n{}", e);
}
}
}
}
}
})
};
pin_mut!(keepalive_to_ws, ws_to_stdout);
future::select(keepalive_to_ws, ws_to_stdout).await;
Ok(())
}

fn keep_alive(socket: Arc<Mutex<WebSocket<AutoStream>>>) {
async fn keep_alive(tx: futures::channel::mpsc::UnboundedSender<Message>) {
let mut keep_alive_time = SystemTime::now();
let mut id = 2;
loop {
let elapsed = keep_alive_time.elapsed().unwrap().as_secs();
println!("elapsed: {}", elapsed);
if elapsed >= 5 {
if elapsed >= KEEP_ALIVE_INTERVAL {
let keep_alive_message = format!(
r#"{{
"id": {},
"method": "Runtime.getIsolateId"
}}"#,
id
);
println!("before sending keepalive message");
{
let mut socket = socket.lock().unwrap();
socket
.write_message(Message::Text(keep_alive_message.into()))
.unwrap();
}
println!("after sending keepalive message");
tx.unbounded_send(Message::Text(keep_alive_message.into()))
.unwrap();
id += 1;
keep_alive_time = SystemTime::now();
}
Expand Down

0 comments on commit f794855

Please sign in to comment.