Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minimal Worker support #1476

Merged
merged 2 commits into from Jan 8, 2019
Merged
Changes from all commits
Commits
File filter
Filter file types
Jump to
Jump to file
Failed to load files.

Always

Just for now

@@ -99,6 +99,7 @@ ts_sources = [
"js/url.ts",
"js/url_search_params.ts",
"js/util.ts",
"js/workers.ts",
"js/write_file.ts",
"tsconfig.json",

@@ -19,6 +19,7 @@ import * as textEncoding from "./text_encoding";
import * as timers from "./timers";
import * as url from "./url";
import * as urlSearchParams from "./url_search_params";
import * as workers from "./workers";

// These imports are not exposed and therefore are fine to just import the
// symbols required.
@@ -86,3 +87,8 @@ window.TextEncoder = textEncoding.TextEncoder;
export type TextEncoder = textEncoding.TextEncoder;
window.TextDecoder = textEncoding.TextDecoder;
export type TextDecoder = textEncoding.TextDecoder;

window.workerMain = workers.workerMain;
// TODO These shouldn't be available in main isolate.
This conversation was marked as resolved by ry

This comment has been minimized.

@piscisaureus

piscisaureus Jan 8, 2019
Member

Or should they ... ? ;)

window.postMessage = workers.postMessage;
window.close = workers.workerClose;
@@ -0,0 +1,75 @@
// Copyright 2018-2019 the Deno authors. All rights reserved. MIT license.
import * as dispatch from "./dispatch";
import { libdeno } from "./libdeno";
import * as msg from "gen/msg_generated";
import * as flatbuffers from "./flatbuffers";
import { assert, log } from "./util";
import { globalEval } from "./global_eval";

export async function postMessage(data: Uint8Array): Promise<void> {
const builder = flatbuffers.createBuilder();
msg.WorkerPostMessage.startWorkerPostMessage(builder);
const inner = msg.WorkerPostMessage.endWorkerPostMessage(builder);
const baseRes = await dispatch.sendAsync(
builder,
msg.Any.WorkerPostMessage,
inner,
data
);
assert(baseRes != null);
}

export async function getMessage(): Promise<null | Uint8Array> {
log("getMessage");
const builder = flatbuffers.createBuilder();
msg.WorkerGetMessage.startWorkerGetMessage(builder);
const inner = msg.WorkerGetMessage.endWorkerGetMessage(builder);
const baseRes = await dispatch.sendAsync(
builder,
msg.Any.WorkerGetMessage,
inner
);
assert(baseRes != null);
assert(
msg.Any.WorkerGetMessageRes === baseRes!.innerType(),
This conversation was marked as resolved by ry

This comment has been minimized.

@piscisaureus

piscisaureus Jan 8, 2019
Member

Flatbuffers man...

`base.innerType() unexpectedly is ${baseRes!.innerType()}`
);
const res = new msg.WorkerGetMessageRes();
assert(baseRes!.inner(res) != null);

const dataArray = res.dataArray();
if (dataArray == null) {
return null;
} else {
return new Uint8Array(dataArray!);
}
}

let isClosing = false;

export function workerClose(): void {
isClosing = true;
}

export async function workerMain() {
log("workerMain");
libdeno.recv(dispatch.handleAsyncMsgFromRust);

// TODO avoid using globalEval to get Window. But circular imports if getting
// it from globals.ts
const window = globalEval("this");

while (!isClosing) {
const data = await getMessage();
if (data == null) {
log("workerMain got null message. quitting.");
break;
}
if (window["onmessage"]) {
const event = { data };
window.onmessage(event);
} else {
break;
}
}
}
@@ -15,7 +15,7 @@ macro_rules! svec {
}

