Skip to content

Commit

Permalink
Use a timer callback when re-establishing a connection
Browse files Browse the repository at this point in the history
  • Loading branch information
KiChjang committed Nov 11, 2016
1 parent 0b32b62 commit a5c2c0b
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 67 deletions.
100 changes: 33 additions & 67 deletions components/script/dom/eventsource.rs
Expand Up @@ -17,14 +17,15 @@ use dom::globalscope::GlobalScope;
use dom::messageevent::MessageEvent;
use encoding::Encoding;
use encoding::all::UTF_8;
use euclid::length::Length;
use hyper::header::{Accept, qitem};
use ipc_channel::ipc;
use ipc_channel::router::ROUTER;
use js::conversions::ToJSValConvertible;
use js::jsapi::JSAutoCompartment;
use js::jsval::UndefinedValue;
use mime::{Mime, TopLevel, SubLevel};
use net_traits::{CoreResourceMsg, FetchMetadata, FetchResponseListener, NetworkError};
use net_traits::{CoreResourceMsg, FetchMetadata, FetchResponseMsg, FetchResponseListener, NetworkError};
use net_traits::request::{CacheMode, CorsSettings, CredentialsMode};
use net_traits::request::{RequestInit, RequestMode};
use network_listener::{NetworkListener, PreInvoke};
Expand All @@ -34,10 +35,8 @@ use std::cell::Cell;
use std::mem;
use std::str::{Chars, FromStr};
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{Sender, channel};
use std::thread;
use std::time::Duration;
use task_source::TaskSource;
use timers::OneshotTimerCallback;
use url::Url;

