From 780a1bd6cbafd8961d575385bee4c9e06376061b Mon Sep 17 00:00:00 2001 From: Gregory Terzian Date: Tue, 11 Feb 2020 15:00:27 +0800 Subject: [PATCH] add a core resource thread-pool --- components/net/fetch/methods.rs | 4 +- components/net/filemanager_thread.rs | 414 +++++++++++---------- components/net/resource_thread.rs | 151 +++++++- components/net/tests/fetch.rs | 16 +- components/net/tests/filemanager_thread.rs | 6 +- components/net/tests/http_loader.rs | 16 +- components/net/tests/main.rs | 11 +- 7 files changed, 399 insertions(+), 219 deletions(-) diff --git a/components/net/fetch/methods.rs b/components/net/fetch/methods.rs index e8c35e4a9015..55cef372c1bc 100644 --- a/components/net/fetch/methods.rs +++ b/components/net/fetch/methods.rs @@ -4,7 +4,7 @@ use crate::data_loader::decode; use crate::fetch::cors_cache::CorsCache; -use crate::filemanager_thread::{fetch_file_in_chunks, FileManager, FILE_CHUNK_SIZE}; +use crate::filemanager_thread::{FileManager, FILE_CHUNK_SIZE}; use crate::http_loader::{determine_request_referrer, http_fetch, HttpState}; use crate::http_loader::{set_default_accept, set_default_accept_language}; use crate::subresource_integrity::is_response_integrity_valid; @@ -701,7 +701,7 @@ fn scheme_fetch( *done_chan = Some((done_sender.clone(), done_receiver)); *response.body.lock().unwrap() = ResponseBody::Receiving(vec![]); - fetch_file_in_chunks( + context.filemanager.fetch_file_in_chunks( done_sender, reader, response.body.clone(), diff --git a/components/net/filemanager_thread.rs b/components/net/filemanager_thread.rs index 0672f0cfc65b..d765c37ce325 100644 --- a/components/net/filemanager_thread.rs +++ b/components/net/filemanager_thread.rs @@ -3,6 +3,7 @@ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ use crate::fetch::methods::{CancellationListener, Data, RangeRequestBounds}; +use crate::resource_thread::CoreResourceThreadPool; use crossbeam_channel::Sender; use embedder_traits::{EmbedderMsg, EmbedderProxy, FilterPattern}; use headers::{ContentLength, ContentType, HeaderMap, HeaderMapExt}; @@ -24,8 +25,7 @@ use std::mem; use std::ops::Index; use std::path::{Path, PathBuf}; use std::sync::atomic::{self, AtomicBool, AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex, RwLock}; -use std::thread; +use std::sync::{Arc, Mutex, RwLock, Weak}; use url::Url; use uuid::Uuid; @@ -72,13 +72,18 @@ enum FileImpl { pub struct FileManager { embedder_proxy: EmbedderProxy, store: Arc, + thread_pool: Weak, } impl FileManager { - pub fn new(embedder_proxy: EmbedderProxy) -> FileManager { + pub fn new( + embedder_proxy: EmbedderProxy, + pool_handle: Weak, + ) -> FileManager { FileManager { embedder_proxy: embedder_proxy, store: Arc::new(FileManagerStore::new()), + thread_pool: pool_handle, } } @@ -90,14 +95,19 @@ impl FileManager { origin: FileOrigin, ) { let store = self.store.clone(); - thread::Builder::new() - .name("read file".to_owned()) - .spawn(move || { - if let Err(e) = store.try_read_file(&sender, id, check_url_validity, origin) { - let _ = sender.send(Err(FileManagerThreadError::BlobURLStoreError(e))); - } + self.thread_pool + .upgrade() + .and_then(|pool| { + pool.spawn(move || { + if let Err(e) = store.try_read_file(&sender, id, check_url_validity, origin) { + let _ = sender.send(Err(FileManagerThreadError::BlobURLStoreError(e))); + } + }); + Some(()) }) - .expect("Thread spawning failed"); + .unwrap_or_else(|| { + warn!("FileManager tried to read a file after CoreResourceManager has exited."); + }); } // Read a file for the Fetch implementation. @@ -113,7 +123,7 @@ impl FileManager { response: &mut Response, range: RangeRequestBounds, ) -> Result<(), BlobURLStoreError> { - self.store.fetch_blob_buf( + self.fetch_blob_buf( done_sender, cancellation_listener, &id, @@ -134,22 +144,36 @@ impl FileManager { FileManagerThreadMsg::SelectFile(filter, sender, origin, opt_test_path) => { let store = self.store.clone(); let embedder = self.embedder_proxy.clone(); - thread::Builder::new() - .name("select file".to_owned()) - .spawn(move || { - store.select_file(filter, sender, origin, opt_test_path, embedder); + self.thread_pool + .upgrade() + .and_then(|pool| { + pool.spawn(move || { + store.select_file(filter, sender, origin, opt_test_path, embedder); + }); + Some(()) }) - .expect("Thread spawning failed"); + .unwrap_or_else(|| { + warn!( + "FileManager tried to select a file after CoreResourceManager has exited." + ); + }); }, FileManagerThreadMsg::SelectFiles(filter, sender, origin, opt_test_paths) => { let store = self.store.clone(); let embedder = self.embedder_proxy.clone(); - thread::Builder::new() - .name("select files".to_owned()) - .spawn(move || { - store.select_files(filter, sender, origin, opt_test_paths, embedder); + self.thread_pool + .upgrade() + .and_then(|pool| { + pool.spawn(move || { + store.select_files(filter, sender, origin, opt_test_paths, embedder); + }); + Some(()) }) - .expect("Thread spawning failed"); + .unwrap_or_else(|| { + warn!( + "FileManager tried to select multiple files after CoreResourceManager has exited." + ); + }); }, FileManagerThreadMsg::ReadFile(sender, id, check_url_validity, origin) => { self.read_file(sender, id, check_url_validity, origin); @@ -171,6 +195,183 @@ impl FileManager { }, } } + + pub fn fetch_file_in_chunks( + &self, + done_sender: Sender, + mut reader: BufReader, + res_body: ServoArc>, + cancellation_listener: Arc>, + range: RelativePos, + ) { + self.thread_pool + .upgrade() + .and_then(|pool| { + pool.spawn(move || { + loop { + if cancellation_listener.lock().unwrap().cancelled() { + *res_body.lock().unwrap() = ResponseBody::Done(vec![]); + let _ = done_sender.send(Data::Cancelled); + return; + } + let length = { + let buffer = reader.fill_buf().unwrap().to_vec(); + let mut buffer_len = buffer.len(); + if let ResponseBody::Receiving(ref mut body) = *res_body.lock().unwrap() + { + let offset = usize::min( + { + if let Some(end) = range.end { + // HTTP Range requests are specified with closed ranges, + // while Rust uses half-open ranges. We add +1 here so + // we don't skip the last requested byte. + let remaining_bytes = + end as usize - range.start as usize - body.len() + + 1; + if remaining_bytes <= FILE_CHUNK_SIZE { + // This is the last chunk so we set buffer + // len to 0 to break the reading loop. + buffer_len = 0; + remaining_bytes + } else { + FILE_CHUNK_SIZE + } + } else { + FILE_CHUNK_SIZE + } + }, + buffer.len(), + ); + let chunk = &buffer[0..offset]; + body.extend_from_slice(chunk); + let _ = done_sender.send(Data::Payload(chunk.to_vec())); + } + buffer_len + }; + if length == 0 { + let mut body = res_body.lock().unwrap(); + let completed_body = match *body { + ResponseBody::Receiving(ref mut body) => mem::replace(body, vec![]), + _ => vec![], + }; + *body = ResponseBody::Done(completed_body); + let _ = done_sender.send(Data::Done); + break; + } + reader.consume(length); + } + }); + Some(()) + }) + .unwrap_or_else(|| { + warn!("FileManager tried to fetch a file in chunks after CoreResourceManager has exited."); + }); + } + + fn fetch_blob_buf( + &self, + done_sender: &Sender, + cancellation_listener: Arc>, + id: &Uuid, + origin_in: &FileOrigin, + range: RangeRequestBounds, + check_url_validity: bool, + response: &mut Response, + ) -> Result<(), BlobURLStoreError> { + let file_impl = self.store.get_impl(id, origin_in, check_url_validity)?; + match file_impl { + FileImpl::Memory(buf) => { + let range = match range.get_final(Some(buf.size)) { + Ok(range) => range, + Err(_) => { + return Err(BlobURLStoreError::InvalidRange); + }, + }; + + let range = range.to_abs_range(buf.size as usize); + let len = range.len() as u64; + + set_headers( + &mut response.headers, + len, + buf.type_string.parse().unwrap_or(mime::TEXT_PLAIN), + /* filename */ None, + ); + + let mut bytes = vec![]; + bytes.extend_from_slice(buf.bytes.index(range)); + + let _ = done_sender.send(Data::Payload(bytes)); + let _ = done_sender.send(Data::Done); + + Ok(()) + }, + FileImpl::MetaDataOnly(metadata) => { + /* XXX: Snapshot state check (optional) https://w3c.github.io/FileAPI/#snapshot-state. + Concretely, here we create another file, and this file might not + has the same underlying file state (meta-info plus content) as the time + create_entry is called. + */ + + let file = File::open(&metadata.path) + .map_err(|e| BlobURLStoreError::External(e.to_string()))?; + + let range = match range.get_final(Some(metadata.size)) { + Ok(range) => range, + Err(_) => { + return Err(BlobURLStoreError::InvalidRange); + }, + }; + + let mut reader = BufReader::with_capacity(FILE_CHUNK_SIZE, file); + if reader.seek(SeekFrom::Start(range.start as u64)).is_err() { + return Err(BlobURLStoreError::External( + "Unexpected method for blob".into(), + )); + } + + let filename = metadata + .path + .file_name() + .and_then(|osstr| osstr.to_str()) + .map(|s| s.to_string()); + + set_headers( + &mut response.headers, + metadata.size, + mime_guess::from_path(metadata.path) + .first() + .unwrap_or(mime::TEXT_PLAIN), + filename, + ); + + self.fetch_file_in_chunks( + done_sender.clone(), + reader, + response.body.clone(), + cancellation_listener, + range, + ); + + Ok(()) + }, + FileImpl::Sliced(parent_id, inner_rel_pos) => { + // Next time we don't need to check validity since + // we have already done that for requesting URL if necessary. + return self.fetch_blob_buf( + done_sender, + cancellation_listener, + &parent_id, + origin_in, + RangeRequestBounds::Final( + RelativePos::full_range().slice_inner(&inner_rel_pos), + ), + false, + response, + ); + }, + } + } } /// File manager's data store. It maintains a thread-safe mapping @@ -188,7 +389,7 @@ impl FileManagerStore { } /// Copy out the file backend implementation content - fn get_impl( + pub fn get_impl( &self, id: &Uuid, origin_in: &FileOrigin, @@ -510,111 +711,6 @@ impl FileManagerStore { ) } - fn fetch_blob_buf( - &self, - done_sender: &Sender, - cancellation_listener: Arc>, - id: &Uuid, - origin_in: &FileOrigin, - range: RangeRequestBounds, - check_url_validity: bool, - response: &mut Response, - ) -> Result<(), BlobURLStoreError> { - let file_impl = self.get_impl(id, origin_in, check_url_validity)?; - match file_impl { - FileImpl::Memory(buf) => { - let range = match range.get_final(Some(buf.size)) { - Ok(range) => range, - Err(_) => { - return Err(BlobURLStoreError::InvalidRange); - }, - }; - - let range = range.to_abs_range(buf.size as usize); - let len = range.len() as u64; - - set_headers( - &mut response.headers, - len, - buf.type_string.parse().unwrap_or(mime::TEXT_PLAIN), - /* filename */ None, - ); - - let mut bytes = vec![]; - bytes.extend_from_slice(buf.bytes.index(range)); - - let _ = done_sender.send(Data::Payload(bytes)); - let _ = done_sender.send(Data::Done); - - Ok(()) - }, - FileImpl::MetaDataOnly(metadata) => { - /* XXX: Snapshot state check (optional) https://w3c.github.io/FileAPI/#snapshot-state. - Concretely, here we create another file, and this file might not - has the same underlying file state (meta-info plus content) as the time - create_entry is called. - */ - - let file = File::open(&metadata.path) - .map_err(|e| BlobURLStoreError::External(e.to_string()))?; - - let range = match range.get_final(Some(metadata.size)) { - Ok(range) => range, - Err(_) => { - return Err(BlobURLStoreError::InvalidRange); - }, - }; - - let mut reader = BufReader::with_capacity(FILE_CHUNK_SIZE, file); - if reader.seek(SeekFrom::Start(range.start as u64)).is_err() { - return Err(BlobURLStoreError::External( - "Unexpected method for blob".into(), - )); - } - - let filename = metadata - .path - .file_name() - .and_then(|osstr| osstr.to_str()) - .map(|s| s.to_string()); - - set_headers( - &mut response.headers, - metadata.size, - mime_guess::from_path(metadata.path) - .first() - .unwrap_or(mime::TEXT_PLAIN), - filename, - ); - - fetch_file_in_chunks( - done_sender.clone(), - reader, - response.body.clone(), - cancellation_listener, - range, - ); - - Ok(()) - }, - FileImpl::Sliced(parent_id, inner_rel_pos) => { - // Next time we don't need to check validity since - // we have already done that for requesting URL if necessary. - return self.fetch_blob_buf( - done_sender, - cancellation_listener, - &parent_id, - origin_in, - RangeRequestBounds::Final( - RelativePos::full_range().slice_inner(&inner_rel_pos), - ), - false, - response, - ); - }, - } - } - fn dec_ref(&self, id: &Uuid, origin_in: &FileOrigin) -> Result<(), BlobURLStoreError> { let (do_remove, opt_parent_id) = match self.entries.read().unwrap().get(id) { Some(entry) => { @@ -763,70 +859,6 @@ fn read_file_in_chunks( } } -pub fn fetch_file_in_chunks( - done_sender: Sender, - mut reader: BufReader, - res_body: ServoArc>, - cancellation_listener: Arc>, - range: RelativePos, -) { - thread::Builder::new() - .name("fetch file worker thread".to_string()) - .spawn(move || { - loop { - if cancellation_listener.lock().unwrap().cancelled() { - *res_body.lock().unwrap() = ResponseBody::Done(vec![]); - let _ = done_sender.send(Data::Cancelled); - return; - } - let length = { - let buffer = reader.fill_buf().unwrap().to_vec(); - let mut buffer_len = buffer.len(); - if let ResponseBody::Receiving(ref mut body) = *res_body.lock().unwrap() { - let offset = usize::min( - { - if let Some(end) = range.end { - // HTTP Range requests are specified with closed ranges, - // while Rust uses half-open ranges. We add +1 here so - // we don't skip the last requested byte. - let remaining_bytes = - end as usize - range.start as usize - body.len() + 1; - if remaining_bytes <= FILE_CHUNK_SIZE { - // This is the last chunk so we set buffer - // len to 0 to break the reading loop. - buffer_len = 0; - remaining_bytes - } else { - FILE_CHUNK_SIZE - } - } else { - FILE_CHUNK_SIZE - } - }, - buffer.len(), - ); - let chunk = &buffer[0..offset]; - body.extend_from_slice(chunk); - let _ = done_sender.send(Data::Payload(chunk.to_vec())); - } - buffer_len - }; - if length == 0 { - let mut body = res_body.lock().unwrap(); - let completed_body = match *body { - ResponseBody::Receiving(ref mut body) => mem::replace(body, vec![]), - _ => vec![], - }; - *body = ResponseBody::Done(completed_body); - let _ = done_sender.send(Data::Done); - break; - } - reader.consume(length); - } - }) - .expect("Failed to create fetch file worker thread"); -} - fn set_headers(headers: &mut HeaderMap, content_length: u64, mime: Mime, filename: Option) { headers.typed_insert(ContentLength(content_length)); headers.typed_insert(ContentType::from(mime.clone())); diff --git a/components/net/resource_thread.rs b/components/net/resource_thread.rs index bdbc815baa49..0bf17e7570f7 100644 --- a/components/net/resource_thread.rs +++ b/components/net/resource_thread.rs @@ -46,6 +46,7 @@ use std::ops::Deref; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex, RwLock}; use std::thread; +use std::time::Duration; /// Returns a tuple of (public, private) senders to the new threads. pub fn new_resource_threads( @@ -345,6 +346,7 @@ impl ResourceChannelManager { Err(_) => warn!("Error writing hsts list to disk"), } } + self.resource_manager.exit(); let _ = sender.send(()); return false; }, @@ -429,10 +431,135 @@ pub struct CoreResourceManager { devtools_chan: Option>, swmanager_chan: Option>, filemanager: FileManager, - fetch_pool: rayon::ThreadPool, + thread_pool: Arc, certificate_path: Option, } +/// The state of the thread-pool used by CoreResource. +struct ThreadPoolState { + /// The number of active workers. + active_workers: u32, + /// Whether the pool can spawn additional work. + active: bool, +} + +impl ThreadPoolState { + pub fn new() -> ThreadPoolState { + ThreadPoolState { + active_workers: 0, + active: true, + } + } + + /// Is the pool still able to spawn new work? + pub fn is_active(&self) -> bool { + self.active + } + + /// How many workers are currently active? + pub fn active_workers(&self) -> u32 { + self.active_workers + } + + /// Prevent additional work from being spawned. + pub fn switch_to_inactive(&mut self) { + self.active = false; + } + + /// Add to the count of active workers. + pub fn increment_active(&mut self) { + self.active_workers += 1; + } + + /// Substract from the count of active workers. + pub fn decrement_active(&mut self) { + self.active_workers -= 1; + } +} + +/// Threadpool used by Fetch and file operations. +pub struct CoreResourceThreadPool { + pool: rayon::ThreadPool, + state: Arc>, +} + +impl CoreResourceThreadPool { + pub fn new(num_threads: usize) -> CoreResourceThreadPool { + let pool = rayon::ThreadPoolBuilder::new() + .num_threads(num_threads) + .build() + .unwrap(); + let state = Arc::new(Mutex::new(ThreadPoolState::new())); + CoreResourceThreadPool { pool: pool, state } + } + + /// Spawn work on the thread-pool, if still active. + /// + /// There is no need to give feedback to the caller, + /// because if we do not perform work, + /// it is because the system as a whole is exiting. + pub fn spawn(&self, work: OP) + where + OP: FnOnce() + Send + 'static, + { + { + let mut state = self.state.lock().unwrap(); + if state.is_active() { + state.increment_active(); + } else { + // Don't spawn any work. + return; + } + } + + let state = self.state.clone(); + + self.pool.spawn(move || { + { + let mut state = state.lock().unwrap(); + if !state.is_active() { + // Decrement number of active workers and return, + // without doing any work. + return state.decrement_active(); + } + } + // Perform work. + work(); + { + // Decrement number of active workers. + let mut state = state.lock().unwrap(); + state.decrement_active(); + } + }); + } + + /// Prevent further work from being spawned, + /// and wait until all workers are done, + /// or a timeout of roughly one second has been reached. + pub fn exit(&self) { + { + let mut state = self.state.lock().unwrap(); + state.switch_to_inactive(); + } + let mut rounds = 0; + loop { + rounds += 1; + { + let state = self.state.lock().unwrap(); + let still_active = state.active_workers(); + + if still_active == 0 || rounds == 10 { + if still_active > 0 { + debug!("Exiting CoreResourceThreadPool with {:?} still working(should be zero)", still_active); + } + break; + } + } + thread::sleep(Duration::from_millis(100)); + } + } +} + impl CoreResourceManager { pub fn new( user_agent: Cow<'static, str>, @@ -441,20 +568,28 @@ impl CoreResourceManager { embedder_proxy: EmbedderProxy, certificate_path: Option, ) -> CoreResourceManager { - let pool = rayon::ThreadPoolBuilder::new() - .num_threads(16) - .build() - .unwrap(); + let pool = CoreResourceThreadPool::new(16); + let pool_handle = Arc::new(pool); CoreResourceManager { user_agent: user_agent, devtools_chan: devtools_channel, swmanager_chan: None, - filemanager: FileManager::new(embedder_proxy), - fetch_pool: pool, + filemanager: FileManager::new(embedder_proxy, Arc::downgrade(&pool_handle)), + thread_pool: pool_handle, certificate_path, } } + /// Exit the core resource manager. + pub fn exit(&mut self) { + // Prevents further work from being spawned on the pool, + // blocks until all workers in the pool are done, + // or a short timeout has been reached. + self.thread_pool.exit(); + + debug!("Exited CoreResourceManager"); + } + fn set_cookie_for_url( &mut self, request: &ServoUrl, @@ -486,7 +621,7 @@ impl CoreResourceManager { _ => ResourceTimingType::Resource, }; - self.fetch_pool.spawn(move || { + self.thread_pool.spawn(move || { let mut request = request_builder.build(); // XXXManishearth: Check origin against pipeline id (also ensure that the mode is allowed) // todo load context / mimesniff in fetch diff --git a/components/net/tests/fetch.rs b/components/net/tests/fetch.rs index eddd8f67779e..b936c70f3a5e 100644 --- a/components/net/tests/fetch.rs +++ b/components/net/tests/fetch.rs @@ -27,6 +27,7 @@ use net::fetch::cors_cache::CorsCache; use net::fetch::methods::{self, CancellationListener, FetchContext}; use net::filemanager_thread::FileManager; use net::hsts::HstsEntry; +use net::resource_thread::CoreResourceThreadPool; use net::test::HttpState; use net_traits::request::{ Destination, Origin, RedirectMode, Referrer, Request, RequestBuilder, RequestMode, @@ -42,7 +43,7 @@ use std::fs; use std::iter::FromIterator; use std::path::Path; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, Weak}; use std::time::{Duration, SystemTime}; use uuid::Uuid; @@ -154,7 +155,7 @@ fn test_fetch_blob() { } } - let context = new_fetch_context(None, None); + let context = new_fetch_context(None, None, None); let bytes = b"content"; let blob_buf = BlobBuf { @@ -215,9 +216,14 @@ fn test_file() { let origin = Origin::Origin(url.origin()); let mut request = Request::new(url, Some(origin), None); - let fetch_response = fetch(&mut request, None); + let pool = CoreResourceThreadPool::new(1); + let pool_handle = Arc::new(pool); + let mut context = new_fetch_context(None, None, Some(Arc::downgrade(&pool_handle))); + let fetch_response = fetch_with_context(&mut request, &mut context); + // We should see an opaque-filtered response. assert_eq!(fetch_response.response_type, ResponseType::Opaque); + assert!(!fetch_response.is_network_error()); assert_eq!(fetch_response.headers.len(), 0); let resp_body = fetch_response.body.lock().unwrap(); @@ -676,7 +682,7 @@ fn test_fetch_with_hsts() { state: Arc::new(HttpState::new(tls_config)), user_agent: DEFAULT_USER_AGENT.into(), devtools_chan: None, - filemanager: FileManager::new(create_embedder_proxy()), + filemanager: FileManager::new(create_embedder_proxy(), Weak::new()), cancellation_listener: Arc::new(Mutex::new(CancellationListener::new(None))), timing: ServoArc::new(Mutex::new(ResourceFetchTiming::new( ResourceTimingType::Navigation, @@ -728,7 +734,7 @@ fn test_load_adds_host_to_hsts_list_when_url_is_https() { state: Arc::new(HttpState::new(tls_config)), user_agent: DEFAULT_USER_AGENT.into(), devtools_chan: None, - filemanager: FileManager::new(create_embedder_proxy()), + filemanager: FileManager::new(create_embedder_proxy(), Weak::new()), cancellation_listener: Arc::new(Mutex::new(CancellationListener::new(None))), timing: ServoArc::new(Mutex::new(ResourceFetchTiming::new( ResourceTimingType::Navigation, diff --git a/components/net/tests/filemanager_thread.rs b/components/net/tests/filemanager_thread.rs index baf38a80392f..9438fcc88eeb 100644 --- a/components/net/tests/filemanager_thread.rs +++ b/components/net/tests/filemanager_thread.rs @@ -6,6 +6,7 @@ use crate::create_embedder_proxy; use embedder_traits::FilterPattern; use ipc_channel::ipc; use net::filemanager_thread::FileManager; +use net::resource_thread::CoreResourceThreadPool; use net_traits::blob_url_store::BlobURLStoreError; use net_traits::filemanager_thread::{ FileManagerThreadError, FileManagerThreadMsg, ReadFileProgress, @@ -14,10 +15,13 @@ use servo_config::set_pref; use std::fs::File; use std::io::Read; use std::path::PathBuf; +use std::sync::Arc; #[test] fn test_filemanager() { - let filemanager = FileManager::new(create_embedder_proxy()); + let pool = CoreResourceThreadPool::new(1); + let pool_handle = Arc::new(pool); + let filemanager = FileManager::new(create_embedder_proxy(), Arc::downgrade(&pool_handle)); set_pref!(dom.testing.html_input_element.select_files.enabled, true); // Try to open a dummy file "components/net/tests/test.jpeg" in tree diff --git a/components/net/tests/http_loader.rs b/components/net/tests/http_loader.rs index 7c9bc9ee470a..ed1b2939b597 100644 --- a/components/net/tests/http_loader.rs +++ b/components/net/tests/http_loader.rs @@ -562,7 +562,7 @@ fn test_load_doesnt_add_host_to_hsts_list_when_url_is_http_even_if_hsts_headers_ .pipeline_id(Some(TEST_PIPELINE_ID)) .build(); - let mut context = new_fetch_context(None, None); + let mut context = new_fetch_context(None, None, None); let response = fetch_with_context(&mut request, &mut context); let _ = server.close(); @@ -596,7 +596,7 @@ fn test_load_sets_cookies_in_the_resource_manager_when_it_get_set_cookie_header_ }; let (server, url) = make_server(handler); - let mut context = new_fetch_context(None, None); + let mut context = new_fetch_context(None, None, None); assert_cookie_for_domain(&context.state.cookie_jar, url.as_str(), None); @@ -639,7 +639,7 @@ fn test_load_sets_requests_cookies_header_for_url_by_getting_cookies_from_the_re }; let (server, url) = make_server(handler); - let mut context = new_fetch_context(None, None); + let mut context = new_fetch_context(None, None, None); { let mut cookie_jar = context.state.cookie_jar.write().unwrap(); @@ -685,7 +685,7 @@ fn test_load_sends_cookie_if_nonhttp() { }; let (server, url) = make_server(handler); - let mut context = new_fetch_context(None, None); + let mut context = new_fetch_context(None, None, None); { let mut cookie_jar = context.state.cookie_jar.write().unwrap(); @@ -731,7 +731,7 @@ fn test_cookie_set_with_httponly_should_not_be_available_using_getcookiesforurl( }; let (server, url) = make_server(handler); - let mut context = new_fetch_context(None, None); + let mut context = new_fetch_context(None, None, None); assert_cookie_for_domain(&context.state.cookie_jar, url.as_str(), None); @@ -778,7 +778,7 @@ fn test_when_cookie_received_marked_secure_is_ignored_for_http() { }; let (server, url) = make_server(handler); - let mut context = new_fetch_context(None, None); + let mut context = new_fetch_context(None, None, None); assert_cookie_for_domain(&context.state.cookie_jar, url.as_str(), None); @@ -1180,7 +1180,7 @@ fn test_redirect_from_x_to_y_provides_y_cookies_from_y() { let url_y = ServoUrl::parse(&format!("http://mozilla.org:{}/org/", port)).unwrap(); *shared_url_y_clone.lock().unwrap() = Some(url_y.clone()); - let mut context = new_fetch_context(None, None); + let mut context = new_fetch_context(None, None, None); { let mut cookie_jar = context.state.cookie_jar.write().unwrap(); let cookie_x = Cookie::new_wrapped( @@ -1290,7 +1290,7 @@ fn test_if_auth_creds_not_in_url_but_in_cache_it_sets_it() { .credentials_mode(CredentialsMode::Include) .build(); - let mut context = new_fetch_context(None, None); + let mut context = new_fetch_context(None, None, None); let auth_entry = AuthCacheEntry { user_name: "username".to_owned(), diff --git a/components/net/tests/main.rs b/components/net/tests/main.rs index 6f301692c0c8..727403026de5 100644 --- a/components/net/tests/main.rs +++ b/components/net/tests/main.rs @@ -33,6 +33,7 @@ use net::connector::{create_tls_config, ALPN_H2_H1}; use net::fetch::cors_cache::CorsCache; use net::fetch::methods::{self, CancellationListener, FetchContext}; use net::filemanager_thread::FileManager; +use net::resource_thread::CoreResourceThreadPool; use net::test::HttpState; use net_traits::request::Request; use net_traits::response::Response; @@ -42,7 +43,7 @@ use servo_arc::Arc as ServoArc; use servo_url::ServoUrl; use std::net::TcpListener as StdTcpListener; use std::path::PathBuf; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, Weak}; use tokio::net::TcpListener; use tokio::reactor::Handle; use tokio::runtime::Runtime; @@ -86,15 +87,17 @@ fn create_embedder_proxy() -> EmbedderProxy { fn new_fetch_context( dc: Option>, fc: Option, + pool_handle: Option>, ) -> FetchContext { let certs = resources::read_string(Resource::SSLCertificates); let tls_config = create_tls_config(&certs, ALPN_H2_H1); let sender = fc.unwrap_or_else(|| create_embedder_proxy()); + FetchContext { state: Arc::new(HttpState::new(tls_config)), user_agent: DEFAULT_USER_AGENT.into(), devtools_chan: dc, - filemanager: FileManager::new(sender), + filemanager: FileManager::new(sender, pool_handle.unwrap_or_else(|| Weak::new())), cancellation_listener: Arc::new(Mutex::new(CancellationListener::new(None))), timing: ServoArc::new(Mutex::new(ResourceFetchTiming::new( ResourceTimingType::Navigation, @@ -113,7 +116,7 @@ impl FetchTaskTarget for FetchResponseCollector { } fn fetch(request: &mut Request, dc: Option>) -> Response { - fetch_with_context(request, &mut new_fetch_context(dc, None)) + fetch_with_context(request, &mut new_fetch_context(dc, None, None)) } fn fetch_with_context(request: &mut Request, mut context: &mut FetchContext) -> Response { @@ -133,7 +136,7 @@ fn fetch_with_cors_cache(request: &mut Request, cache: &mut CorsCache) -> Respon request, cache, &mut target, - &mut new_fetch_context(None, None), + &mut new_fetch_context(None, None, None), ); receiver.recv().unwrap()