#[cfg_attr(feature = "cargo-clippy", allow(stutter))]
#[derive(Debug, PartialEq, Default)]
#[derive(Clone, Debug, PartialEq, Default)]
pub struct DenoFlags {
pub help: bool,
pub log_debug: bool,
@@ -12,6 +12,7 @@ use js_errors::JSError;
use libdeno;
use permissions::DenoPermissions;

use futures::sync::mpsc as async_mpsc;
use futures::Future;
use libc::c_char;
use libc::c_void;
@@ -23,6 +24,7 @@ use std::ffi::CString;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use std::time::Instant;
use tokio;
@@ -53,6 +55,10 @@ pub struct Isolate {
pub state: Arc<IsolateState>,
}

pub type WorkerSender = async_mpsc::Sender<Buf>;
pub type WorkerReceiver = async_mpsc::Receiver<Buf>;
pub type WorkerChannels = (WorkerSender, WorkerReceiver);

// Isolate cannot be passed between threads but IsolateState can.
// IsolateState satisfies Send and Sync.
// So any state that needs to be accessed outside the main V8 thread should be
@@ -64,20 +70,35 @@ pub struct IsolateState {
pub permissions: DenoPermissions,
pub flags: flags::DenoFlags,
pub metrics: Metrics,
pub worker_channels: Option<Mutex<WorkerChannels>>,
}

impl IsolateState {
pub fn new(flags: flags::DenoFlags, argv_rest: Vec<String>) -> Self {
pub fn new(
flags: flags::DenoFlags,
argv_rest: Vec<String>,
worker_channels: Option<WorkerChannels>,
) -> Self {
let custom_root = env::var("DENO_DIR").map(|s| s.into()).ok();

Self {
dir: deno_dir::DenoDir::new(flags.reload, custom_root).unwrap(),
argv: argv_rest,
permissions: DenoPermissions::new(&flags),
flags,
metrics: Metrics::default(),
worker_channels: worker_channels.map(|wc| Mutex::new(wc)),
}
}

#[cfg(test)]
pub fn mock() -> Arc<IsolateState> {
let argv = vec![String::from("./deno"), String::from("hello.js")];
// For debugging: argv.push_back(String::from("-D"));
let (flags, rest_argv, _) = flags::set_flags(argv).unwrap();
Arc::new(IsolateState::new(flags, rest_argv, None))
}

#[inline]
pub fn check_write(&self, filename: &str) -> DenoResult<()> {
self.permissions.check_write(filename)
@@ -449,10 +470,7 @@ mod tests {

#[test]
fn test_dispatch_sync() {
let argv = vec![String::from("./deno"), String::from("hello.js")];
let (flags, rest_argv, _) = flags::set_flags(argv).unwrap();

let state = Arc::new(IsolateState::new(flags, rest_argv));
let state = IsolateState::mock();
let snapshot = libdeno::deno_buf::empty();
let isolate = Isolate::new(snapshot, state, dispatch_sync);
tokio_util::init(|| {
@@ -491,9 +509,7 @@ mod tests {

#[test]
fn test_metrics_sync() {
let argv = vec![String::from("./deno"), String::from("hello.js")];
let (flags, rest_argv, _) = flags::set_flags(argv).unwrap();
let state = Arc::new(IsolateState::new(flags, rest_argv));
let state = IsolateState::mock();
let snapshot = libdeno::deno_buf::empty();
let isolate = Isolate::new(snapshot, state, metrics_dispatch_sync);
tokio_util::init(|| {
@@ -527,9 +543,7 @@ mod tests {

#[test]
fn test_metrics_async() {
let argv = vec![String::from("./deno"), String::from("hello.js")];
let (flags, rest_argv, _) = flags::set_flags(argv).unwrap();
let state = Arc::new(IsolateState::new(flags, rest_argv));
let state = IsolateState::mock();
let snapshot = libdeno::deno_buf::empty();
let isolate = Isolate::new(snapshot, state, metrics_dispatch_async);
tokio_util::init(|| {
@@ -617,7 +631,7 @@ mod tests {
let argv = vec![String::from("./deno"), String::from(filename)];
let (flags, rest_argv, _) = flags::set_flags(argv).unwrap();

let state = Arc::new(IsolateState::new(flags, rest_argv));
let state = Arc::new(IsolateState::new(flags, rest_argv, None));
let snapshot = libdeno::deno_buf::empty();
let isolate = Isolate::new(snapshot, state, dispatch_sync);
tokio_util::init(|| {
@@ -47,6 +47,7 @@ pub mod snapshot;
mod tokio_util;
mod tokio_write;
pub mod version;
mod workers;

#[cfg(unix)]
mod eager_unix;
@@ -96,7 +97,7 @@ fn main() {
log::LevelFilter::Warn
});

let state = Arc::new(isolate::IsolateState::new(flags, rest_argv));
let state = Arc::new(isolate::IsolateState::new(flags, rest_argv, None));
let snapshot = snapshot::deno_snapshot();
let isolate = isolate::Isolate::new(snapshot, state, ops::dispatch);
tokio_util::init(|| {
@@ -1,6 +1,9 @@
union Any {
Start,
StartRes,
WorkerGetMessage,
WorkerGetMessageRes,
WorkerPostMessage,
CodeFetch,
CodeFetchRes,
CodeCache,
@@ -149,6 +152,18 @@ table StartRes {
v8_version: string;
}

table WorkerGetMessage {
unused: int8;
}

table WorkerGetMessageRes {
data: [ubyte];
}

table WorkerPostMessage {
// data passed thru the zero-copy data parameter.
}

table CodeFetch {
specifier: string;
referrer: string;
ProTip! Use n and p to navigate between commits in a pull request.