header! { (LastEventId, "Last-Event-ID") => [String] }
Expand Down Expand Up @@ -78,6 +77,8 @@ enum ParserState {
struct EventSourceContext {
event_source: Trusted<EventSource>,
gen_id: GenerationId,
action_sender: ipc::IpcSender<FetchResponseMsg>,

parser_state: ParserState,
field: String,
value: String,
Expand Down Expand Up @@ -114,36 +115,16 @@ impl EventSourceContext {
// https://html.spec.whatwg.org/multipage/#reestablish-the-connection
fn reestablish_the_connection(&self) {
let event_source = self.event_source.root();
let (sender, receiver) = channel();
// Step 1
let runnable = box ReestablishConnectionRunnable {
event_source: self.event_source.clone(),
done_chan: sender
};
if self.gen_id != self.event_source.root().generation_id.get() {
return;
}
let _ = event_source.global().networking_task_source().queue(runnable, &*event_source.global());
// Step 2
thread::sleep(Duration::from_millis(event_source.reconnection_time.get()));
// TODO Step 3: Optionally wait some more
// Step 4
if self.gen_id != self.event_source.root().generation_id.get() {

if self.gen_id != event_source.generation_id.get() {
return;
}
let _ = receiver.recv();
// Step 5
let runnable = box RefetchRequestRunnable {
event_source: self.event_source.clone(),
gen_id: self.gen_id,

event_type: self.event_type.clone(),
data: self.data.clone(),
last_event_id: self.last_event_id.clone(),
// Step 1
let runnable = box ReestablishConnectionRunnable {
event_source: self.event_source.clone(),
action_sender: self.action_sender.clone()
};
if self.gen_id != self.event_source.root().generation_id.get() {
return;
}
let _ = event_source.global().networking_task_source().queue(runnable, &*event_source.global());
}

Expand Down Expand Up @@ -317,7 +298,7 @@ impl FetchResponseListener for EventSourceContext {
}

fn process_response_eof(&mut self, _response: Result<(), NetworkError>) {

self.reestablish_the_connection();
}
}

Expand Down Expand Up @@ -394,9 +375,12 @@ impl EventSource {
// Step 12
*ev.request.borrow_mut() = Some(request.clone());
// Step 14
let (action_sender, action_receiver) = ipc::channel().unwrap();
let context = EventSourceContext {
event_source: Trusted::new(&ev),
gen_id: ev.generation_id.get(),
action_sender: action_sender.clone(),

parser_state: ParserState::Eol,
field: String::new(),
value: String::new(),
Expand All @@ -411,7 +395,6 @@ impl EventSource {
task_source: global.networking_task_source(),
wrapper: Some(global.get_runnable_wrapper())
};
let (action_sender, action_receiver) = ipc::channel().unwrap();
ROUTER.add_route(action_receiver.to_opaque(), box move |message| {
listener.notify_fetch(message.to().unwrap());
});
Expand Down Expand Up @@ -490,7 +473,7 @@ impl Runnable for FailConnectionRunnable {

pub struct ReestablishConnectionRunnable {
event_source: Trusted<EventSource>,
done_chan: Sender<()>
action_sender: ipc::IpcSender<FetchResponseMsg>,
}

impl Runnable for ReestablishConnectionRunnable {
Expand All @@ -501,31 +484,35 @@ impl Runnable for ReestablishConnectionRunnable {
let event_source = self.event_source.root();
// Step 1.1
if event_source.ready_state.get() == ReadyState::Closed {
self.done_chan.send(()).unwrap();
return;
}
// Step 1.2
event_source.ready_state.set(ReadyState::Connecting);
// Step 1.3
event_source.upcast::<EventTarget>().fire_event(atom!("error"));
self.done_chan.send(()).unwrap();
// Step 2
let duration = Length::new(event_source.reconnection_time.get());
// TODO Step 3: Optionally wait some more
// Steps 4-5
let callback = OneshotTimerCallback::EventSourceTimeout(EventSourceTimeoutCallback {
event_source: self.event_source.clone(),
action_sender: self.action_sender.clone()
});
let _ = event_source.global().schedule_callback(callback, duration);
}
}

pub struct RefetchRequestRunnable {
#[derive(JSTraceable, HeapSizeOf)]
pub struct EventSourceTimeoutCallback {
#[ignore_heap_size_of = "Because it is non-owning"]
event_source: Trusted<EventSource>,
gen_id: GenerationId,

event_type: String,
data: String,
last_event_id: String,
#[ignore_heap_size_of = "Because it is non-owning"]
action_sender: ipc::IpcSender<FetchResponseMsg>,
}

impl Runnable for RefetchRequestRunnable {
fn name(&self) -> &'static str { "EventSource RefetchRequestRunnable" }

impl EventSourceTimeoutCallback {
// https://html.spec.whatwg.org/multipage/#reestablish-the-connection
fn handler(self: Box<RefetchRequestRunnable>) {
pub fn invoke(self) {
let event_source = self.event_source.root();
let global = event_source.global();
// Step 5.1
Expand All @@ -539,28 +526,7 @@ impl Runnable for RefetchRequestRunnable {
request.headers.set(LastEventId(String::from(event_source.last_event_id.borrow().clone())));
}
// Step 5.4
let context = EventSourceContext {
event_source: self.event_source.clone(),
gen_id: self.gen_id,
parser_state: ParserState::Eol,
field: String::new(),
value: String::new(),
origin: String::new(),

event_type: self.event_type.clone(),
data: self.data.clone(),
last_event_id: self.last_event_id.clone()
};
let listener = NetworkListener {
context: Arc::new(Mutex::new(context)),
task_source: global.networking_task_source(),
wrapper: Some(global.get_runnable_wrapper())
};
let (action_sender, action_receiver) = ipc::channel().unwrap();
ROUTER.add_route(action_receiver.to_opaque(), box move |message| {
listener.notify_fetch(message.to().unwrap());
});
global.core_resource_thread().send(CoreResourceMsg::Fetch(request, action_sender)).unwrap();
global.core_resource_thread().send(CoreResourceMsg::Fetch(request, self.action_sender)).unwrap();
}
}

Expand Down
3 changes: 3 additions & 0 deletions components/script/timers.rs
Expand Up @@ -7,6 +7,7 @@ use dom::bindings::cell::DOMRefCell;
use dom::bindings::codegen::Bindings::FunctionBinding::Function;
use dom::bindings::reflector::Reflectable;
use dom::bindings::str::DOMString;
use dom::eventsource::EventSourceTimeoutCallback;
use dom::globalscope::GlobalScope;
use dom::testbinding::TestBindingCallback;
use dom::xmlhttprequest::XHRTimeoutCallback;
Expand Down Expand Up @@ -67,6 +68,7 @@ struct OneshotTimer {
#[derive(JSTraceable, HeapSizeOf)]
pub enum OneshotTimerCallback {
XhrTimeout(XHRTimeoutCallback),
EventSourceTimeout(EventSourceTimeoutCallback),
JsTimer(JsTimerTask),
TestBindingCallback(TestBindingCallback),
}
Expand All @@ -75,6 +77,7 @@ impl OneshotTimerCallback {
fn invoke<T: Reflectable>(self, this: &T, js_timers: &JsTimers) {
match self {
OneshotTimerCallback::XhrTimeout(callback) => callback.invoke(),
OneshotTimerCallback::EventSourceTimeout(callback) => callback.invoke(),
OneshotTimerCallback::JsTimer(task) => task.invoke(this, js_timers),
OneshotTimerCallback::TestBindingCallback(callback) => callback.invoke(),
}
Expand Down

0 comments on commit a5c2c0b

Please sign in to comment.